You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/30 19:53:26 UTC

svn commit: r1379038 - in /cxf/sandbox/dkulp_async_clients/http-hc/src: main/java/org/apache/cxf/transport/http/asyncclient/ main/resources/ main/resources/META-INF/ main/resources/META-INF/cxf/ test/java/org/apache/cxf/transport/http/asyncclient/

Author: dkulp
Date: Thu Aug 30 17:53:26 2012
New Revision: 1379038

URL: http://svn.apache.org/viewvc?rev=1379038&view=rev
Log:
Update to use the new conduit factory mechanism.
Auto-switch to async conduit for async methods, stick with URL conduit otherwise.
Get all the systest/jaxws tests passing with this.

Added:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt
Removed:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPTransportFactory.java
Modified:
    cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1379038&r1=1379037&r2=1379038&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Thu Aug 30 17:53:26 2012
@@ -34,8 +34,8 @@ import org.apache.cxf.io.CacheAndWriteOu
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transport.http.Headers;
+import org.apache.cxf.transport.http.URLConnectionHTTPConduit;
 import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.version.Version;
@@ -51,17 +51,29 @@ import org.apache.http.protocol.BasicHtt
 /**
  * 
  */
-public class AsyncHTTPConduit extends HTTPConduit {
+public class AsyncHTTPConduit extends URLConnectionHTTPConduit {
+    public static final String USE_ASYNC = "use.async.http.conduit";
 
-    AsyncHTTPTransportFactory factory;
+    AsyncHTTPConduitFactory factory;
     
     public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t,
-                            AsyncHTTPTransportFactory factory) throws IOException {
+                            AsyncHTTPConduitFactory factory) throws IOException {
         super(b, ei, t);
         this.factory = factory;
     }
 
     protected void setupConnection(Message message, URI uri, HTTPClientPolicy csPolicy) throws IOException {
+        Object o = message.getContextualProperty(USE_ASYNC);
+        if (o == null) {
+            o = !message.getExchange().isSynchronous();
+        }
+        if (!MessageUtils.isTrue(o)) {
+            message.put(USE_ASYNC, Boolean.FALSE);
+            super.setupConnection(message, uri, csPolicy);
+            return;
+        }
+        message.put(USE_ASYNC, Boolean.TRUE);
+        
         String httpRequestMethod = 
             (String)message.get(Message.HTTP_REQUEST_METHOD);
         if (httpRequestMethod == null) {
@@ -87,13 +99,17 @@ public class AsyncHTTPConduit extends HT
                                               boolean needToCacheRequest, 
                                               boolean isChunking,
                                               int chunkThreshold) {
-        CXFHttpRequest entity = message.get(CXFHttpRequest.class);
-        return new AsyncWrappedOutputStream(message,
-                                            needToCacheRequest, 
-                                            isChunking,
-                                            chunkThreshold,
-                                            getConduitName(),
-                                            entity.getURI().toString());
+        if (Boolean.TRUE.equals(message.get(USE_ASYNC))) {
+            CXFHttpRequest entity = message.get(CXFHttpRequest.class);
+            return new AsyncWrappedOutputStream(message,
+                                                needToCacheRequest, 
+                                                isChunking,
+                                                chunkThreshold,
+                                                getConduitName(),
+                                                entity.getURI().toString());
+            
+        }
+        return super.createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold);
     }
     
     
@@ -231,7 +247,7 @@ public class AsyncHTTPConduit extends HT
             if (isAsync) {
                 //got a response, need to start the response processing now
                 try {
-                    handleResponseOnWorkqueue(false);
+                    handleResponseOnWorkqueue(false, true);
                     isAsync = false; // don't trigger another start on next block. :-)
                 } catch (Exception ex) {
                     ex.printStackTrace();
@@ -241,6 +257,15 @@ public class AsyncHTTPConduit extends HT
         }
         protected synchronized void setException(Exception ex) {
             exception = ex;
+            if (isAsync) {
+                //got a response, need to start the response processing now
+                try {
+                    handleResponseOnWorkqueue(false, true);
+                    isAsync = false; // don't trigger another start on next block. :-)
+                } catch (Exception ex2) {
+                    ex2.printStackTrace();
+                }
+            }
             notifyAll();
         }
 

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1379038&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Thu Aug 30 17:53:26 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.asyncclient;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import javax.annotation.Resource;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transport.http.HTTPTransportFactory.HTTPConduitFactory;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
+import org.apache.http.impl.nio.pool.BasicNIOConnFactory;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOReactorException;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+
+/**
+ * 
+ */
+@NoJSR250Annotations(unlessNull = "bus")
+public class AsyncHTTPConduitFactory implements BusLifeCycleListener, HTTPConduitFactory {
+    CXFAsyncRequester requester;
+    CXFConnectionManager connManager;
+    
+    public AsyncHTTPConduitFactory() {
+        super();
+    }
+    public AsyncHTTPConduitFactory(Bus b) {
+        addListener(b);
+    }
+
+    @Override
+    public HTTPConduit createConduit(HTTPTransportFactory f, EndpointInfo localInfo,
+                                     EndpointReferenceType target) throws IOException {
+        return new AsyncHTTPConduit(f.getBus(), localInfo, target, this);
+    }
+
+    @Resource 
+    public void setBus(Bus b) {
+        addListener(b);
+    }
+    public void initComplete() {
+    }
+    public synchronized void preShutdown() {
+        if (connManager != null) {
+            try {
+                connManager.shutdown(1000);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    public void postShutdown() {
+    }    
+    
+    private void addListener(Bus b) {
+        b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
+    }
+    
+    
+    public synchronized void setupNIOClient() throws IOReactorException {
+        if (requester != null) {
+            return;
+        }
+     // HTTP parameters for the client
+        HttpParams params = new BasicHttpParams();
+        params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 16 * 1024);
+        // Create HTTP protocol processing chain
+        BasicHttpProcessor httpproc = new BasicHttpProcessor();
+        httpproc.addInterceptor(new RequestContent());
+        httpproc.addInterceptor(new RequestTargetHost());
+        httpproc.addInterceptor(new RequestConnControl());
+        httpproc.addInterceptor(new RequestExpectContinue());
+
+        // Create client-side HTTP protocol handler
+        CXFAsyncRequestExecutor protocolHandler = new CXFAsyncRequestExecutor();
+        // Create client-side I/O event dispatch
+        CXFPlainConnectionFactory plainConnFactory = new CXFPlainConnectionFactory(params);
+        CXFSSLConnectionFactory sslConnFactory = new CXFSSLConnectionFactory(params);
+        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, 
+                plainConnFactory);
+        // Create client-side I/O reactor
+        IOReactorConfig config = new IOReactorConfig();
+        config.setTcpNoDelay(true);
+        
+        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
+        // Create HTTP connection pool
+        BasicNIOConnFactory poolConnFactory = new BasicNIOConnFactory(
+                plainConnFactory, sslConnFactory);
+        connManager = new CXFConnectionManager(ioReactor, poolConnFactory, params);
+        connManager.setDefaultMaxPerRoute(1000);
+        connManager.setMaxTotal(5000);
+
+        // Run the I/O reactor in a separate thread
+        Thread t = new Thread(new Runnable() {
+
+            public void run() {
+                try {
+                    // Ready to go!
+                    ioReactor.execute(ioEventDispatch);
+                } catch (InterruptedIOException ex) {
+                    System.err.println("Interrupted");
+                } catch (IOException e) {
+                    System.err.println("I/O error: " + e.getMessage());
+                }
+            }
+
+        });
+        // Start the client thread
+        t.start();
+        
+        requester = new CXFAsyncRequester(connManager, httpproc, 
+                new DefaultConnectionReuseStrategy(), params);
+    }
+    
+    public CXFAsyncRequester getRequester() throws IOException {
+        if (requester == null) {
+            setupNIOClient();
+        }
+
+        return requester;
+    }
+
+
+
+}

Added: cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt?rev=1379038&view=auto
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt (added)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/main/resources/META-INF/cxf/bus-extensions.txt Thu Aug 30 17:53:26 2012
@@ -0,0 +1,2 @@
+org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory:org.apache.cxf.transport.http.HTTPTransportFactory$HTTPConduitFactory:true
+

Modified: cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java?rev=1379038&r1=1379037&r2=1379038&view=diff
==============================================================================
--- cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java (original)
+++ cxf/sandbox/dkulp_async_clients/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java Thu Aug 30 17:53:26 2012
@@ -56,7 +56,7 @@ public class AsyncHTTPConduitTest extend
     @BeforeClass
     public static void start() throws Exception {
         Bus b = createStaticBus();
-        new AsyncHTTPTransportFactory(b);
+        new AsyncHTTPConduitFactory(b);
         ep = Endpoint.publish("http://localhost:" + PORT + "/SoapContext/SoapPort",
                               new org.apache.hello_world_soap_http.GreeterImpl() {
                 public String greetMeLater(long cnt) {