You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/10/01 20:52:14 UTC

svn commit: r1392514 - in /cxf/trunk: rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/ systests/transports/ systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/

Author: dkulp
Date: Mon Oct  1 18:52:13 2012
New Revision: 1392514

URL: http://svn.apache.org/viewvc?rev=1392514&view=rev
Log:
[CXF-4525] If the request is retransmittable, we can let the Async client handle the negotiation and use a more optimized way of retransmitting the request.

Modified:
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
    cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java
    cxf/trunk/systests/transports/pom.xml
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Mon Oct  1 18:52:13 2012
@@ -56,6 +56,7 @@ import org.apache.cxf.configuration.jsse
 import org.apache.cxf.helpers.HttpHeaderHelper;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.io.CopyingOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
@@ -169,8 +170,12 @@ public class AsyncHTTPConduit extends UR
             httpRequestMethod = "POST";
             message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
         }
-        CXFHttpRequest e = new CXFHttpRequest(httpRequestMethod);
-        BasicHttpEntity entity = new BasicHttpEntity();
+        final CXFHttpRequest e = new CXFHttpRequest(httpRequestMethod);
+        BasicHttpEntity entity = new BasicHttpEntity() {
+            public boolean isRepeatable() {
+                return e.getOutputStream().retransmitable();
+            }
+        };
         entity.setChunked(true);
         entity.setContentType((String)message.get(Message.CONTENT_TYPE));
         e.setURI(uri);
@@ -247,6 +252,14 @@ public class AsyncHTTPConduit extends UR
             outbuf = new SharedOutputBuffer(16320, allocator);
         }
         
+        public boolean retransmitable() {
+            return cachedStream != null;
+        }
+        public CachedOutputStream getCachedStream() {
+            return cachedStream;
+        }
+
+        
         protected void setProtocolHeaders() throws IOException {
             Headers h = new Headers(outMessage);
             basicEntity.setContentType(h.determineContentType());

Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java Mon Oct  1 18:52:13 2012
@@ -19,9 +19,12 @@
 
 package org.apache.cxf.transport.http.asyncclient;
 
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.http.HttpException;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
@@ -34,6 +37,9 @@ public class CXFHttpAsyncRequestProducer
 
     private final CXFHttpRequest request;
     private final SharedOutputBuffer buf;
+    private volatile CachedOutputStream content;
+    private volatile ByteBuffer buffer;
+    private volatile FileInputStream fis;
     
     public CXFHttpAsyncRequestProducer(final CXFHttpRequest request, final SharedOutputBuffer buf) {
         super();
@@ -61,10 +67,40 @@ public class CXFHttpAsyncRequestProducer
     }
     
     public void produceContent(final ContentEncoder enc, final IOControl ioc) throws IOException {
-        buf.produceContent(enc, ioc);
+        if (content != null) {
+            if (buffer == null) {
+                if (content.getTempFile() == null) {
+                    buffer = ByteBuffer.wrap(content.getBytes());
+                } else {
+                    fis = (FileInputStream)content.getInputStream();
+                    buffer = ByteBuffer.allocate(8 * 1024);
+                }
+            }
+            int i = -1;
+            if (!buffer.hasRemaining() && fis != null) {
+                buffer.reset();
+                i = fis.getChannel().read(buffer);
+                buffer.flip();
+            }
+            enc.write(buffer);
+            if (!buffer.hasRemaining() && i == -1) {
+                enc.complete();
+            }
+        } else {
+            buf.produceContent(enc, ioc);
+        }
     }
     
     public void requestCompleted(final HttpContext context) {
+        if (fis != null) {
+            try {
+                fis.close();
+            } catch (IOException io) {
+                //ignore
+            }
+            fis = null;
+        }
+        buffer = null;
     }
     
     public void failed(final Exception ex) {
@@ -72,15 +108,27 @@ public class CXFHttpAsyncRequestProducer
     }
     
     public boolean isRepeatable() {
-        return false;
+        return request.getOutputStream().retransmitable();
     }
     
     public void resetRequest() throws IOException {
+        if (request.getOutputStream().retransmitable()) {
+            content = request.getOutputStream().getCachedStream();
+        }
     }
 
     @Override
     public void close() throws IOException {
         buf.close();
+        if (fis != null) {
+            try {
+                fis.close();
+            } catch (IOException io) {
+                //ignore
+            }
+            fis = null;
+        }
+        buffer = null;
     }
     
 }

Modified: cxf/trunk/systests/transports/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/pom.xml?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/systests/transports/pom.xml (original)
+++ cxf/trunk/systests/transports/pom.xml Mon Oct  1 18:52:13 2012
@@ -114,6 +114,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-hc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-transports-udp</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -303,17 +308,4 @@
         </dependency>
     </dependencies>
 
-    <profiles>
-        <profile>
-            <id>async</id>
-            <dependencies>
-                <dependency>
-                    <groupId>org.apache.cxf</groupId>
-                    <artifactId>cxf-rt-transports-http-hc</artifactId>
-                    <version>${project.version}</version>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
-
 </project>

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java Mon Oct  1 18:52:13 2012
@@ -40,11 +40,14 @@ import org.apache.cxf.phase.Phase;
 import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
 import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
 import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
 import org.apache.cxf.transport.http.auth.DigestAuthSupplier;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.hello_world_soap_http.Greeter;
 import org.apache.hello_world_soap_http.SOAPService;
-import org.junit.Before;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -93,8 +96,7 @@ public class JettyDigestAuthTest extends
                    launchServer(JettyDigestServer.class, true));
     }
 
-    @Before
-    public void setUp() throws Exception {
+    private HTTPConduit setupClient(boolean async) throws Exception {
         URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
         greeter = new SOAPService(wsdl, SERVICE_NAME).getPort(Greeter.class);
         BindingProvider bp = (BindingProvider)greeter;
@@ -102,12 +104,26 @@ public class JettyDigestAuthTest extends
         ClientProxy.getClient(greeter).getOutInterceptors().add(new LoggingOutInterceptor()); 
         bp.getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
                                    ADDRESS);
-        bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "ffang");
-        bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "pswd");
         HTTPConduit cond = (HTTPConduit)ClientProxy.getClient(greeter).getConduit();
-        cond.setAuthSupplier(new DigestAuthSupplier());
-        
         HTTPClientPolicy client = new HTTPClientPolicy();
+        cond.setClient(client);
+        if (async) {
+            if (cond instanceof AsyncHTTPConduit) {
+                AsyncHTTPConduit acond = (AsyncHTTPConduit)cond;
+                acond.getClient().setAllowChunking(false);
+                acond.getClient().setAutoRedirect(true);
+                bp.getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, Boolean.TRUE);
+                UsernamePasswordCredentials creds = new UsernamePasswordCredentials("ffang", "pswd");
+                acond.getHttpAsyncClient().getCredentialsProvider().setCredentials(AuthScope.ANY, creds);
+            } else {
+                fail("Not an async conduit");
+            }
+        } else {
+            bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "ffang");
+            bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "pswd");
+            cond.setAuthSupplier(new DigestAuthSupplier());
+        }
+        
         ClientProxy.getClient(greeter).getOutInterceptors()
             .add(new AbstractPhaseInterceptor<Message>(Phase.PRE_STREAM_ENDING) {
                 
@@ -119,18 +135,36 @@ public class JettyDigestAuthTest extends
                 }
             });
         client.setAllowChunking(false);
-        cond.setClient(client);
+        return cond;
     }
 
     @Test
-    public void testDigestAuth() throws Exception { 
+    public void testDigestAuth() throws Exception {
+        //CXF will handle the auth stuff within its conduit implementation
+        doTest(false);
+    }
+    @Test
+    public void testDigestAuthAsyncClient() throws Exception {
+        //We'll let HTTP async handle it.  Useful for things like NTLM 
+        //which async client can handle but we cannot.
+        doTest(true);
+    }
+    
+    private void doTest(boolean async) throws Exception {
+        HTTPConduit cond = setupClient(async);
         assertEquals("Hello Alice", greeter.greetMe("Alice"));
         assertEquals("Hello Bob", greeter.greetMe("Bob"));
 
         try {
             BindingProvider bp = (BindingProvider)greeter;
-            bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "blah");
-            bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "foo");
+            if (async) {
+                AsyncHTTPConduit acond = (AsyncHTTPConduit)cond;
+                UsernamePasswordCredentials creds = new UsernamePasswordCredentials("blah", "foo");
+                acond.getHttpAsyncClient().getCredentialsProvider().setCredentials(AuthScope.ANY, creds);
+            } else {
+                bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "blah");
+                bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "foo");
+            }
             greeter.greetMe("Alice");
             fail("Password was wrong, should have failed");
         } catch (WebServiceException wse) {