You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/30 19:53:26 UTC
svn commit: r1379038 - in /cxf/sandbox/dkulp_async_clients/http-hc/src:
main/java/org/apache/cxf/transport/http/asyncclient/ main/resources/
main/resources/META-INF/ main/resources/META-INF/cxf/
test/java/org/apache/cxf/transport/http/asyncclient/
Author: dkulp
Date: Thu Aug 30 17:53:26 2012
New Revision: 1379038
URL: http://svn.apache.org/viewvc?rev=1379038&view=rev
Log:
Update to use the new conduit factory mechanism.
Auto-switch to async conduit for async methods, stick with URL conduit otherwise.
Get all the systest/jaxws tests passing with this.
Added:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/
cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt
Removed:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Modified:
cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1379038&r1=1379037&r2=1379038&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Thu Aug 30 17:53:26 2012
@@ -34,8 +34,8 @@ import org.apache.cxf.io.CacheAndWriteOu
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.version.Version;
@@ -51,17 +51,29 @@ import org.apache.http.protocol.BasicHtt
/**
*
*/
-public class AsyncHTTPConduit extends HTTPConduit {
+public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
+ public static final String USE_ASYNC = "use.async.http.conduit";
- AsyncHTTPTransportFactory factory;
+ AsyncHTTPConduitFactory factory;
public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
- AsyncHTTPTransportFactory factory) throws IOException {
+ AsyncHTTPConduitFactory factory) throws IOException {
super(b, ei, t);
this.factory = factory;
}
protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws IOException {
+ Object o = message.getContextualProperty(USE_ASYNC);
+ if (o == null) {
+ o = !message.getExchange().isSynchronous();
+ }
+ if (!MessageUtils.isTrue(o)) {
+ message.put(USE_ASYNC, Boolean.FALSE);
+ super.setupConnection(message, uri, csPolicy);
+ return;
+ }
+ message.put(USE_ASYNC, Boolean.TRUE);
+
String httpRequestMethod =
(String)message.get(Message.HTTP_REQUEST_METHOD);
if (httpRequestMethod == null) {
@@ -87,13 +99,17 @@ public class AsyncHTTPConduit extends HT
boolean needToCacheRequest,
boolean isChunking,
int chunkThreshold) {
- CXFHttpRequest entity = message.get(CXFHttpRequest.class);
- return new AsyncWrappedOutputStream(message,
- needToCacheRequest,
- isChunking,
- chunkThreshold,
- getConduitName(),
- entity.getURI().toString());
+ if (Boolean.TRUE.equals(message.get(USE_ASYNC))) {
+ CXFHttpRequest entity = message.get(CXFHttpRequest.class);
+ return new AsyncWrappedOutputStream(message,
+ needToCacheRequest,
+ isChunking,
+ chunkThreshold,
+ getConduitName(),
+ entity.getURI().toString());
+
+ }
+ return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold);
}
@@ -231,7 +247,7 @@ public class AsyncHTTPConduit extends HT
if (isAsync) {
//got a response, need to start the response processing now
try {
- handleResponseOnWorkqueue(false);
+ handleResponseOnWorkqueue(false, true);
isAsync = false; // don't trigger another start on next block. :-)
} catch (Exception ex) {
ex.printStackTrace();
@@ -241,6 +257,15 @@ public class AsyncHTTPConduit extends HT
}
protected synchronized void setException(Exception ex) {
exception = ex;
+ if (isAsync) {
+ //got a response, need to start the response processing now
+ try {
+ handleResponseOnWorkqueue(false, true);
+ isAsync = false; // don't trigger another start on next block. :-)
+ } catch (Exception ex2) {
+ ex2.printStackTrace();
+ }
+ }
notifyAll();
}
Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1379038&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Thu Aug 30 17:53:26 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transport.http.HTTPTransportFactory.HTTPConduitFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+
+/**
+ *
+ */
+@NoJSR250Annotations(unlessNull = "bus")
+public class AsyncHTTPConduitFactory implements BusLifeCycleListener, HTTPConduitFactory {
+ CXFAsyncRequester requester;
+ CXFConnectionManager connManager;
+
+ public AsyncHTTPConduitFactory() {
+ super();
+ }
+ public AsyncHTTPConduitFactory(Bus b) {
+ addListener(b);
+ }
+
+ @Override
+ public HTTPConduit createConduit(HTTPTransportFactory f, EndpointInfo localInfo,
+ EndpointReferenceType target) throws IOException {
+ return new AsyncHTTPConduit(f.getBus(), localInfo, target, this);
+ }
+
+ @Resource
+ public void setBus(Bus b) {
+ addListener(b);
+ }
+ public void initComplete() {
+ }
+ public synchronized void preShutdown() {
+ if (connManager != null) {
+ try {
+ connManager.shutdown(1000);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ public void postShutdown() {
+ }
+
+ private void addListener(Bus b) {
+ b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
+ }
+
+
+ public synchronized void setupNIOClient() throws IOReactorException {
+ if (requester != null) {
+ return;
+ }
+ // HTTP parameters for the client
+ HttpParams params = new BasicHttpParams();
+ params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 * 1024);
+ // Create HTTP protocol processing chain
+ BasicHttpProcessor httpproc = new BasicHttpProcessor();
+ httpproc.addInterceptor(new RequestContent());
+ httpproc.addInterceptor(new RequestTargetHost());
+ httpproc.addInterceptor(new RequestConnControl());
+ httpproc.addInterceptor(new RequestExpectContinue());
+
+ // Create client-side HTTP protocol handler
+ CXFAsyncRequestExecutor protocolHandler = new CXFAsyncRequestExecutor();
+ // Create client-side I/O event dispatch
+ CXFPlainConnectionFactory plainConnFactory = new CXFPlainConnectionFactory(params);
+ CXFSSLConnectionFactory sslConnFactory = new CXFSSLConnectionFactory(params);
+ final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler,
+ plainConnFactory);
+ // Create client-side I/O reactor
+ IOReactorConfig config = new IOReactorConfig();
+ config.setTcpNoDelay(true);
+
+ final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
+ // Create HTTP connection pool
+ BasicNIOConnFactory poolConnFactory = new BasicNIOConnFactory(
+ plainConnFactory, sslConnFactory);
+ connManager = new CXFConnectionManager(ioReactor, poolConnFactory, params);
+ connManager.setDefaultMaxPerRoute(1000);
+ connManager.setMaxTotal(5000);
+
+ // Run the I/O reactor in a separate thread
+ Thread t = new Thread(new Runnable() {
+
+ public void run() {
+ try {
+ // Ready to go!
+ ioReactor.execute(ioEventDispatch);
+ } catch (InterruptedIOException ex) {
+ System.err.println("Interrupted");
+ } catch (IOException e) {
+ System.err.println("I/O error: " + e.getMessage());
+ }
+ }
+
+ });
+ // Start the client thread
+ t.start();
+
+ requester = new CXFAsyncRequester(connManager, httpproc,
+ new DefaultConnectionReuseStrategy(), params);
+ }
+
+ public CXFAsyncRequester getRequester() throws IOException {
+ if (requester == null) {
+ setupNIOClient();
+ }
+
+ return requester;
+ }
+
+
+
+}
Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt?rev=1379038&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt Thu Aug 30 17:53:26 2012
@@ -0,0 +1,2 @@
+org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory:org.apache.cxf.transport.http.HTTPTransportFactory$HTTPConduitFactory:true
+
Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1379038&r1=1379037&r2=1379038&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java Thu Aug 30 17:53:26 2012
@@ -56,7 +56,7 @@ public class AsyncHTTPConduitTest extend
@BeforeClass
public static void start() throws Exception {
Bus b = createStaticBus();
- new AsyncHTTPTransportFactory(b);
+ new AsyncHTTPConduitFactory(b);
ep = Endpoint.publish("http://localhost:" + PORT + "/SoapContext/SoapPort",
new org.apache.hello_world_soap_http.GreeterImpl() {
public String greetMeLater(long cnt) {