You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/10/01 20:52:14 UTC
svn commit: r1392514 - in /cxf/trunk:
rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
systests/transports/
systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/
Author: dkulp
Date: Mon Oct 1 18:52:13 2012
New Revision: 1392514
URL: http://svn.apache.org/viewvc?rev=1392514&view=rev
Log:
[CXF-4525] If the request is retransmittable, we can let the Async client handle the negotiation and use a more optimized way of retransmitting the request.
Modified:
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java
cxf/trunk/systests/transports/pom.xml
cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Mon Oct 1 18:52:13 2012
@@ -56,6 +56,7 @@ import org.apache.cxf.configuration.jsse
import org.apache.cxf.helpers.HttpHeaderHelper;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.CacheAndWriteOutputStream;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.io.CopyingOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
@@ -169,8 +170,12 @@ public class AsyncHTTPConduit extends UR
httpRequestMethod = "POST";
message.put(Message.HTTP_REQUEST_METHOD, httpRequestMethod);
}
- CXFHttpRequest e = new CXFHttpRequest(httpRequestMethod);
- BasicHttpEntity entity = new BasicHttpEntity();
+ final CXFHttpRequest e = new CXFHttpRequest(httpRequestMethod);
+ BasicHttpEntity entity = new BasicHttpEntity() {
+ public boolean isRepeatable() {
+ return e.getOutputStream().retransmitable();
+ }
+ };
entity.setChunked(true);
entity.setContentType((String)message.get(Message.CONTENT_TYPE));
e.setURI(uri);
@@ -247,6 +252,14 @@ public class AsyncHTTPConduit extends UR
outbuf = new SharedOutputBuffer(16320, allocator);
}
+ public boolean retransmitable() {
+ return cachedStream != null;
+ }
+ public CachedOutputStream getCachedStream() {
+ return cachedStream;
+ }
+
+
protected void setProtocolHeaders() throws IOException {
Headers h = new Headers(outMessage);
basicEntity.setContentType(h.determineContentType());
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpAsyncRequestProducer.java Mon Oct 1 18:52:13 2012
@@ -19,9 +19,12 @@
package org.apache.cxf.transport.http.asyncclient;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
+import java.nio.ByteBuffer;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
@@ -34,6 +37,9 @@ public class CXFHttpAsyncRequestProducer
private final CXFHttpRequest request;
private final SharedOutputBuffer buf;
+ private volatile CachedOutputStream content;
+ private volatile ByteBuffer buffer;
+ private volatile FileInputStream fis;
public CXFHttpAsyncRequestProducer(final CXFHttpRequest request, final SharedOutputBuffer buf) {
super();
@@ -61,10 +67,40 @@ public class CXFHttpAsyncRequestProducer
}
public void produceContent(final ContentEncoder enc, final IOControl ioc) throws IOException {
- buf.produceContent(enc, ioc);
+ if (content != null) {
+ if (buffer == null) {
+ if (content.getTempFile() == null) {
+ buffer = ByteBuffer.wrap(content.getBytes());
+ } else {
+ fis = (FileInputStream)content.getInputStream();
+ buffer = ByteBuffer.allocate(8 * 1024);
+ }
+ }
+ int i = -1;
+ if (!buffer.hasRemaining() && fis != null) {
+ buffer.reset();
+ i = fis.getChannel().read(buffer);
+ buffer.flip();
+ }
+ enc.write(buffer);
+ if (!buffer.hasRemaining() && i == -1) {
+ enc.complete();
+ }
+ } else {
+ buf.produceContent(enc, ioc);
+ }
}
public void requestCompleted(final HttpContext context) {
+ if (fis != null) {
+ try {
+ fis.close();
+ } catch (IOException io) {
+ //ignore
+ }
+ fis = null;
+ }
+ buffer = null;
}
public void failed(final Exception ex) {
@@ -72,15 +108,27 @@ public class CXFHttpAsyncRequestProducer
}
public boolean isRepeatable() {
- return false;
+ return request.getOutputStream().retransmitable();
}
public void resetRequest() throws IOException {
+ if (request.getOutputStream().retransmitable()) {
+ content = request.getOutputStream().getCachedStream();
+ }
}
@Override
public void close() throws IOException {
buf.close();
+ if (fis != null) {
+ try {
+ fis.close();
+ } catch (IOException io) {
+ //ignore
+ }
+ fis = null;
+ }
+ buffer = null;
}
}
Modified: cxf/trunk/systests/transports/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/pom.xml?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/systests/transports/pom.xml (original)
+++ cxf/trunk/systests/transports/pom.xml Mon Oct 1 18:52:13 2012
@@ -114,6 +114,11 @@
</dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-hc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-transports-udp</artifactId>
<version>${project.version}</version>
</dependency>
@@ -303,17 +308,4 @@
</dependency>
</dependencies>
- <profiles>
- <profile>
- <id>async</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.cxf</groupId>
- <artifactId>cxf-rt-transports-http-hc</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
-
</project>
Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java?rev=1392514&r1=1392513&r2=1392514&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java (original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/http_jetty/JettyDigestAuthTest.java Mon Oct 1 18:52:13 2012
@@ -40,11 +40,14 @@ import org.apache.cxf.phase.Phase;
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
import org.apache.cxf.testutil.common.AbstractClientServerTestBase;
import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transport.http.auth.DigestAuthSupplier;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.hello_world_soap_http.Greeter;
import org.apache.hello_world_soap_http.SOAPService;
-import org.junit.Before;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+
import org.junit.BeforeClass;
import org.junit.Test;
@@ -93,8 +96,7 @@ public class JettyDigestAuthTest extends
launchServer(JettyDigestServer.class, true));
}
- @Before
- public void setUp() throws Exception {
+ private HTTPConduit setupClient(boolean async) throws Exception {
URL wsdl = getClass().getResource("/wsdl/hello_world.wsdl");
greeter = new SOAPService(wsdl, SERVICE_NAME).getPort(Greeter.class);
BindingProvider bp = (BindingProvider)greeter;
@@ -102,12 +104,26 @@ public class JettyDigestAuthTest extends
ClientProxy.getClient(greeter).getOutInterceptors().add(new LoggingOutInterceptor());
bp.getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
ADDRESS);
- bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "ffang");
- bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "pswd");
HTTPConduit cond = (HTTPConduit)ClientProxy.getClient(greeter).getConduit();
- cond.setAuthSupplier(new DigestAuthSupplier());
-
HTTPClientPolicy client = new HTTPClientPolicy();
+ cond.setClient(client);
+ if (async) {
+ if (cond instanceof AsyncHTTPConduit) {
+ AsyncHTTPConduit acond = (AsyncHTTPConduit)cond;
+ acond.getClient().setAllowChunking(false);
+ acond.getClient().setAutoRedirect(true);
+ bp.getRequestContext().put(AsyncHTTPConduit.USE_ASYNC, Boolean.TRUE);
+ UsernamePasswordCredentials creds = new UsernamePasswordCredentials("ffang", "pswd");
+ acond.getHttpAsyncClient().getCredentialsProvider().setCredentials(AuthScope.ANY, creds);
+ } else {
+ fail("Not an async conduit");
+ }
+ } else {
+ bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "ffang");
+ bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "pswd");
+ cond.setAuthSupplier(new DigestAuthSupplier());
+ }
+
ClientProxy.getClient(greeter).getOutInterceptors()
.add(new AbstractPhaseInterceptor<Message>(Phase.PRE_STREAM_ENDING) {
@@ -119,18 +135,36 @@ public class JettyDigestAuthTest extends
}
});
client.setAllowChunking(false);
- cond.setClient(client);
+ return cond;
}
@Test
- public void testDigestAuth() throws Exception {
+ public void testDigestAuth() throws Exception {
+ //CXF will handle the auth stuff within its conduit implementation
+ doTest(false);
+ }
+ @Test
+ public void testDigestAuthAsyncClient() throws Exception {
+ //We'll let HTTP async handle it. Useful for things like NTLM
+ //which async client can handle but we cannot.
+ doTest(true);
+ }
+
+ private void doTest(boolean async) throws Exception {
+ HTTPConduit cond = setupClient(async);
assertEquals("Hello Alice", greeter.greetMe("Alice"));
assertEquals("Hello Bob", greeter.greetMe("Bob"));
try {
BindingProvider bp = (BindingProvider)greeter;
- bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "blah");
- bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "foo");
+ if (async) {
+ AsyncHTTPConduit acond = (AsyncHTTPConduit)cond;
+ UsernamePasswordCredentials creds = new UsernamePasswordCredentials("blah", "foo");
+ acond.getHttpAsyncClient().getCredentialsProvider().setCredentials(AuthScope.ANY, creds);
+ } else {
+ bp.getRequestContext().put(BindingProvider.USERNAME_PROPERTY, "blah");
+ bp.getRequestContext().put(BindingProvider.PASSWORD_PROPERTY, "foo");
+ }
greeter.greetMe("Alice");
fail("Password was wrong, should have failed");
} catch (WebServiceException wse) {