You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/10/08 18:27:00 UTC
svn commit: r1530340 - in /cxf/trunk: parent/
rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/
Author: dkulp
Date: Tue Oct 8 16:27:00 2013
New Revision: 1530340
URL: http://svn.apache.org/r1530340
Log:
Update to the latest http async stuff (SNAPSHOT for now, release should be very shortly) to help test it.
Modified:
cxf/trunk/parent/pom.xml
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
Modified: cxf/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/parent/pom.xml?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/parent/pom.xml (original)
+++ cxf/trunk/parent/pom.xml Tue Oct 8 16:27:00 2013
@@ -90,11 +90,11 @@
<cxf.ehcache.version>2.7.4</cxf.ehcache.version>
<cxf.fastinfoset.bundle.version>1.2.7_4</cxf.fastinfoset.bundle.version>
<cxf.hazelcast.version>1.9.4</cxf.hazelcast.version>
- <cxf.httpcomponents.asyncclient.version>4.0-beta3</cxf.httpcomponents.asyncclient.version>
- <cxf.httpcomponents.asyncclient.version.range>[4.0-beta3,4.1)</cxf.httpcomponents.asyncclient.version.range>
- <cxf.httpcomponents.client.version>4.2.5</cxf.httpcomponents.client.version>
- <cxf.httpcomponents.core.version>4.2.4</cxf.httpcomponents.core.version>
- <cxf.httpcomponents.core.version.range>[4.2.1,4.3.0)</cxf.httpcomponents.core.version.range>
+ <cxf.httpcomponents.asyncclient.version>4.0-beta5-SNAPSHOT</cxf.httpcomponents.asyncclient.version>
+ <cxf.httpcomponents.asyncclient.version.range>[4.0-beta4,4.1)</cxf.httpcomponents.asyncclient.version.range>
+ <cxf.httpcomponents.client.version>4.3.1</cxf.httpcomponents.client.version>
+ <cxf.httpcomponents.core.version>4.3</cxf.httpcomponents.core.version>
+ <cxf.httpcomponents.core.version.range>[4.3,4.4.0)</cxf.httpcomponents.core.version.range>
<cxf.james.mim4j.version>0.7.2</cxf.james.mim4j.version>
<cxf.logback.classic.version>1.0.13</cxf.logback.classic.version>
<cxf.log4j.version>1.2.17</cxf.log4j.version>
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java Tue Oct 8 16:27:00 2013
@@ -74,20 +74,20 @@ import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.protocol.ClientContext;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.concurrent.BasicFuture;
import org.apache.http.concurrent.FutureCallback;
-import org.apache.http.conn.params.ConnRouteParams;
+import org.apache.http.config.RegistryBuilder;
import org.apache.http.entity.BasicHttpEntity;
-import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
-import org.apache.http.nio.conn.scheme.AsyncScheme;
-import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
-import org.apache.http.nio.conn.ssl.SSLLayeringStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.util.HeapByteBufferAllocator;
-import org.apache.http.params.CoreConnectionPNames;
-import org.apache.http.protocol.BasicHttpContext;
/**
*
@@ -98,10 +98,12 @@ public class AsyncHTTPConduit extends UR
final AsyncHTTPConduitFactory factory;
volatile int lastTlsHash = -1;
volatile Object sslState;
+ volatile URI sslURL;
volatile SSLContext sslContext;
- volatile DefaultHttpAsyncClient client;
-
- public AsyncHTTPConduit(Bus b,
+ volatile SSLSession session;
+ volatile CloseableHttpAsyncClient client;
+
+ public AsyncHTTPConduit(Bus b,
EndpointInfo ei,
EndpointReferenceType t,
AsyncHTTPConduitFactory factory) throws IOException {
@@ -109,7 +111,7 @@ public class AsyncHTTPConduit extends UR
this.factory = factory;
}
- public synchronized DefaultHttpAsyncClient getHttpAsyncClient() throws IOException {
+ public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException {
if (client == null) {
client = factory.createClient(this);
}
@@ -195,17 +197,18 @@ public class AsyncHTTPConduit extends UR
e.setURI(uri);
e.setEntity(entity);
-
- // Set socket timeout
- e.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT,
- Integer.valueOf((int) csPolicy.getReceiveTimeout()));
-
+
+ RequestConfig.Builder b = RequestConfig.custom()
+ .setSocketTimeout((int) csPolicy.getReceiveTimeout())
+ .setConnectTimeout((int) csPolicy.getConnectionTimeout());
Proxy p = proxyFactory.createProxy(csPolicy , uri);
if (p != null) {
InetSocketAddress isa = (InetSocketAddress)p.address();
HttpHost proxy = new HttpHost(isa.getHostName(), isa.getPort());
- ConnRouteParams.setDefaultProxy(e.getParams(), proxy);
+ b.setProxy(proxy);
}
+ e.setConfig(b.build());
+
message.put(CXFHttpRequest.class, e);
}
@@ -244,7 +247,6 @@ public class AsyncHTTPConduit extends UR
// Objects for the response
volatile HttpResponse httpResponse;
volatile Exception exception;
- volatile SSLSession session;
private Future<Boolean> connectionFuture;
@@ -417,6 +419,7 @@ public class AsyncHTTPConduit extends UR
wrappedStream = cachedStream;
}
}
+
protected void connect(boolean output) throws IOException {
if (connectionFuture != null) {
return;
@@ -456,65 +459,81 @@ public class AsyncHTTPConduit extends UR
tlsClientParameters = new TLSClientParameters();
}
- BasicHttpContext ctx = new BasicHttpContext();
- if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
- && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() != null) {
- ctx.setAttribute(ClientContext.CREDS_PROVIDER, new CredentialsProvider() {
- public void setCredentials(AuthScope authscope, Credentials credentials) {
+ HttpClientContext ctx = HttpClientContext.create();
+
+ BasicCredentialsProvider credsProvider = new BasicCredentialsProvider() {
+
+ @Override
+ public Credentials getCredentials(final AuthScope authscope) {
+ Credentials creds = super.getCredentials(authscope);
+ if (creds != null) {
+ return creds;
}
- public Credentials getCredentials(AuthScope authscope) {
+ if (AsyncHTTPConduit.this.proxyAuthorizationPolicy != null
+ && AsyncHTTPConduit.this.proxyAuthorizationPolicy.getUserName() != null) {
return new UsernamePasswordCredentials(AsyncHTTPConduit.this
- .proxyAuthorizationPolicy.getUserName(),
- AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
- }
- public void clear() {
+ .proxyAuthorizationPolicy.getUserName(),
+ AsyncHTTPConduit.this.proxyAuthorizationPolicy.getPassword());
}
- });
- }
- if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash && sslState != null) {
- ctx.setAttribute(ClientContext.USER_TOKEN , sslState);
- }
+ return null;
+ }
+
+ };
+
+ ctx.setCredentialsProvider(credsProvider);
- final AsyncSchemeRegistry reg = new AsyncSchemeRegistry();
- reg.register(new AsyncScheme("http", 80, null));
if ("https".equals(url.getScheme())) {
try {
- final SSLContext sslcontext = getSSLContext();
- reg.register(new AsyncScheme("https", 443, new SSLLayeringStrategy(sslcontext) {
- @Override
- protected void initializeEngine(SSLEngine engine) {
- initializeSSLEngine(sslcontext, engine);
- }
- @Override
- protected void verifySession(final IOSession iosession,
- final SSLSession sslsession) throws SSLException {
- super.verifySession(iosession, sslsession);
- iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
- CXFHttpRequest req = (CXFHttpRequest)iosession
- .removeAttribute(CXFHttpRequest.class.getName());
- if (req != null) {
- req.getOutputStream().setSSLSession(sslsession);
- }
- }
- }));
+ RegistryBuilder<SchemeIOSessionStrategy> regBuilder
+ = RegistryBuilder.<SchemeIOSessionStrategy>create()
+ .register("http", NoopIOSessionStrategy.INSTANCE);
+
+
+ TLSClientParameters tlsClientParameters = getTlsClientParameters();
+ if (tlsClientParameters == null) {
+ tlsClientParameters = new TLSClientParameters();
+ }
+ final SSLContext sslcontext = getSSLContext(tlsClientParameters);
+ regBuilder
+ .register("https",
+ new SSLIOSessionStrategy(sslcontext) {
+ @Override
+ protected void initializeEngine(SSLEngine engine) {
+ initializeSSLEngine(sslcontext, engine);
+ }
+ @Override
+ protected void verifySession(final HttpHost host,
+ final IOSession iosession,
+ final SSLSession sslsession) throws SSLException {
+ iosession.setAttribute("cxf.handshake.done", Boolean.TRUE);
+ setSSLSession(sslsession);
+ }
+ });
+ ctx.setAttribute("http.iosession-factory-registry", regBuilder.build());
} catch (GeneralSecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
+ }
+
+
+ if (sslURL != null && !sslURL.equals(url)) {
+ sslURL = null;
+ sslState = null;
+ session = null;
}
- ctx.setAttribute(ClientContext.SCHEME_REGISTRY, reg);
+ if (tlsClientParameters != null && tlsClientParameters.hashCode() == lastTlsHash) {
+ ctx.setUserToken(sslState);
+ }
+
connectionFuture = new BasicFuture<Boolean>(callback);
- DefaultHttpAsyncClient c = getHttpAsyncClient();
- CredentialsProvider credProvider = c.getCredentialsProvider();
+ HttpAsyncClient c = getHttpAsyncClient();
Credentials creds = (Credentials)outMessage.getContextualProperty(Credentials.class.getName());
- if (creds != null && credProvider != null) {
- credProvider.setCredentials(AuthScope.ANY, creds);
+ if (creds != null) {
+ credsProvider.setCredentials(AuthScope.ANY, creds);
+ ctx.setUserToken(creds.getUserPrincipal());
}
- if (credProvider != null && credProvider.getCredentials(AuthScope.ANY) != null) {
- ctx.setAttribute(ClientContext.USER_TOKEN,
- credProvider.getCredentials(AuthScope.ANY).getUserPrincipal());
- }
-
+
c.execute(new CXFHttpAsyncRequestProducer(entity, outbuf),
new CXFHttpAsyncResponseConsumer(this, inbuf, responseCallback),
ctx,
@@ -768,6 +787,8 @@ public class AsyncHTTPConduit extends UR
exception = null;
connectionFuture = null;
session = null;
+ sslState = null;
+ sslURL = null;
//reset the buffers
HeapByteBufferAllocator allocator = new HeapByteBufferAllocator();
@@ -790,6 +811,7 @@ public class AsyncHTTPConduit extends UR
session = sslsession;
synchronized (sessionLock) {
sslState = sslsession.getLocalPrincipal();
+ sslURL = url;
sessionLock.notifyAll();
}
}
@@ -797,13 +819,11 @@ public class AsyncHTTPConduit extends UR
}
- public synchronized SSLContext getSSLContext() throws GeneralSecurityException {
- TLSClientParameters tlsClientParameters = getTlsClientParameters();
- if (tlsClientParameters == null) {
- tlsClientParameters = new TLSClientParameters();
- }
+ public synchronized SSLContext getSSLContext(TLSClientParameters tlsClientParameters)
+ throws GeneralSecurityException {
+
int hash = tlsClientParameters.hashCode();
- if (hash == lastTlsHash) {
+ if (hash == lastTlsHash && sslContext != null) {
return sslContext;
}
@@ -825,6 +845,8 @@ public class AsyncHTTPConduit extends UR
sslContext = ctx;
lastTlsHash = hash;
sslState = null;
+ sslURL = null;
+ session = null;
return ctx;
}
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitFactory.java Tue Oct 8 16:27:00 2013
@@ -20,7 +20,6 @@
package org.apache.cxf.transport.http.asyncclient;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -35,52 +34,33 @@ import org.apache.cxf.service.model.Endp
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transport.http.HTTPConduitFactory;
import org.apache.cxf.transport.http.HTTPTransportFactory;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
-import org.apache.http.HttpResponseFactory;
-import org.apache.http.HttpVersion;
import org.apache.http.ProtocolException;
import org.apache.http.client.RedirectStrategy;
import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.protocol.RequestAuthCache;
-import org.apache.http.client.protocol.RequestClientConnControl;
-import org.apache.http.client.protocol.RequestDefaultHeaders;
-import org.apache.http.client.protocol.RequestProxyAuthentication;
-import org.apache.http.client.protocol.RequestTargetAuthentication;
-import org.apache.http.impl.DefaultHttpResponseFactory;
-import org.apache.http.impl.client.EntityEnclosingRequestWrapper;
-import org.apache.http.impl.client.ProxyAuthenticationStrategy;
-import org.apache.http.impl.client.TargetAuthenticationStrategy;
-import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
-import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
-import org.apache.http.impl.nio.conn.DefaultClientAsyncConnection;
-import org.apache.http.impl.nio.conn.PoolingClientAsyncConnectionManager;
+import org.apache.http.config.ConnectionConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.cookie.Cookie;
+import org.apache.http.impl.client.BasicCookieStore;
+import org.apache.http.impl.conn.DefaultSchemePortResolver;
+import org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
-import org.apache.http.nio.conn.ClientAsyncConnection;
-import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
-import org.apache.http.nio.conn.scheme.AsyncScheme;
-import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
-import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
-import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.conn.ManagedNHttpClientConnection;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.nio.reactor.IOSession;
-import org.apache.http.nio.reactor.ssl.SSLIOSession;
-import org.apache.http.nio.util.ByteBufferAllocator;
-import org.apache.http.nio.util.HeapByteBufferAllocator;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.params.HttpProtocolParams;
-import org.apache.http.params.SyncBasicHttpParams;
-import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.HttpContext;
-import org.apache.http.protocol.RequestContent;
-import org.apache.http.protocol.RequestExpectContinue;
-import org.apache.http.protocol.RequestTargetHost;
-import org.apache.http.protocol.RequestUserAgent;
/**
*
@@ -114,76 +94,55 @@ public class AsyncHTTPConduitFactory imp
ALWAYS, ASYNC_ONLY, NEVER
};
-
- final IOReactorConfig config = new IOReactorConfig();
- volatile ConnectingIOReactor ioReactor;
- volatile PoolingClientAsyncConnectionManager connectionManager;
-
+ volatile PoolingNHttpClientConnectionManager connectionManager;
+ volatile CloseableHttpAsyncClient client;
+
boolean isShutdown;
UseAsyncPolicy policy;
int maxConnections = 5000;
int maxPerRoute = 1000;
int connectionTTL = 60000;
-
- // these have per-instance Logger instances that have sync methods to setup.
- private final TargetAuthenticationStrategy targetAuthenticationStrategy = new TargetAuthenticationStrategy();
- private final ProxyAuthenticationStrategy proxyAuthenticationStrategy = new ProxyAuthenticationStrategy();
- private final BasicHttpProcessor httpproc;
-
+ int ioThreadCount = IOReactorConfig.DEFAULT.getIoThreadCount();
+ long selectInterval = IOReactorConfig.DEFAULT.getSelectInterval();
+ boolean interestOpQueued = IOReactorConfig.DEFAULT.isInterestOpQueued();
+ int soLinger = IOReactorConfig.DEFAULT.getSoLinger();
+ int soTimeout = IOReactorConfig.DEFAULT.getSoTimeout();
+ boolean soKeepalive = IOReactorConfig.DEFAULT.isSoKeepalive();
+ boolean tcpNoDelay = true;
+
AsyncHTTPConduitFactory() {
super();
- httpproc = new BasicHttpProcessor();
- httpproc.addInterceptor(new RequestDefaultHeaders());
- // Required protocol interceptors
- httpproc.addInterceptor(new RequestContent());
- httpproc.addInterceptor(new RequestTargetHost());
- // Recommended protocol interceptors
- httpproc.addInterceptor(new RequestClientConnControl());
- httpproc.addInterceptor(new RequestUserAgent());
- httpproc.addInterceptor(new RequestExpectContinue());
- // HTTP authentication interceptors
- httpproc.addInterceptor(new RequestAuthCache());
- httpproc.addInterceptor(new RequestTargetAuthentication());
- httpproc.addInterceptor(new RequestProxyAuthentication());
-
}
+
public AsyncHTTPConduitFactory(Map<String, Object> conf) {
this();
- config.setTcpNoDelay(true);
setProperties(conf);
}
-
public AsyncHTTPConduitFactory(Bus b) {
this();
addListener(b);
- config.setTcpNoDelay(true);
setProperties(b.getProperties());
}
-
- public BasicHttpProcessor getDefaultHttpProcessor() {
- return httpproc;
- }
-
public UseAsyncPolicy getUseAsyncPolicy() {
return policy;
}
public void update(Map<String, Object> props) {
- if (setProperties(props) && ioReactor != null) {
+ if (setProperties(props) && client != null) {
restartReactor();
}
}
+
private void restartReactor() {
- ConnectingIOReactor ioReactor2 = ioReactor;
- PoolingClientAsyncConnectionManager connectionManager2 = connectionManager;
+ CloseableHttpAsyncClient client2 = client;
resetVars();
- shutdown(ioReactor2, connectionManager2);
+ shutdown(client2);
}
private synchronized void resetVars() {
- ioReactor = null;
+ client = null;
connectionManager = null;
}
@@ -206,6 +165,7 @@ public class AsyncHTTPConduitFactory imp
maxConnections = getInt(s.get(MAX_CONNECTIONS), maxConnections);
connectionTTL = getInt(s.get(CONNECTION_TTL), connectionTTL);
maxPerRoute = getInt(s.get(MAX_PER_HOST_CONNECTIONS), maxPerRoute);
+
if (connectionManager != null) {
connectionManager.setMaxTotal(maxConnections);
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
@@ -214,33 +174,33 @@ public class AsyncHTTPConduitFactory imp
//properties that need a restart of the reactor
boolean changed = false;
- int i = config.getIoThreadCount();
- config.setIoThreadCount(getInt(s.get(THREAD_COUNT), Runtime.getRuntime().availableProcessors()));
- changed |= i != config.getIoThreadCount();
+ int i = ioThreadCount;
+ ioThreadCount = getInt(s.get(THREAD_COUNT), Runtime.getRuntime().availableProcessors());
+ changed |= i != ioThreadCount;
- long l = config.getSelectInterval();
- config.setSelectInterval(getInt(s.get(SELECT_INTERVAL), 1000));
- changed |= l != config.getSelectInterval();
-
- i = config.getSoLinger();
- config.setSoLinger(getInt(s.get(SO_LINGER), -1));
- changed |= i != config.getSoLinger();
-
- i = config.getSoTimeout();
- config.setSoTimeout(getInt(s.get(SO_TIMEOUT), 0));
- changed |= i != config.getSoTimeout();
-
- boolean b = config.isInterestOpQueued();
- config.setInterestOpQueued(getBoolean(s.get(INTEREST_OP_QUEUED), false));
- changed |= b != config.isInterestOpQueued();
+ long l = selectInterval;
+ selectInterval = getInt(s.get(SELECT_INTERVAL), 1000);
+ changed |= l != selectInterval;
+
+ i = soLinger;
+ soLinger = getInt(s.get(SO_LINGER), -1);
+ changed |= i != soLinger;
+
+ i = soTimeout;
+ soTimeout = getInt(s.get(SO_TIMEOUT), 0);
+ changed |= i != soTimeout;
+
+ boolean b = interestOpQueued;
+ interestOpQueued = getBoolean(s.get(INTEREST_OP_QUEUED), false);
+ changed |= b != interestOpQueued;
- b = config.isTcpNoDelay();
- config.setTcpNoDelay(getBoolean(s.get(TCP_NODELAY), true));
- changed |= b != config.isTcpNoDelay();
-
- b = config.isSoKeepalive();
- config.setSoKeepalive(getBoolean(s.get(SO_KEEPALIVE), false));
- changed |= b != config.isSoKeepalive();
+ b = tcpNoDelay;
+ tcpNoDelay = getBoolean(s.get(TCP_NODELAY), true);
+ changed |= b != tcpNoDelay;
+
+ b = soKeepalive;
+ soKeepalive = getBoolean(s.get(SO_KEEPALIVE), false);
+ changed |= b != soKeepalive;
return changed;
}
@@ -292,136 +252,89 @@ public class AsyncHTTPConduitFactory imp
public void setBus(Bus b) {
addListener(b);
}
+
public void initComplete() {
}
+
public synchronized void preShutdown() {
shutdown();
}
+
public void postShutdown() {
}
public void shutdown() {
- if (ioReactor != null) {
- shutdown(ioReactor, connectionManager);
+ if (client != null) {
+ shutdown(client);
connectionManager = null;
- ioReactor = null;
+ client = null;
}
isShutdown = true;
}
- private static void shutdown(ConnectingIOReactor ioReactor2,
- PoolingClientAsyncConnectionManager connectionManager2) {
-
+
+ private static void shutdown(CloseableHttpAsyncClient client) {
try {
- connectionManager2.shutdown();
+ client.close();
} catch (IOException e1) {
e1.printStackTrace();
}
- try {
- ioReactor2.shutdown();
- } catch (IOException e) {
- e.printStackTrace();
- }
}
private void addListener(Bus b) {
b.getExtension(BusLifeCycleManager.class).registerLifeCycleListener(this);
}
-
-
- public synchronized void setupNIOClient() throws IOReactorException {
- if (connectionManager != null) {
+
+ public synchronized void setupNIOClient(HTTPClientPolicy clientPolicy) throws IOReactorException {
+ if (client != null) {
return;
}
- // Create client-side I/O reactor
- final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(new HttpAsyncRequestExecutor(),
- new BasicHttpParams());
- ioReactor = new DefaultConnectingIOReactor(config);
-
- // Run the I/O reactor in a separate thread
- Thread t = new Thread(new Runnable() {
+ IOReactorConfig config = IOReactorConfig.custom()
+ .setIoThreadCount(ioThreadCount)
+ .setSelectInterval(selectInterval)
+ .setInterestOpQueued(interestOpQueued)
+ .setSoLinger(soLinger)
+ .setSoTimeout(soTimeout)
+ .setSoKeepAlive(soKeepalive)
+ .setTcpNoDelay(tcpNoDelay)
+ .build();
+
+ Registry<SchemeIOSessionStrategy> ioSessionFactoryRegistry = RegistryBuilder.<SchemeIOSessionStrategy>create()
+ .register("http", NoopIOSessionStrategy.INSTANCE)
+ .register("https", SSLIOSessionStrategy.getSystemDefaultStrategy())
+ .build();
- public void run() {
- try {
- // Ready to go!
- ioReactor.execute(ioEventDispatch);
- } catch (InterruptedIOException ex) {
- System.err.println("Interrupted");
- } catch (IOException e) {
- System.err.println("I/O error: " + e.getMessage());
- }
- }
- });
- // Start the client thread
- t.start();
-
- AsyncSchemeRegistry registry = new AsyncSchemeRegistry();
- registry.register(new AsyncScheme("http", 80, null));
- registry.register(new AsyncScheme("https", 443, null));
+ ManagedNHttpClientConnectionFactory connectionFactory = new ManagedNHttpClientConnectionFactory() {
- connectionManager = new PoolingClientAsyncConnectionManager(ioReactor, registry,
- connectionTTL, TimeUnit.MILLISECONDS) {
@Override
- protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
- final HttpResponseFactory responseFactory = new DefaultHttpResponseFactory();
- final ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-
- return new ClientAsyncConnectionFactory() {
- @Override
- public ClientAsyncConnection create(String id, IOSession iosession, HttpParams params) {
- return new DefaultClientAsyncConnection(id, iosession,
- responseFactory,
- allocator, params) {
- @Override
- protected void onRequestSubmitted(HttpRequest request) {
- super.onRequestSubmitted(request);
- if (request instanceof EntityEnclosingRequestWrapper) {
- request = ((EntityEnclosingRequestWrapper)request).getOriginal();
- }
- if (getIOSession() instanceof SSLIOSession) {
- SSLIOSession sslio = (SSLIOSession)getIOSession();
- getIOSession().setAttribute(CXFHttpRequest.class.getName(), request);
- if (getIOSession().getAttribute("cxf.handshake.done") != null) {
- ((CXFHttpRequest)request).getOutputStream()
- .setSSLSession(sslio.getSSLSession());
- }
- }
- }
- };
- }
- };
+ public ManagedNHttpClientConnection create(final IOSession iosession, final ConnectionConfig config) {
+ ManagedNHttpClientConnection conn = super.create(iosession, config);
+ return conn;
}
-
};
+
+ DefaultConnectingIOReactor ioreactor = new DefaultConnectingIOReactor(config);
+ connectionManager = new PoolingNHttpClientConnectionManager(
+ ioreactor,
+ connectionFactory,
+ ioSessionFactoryRegistry,
+ DefaultSchemePortResolver.INSTANCE,
+ SystemDefaultDnsResolver.INSTANCE,
+ connectionTTL, TimeUnit.MILLISECONDS);
+
connectionManager.setDefaultMaxPerRoute(maxPerRoute);
connectionManager.setMaxTotal(maxConnections);
- }
-
- public DefaultHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
- if (connectionManager == null) {
- setupNIOClient();
- }
-
- DefaultHttpAsyncClient dhac = new DefaultHttpAsyncClient(connectionManager) {
- @Override
- protected HttpParams createHttpParams() {
- HttpParams params = new SyncBasicHttpParams();
- HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
- HttpConnectionParams.setTcpNoDelay(params, true);
- int bufSize = c.getClient().getChunkLength() > 0 ? c.getClient().getChunkLength() : 16332;
- HttpConnectionParams.setSocketBufferSize(params, bufSize);
- HttpConnectionParams.setConnectionTimeout(params, (int)c.getClient().getConnectionTimeout());
- return params;
- }
- @Override
- protected BasicHttpProcessor createHttpProcessor() {
- return httpproc;
- }
- };
- //CXF handles redirects ourselves
- dhac.setRedirectStrategy(new RedirectStrategy() {
+
+ ConnectionConfig connectionConfig = ConnectionConfig.custom()
+ .setBufferSize(clientPolicy.getChunkLength() > 0 ? clientPolicy.getChunkLength() : 16332)
+ .build();
+
+ connectionManager.setDefaultConnectionConfig(connectionConfig);
+
+ RedirectStrategy redirectStrategy = new RedirectStrategy() {
+
public boolean isRedirected(HttpRequest request, HttpResponse response, HttpContext context)
throws ProtocolException {
return false;
@@ -430,12 +343,26 @@ public class AsyncHTTPConduitFactory imp
throws ProtocolException {
return null;
}
- });
- dhac.setTargetAuthenticationStrategy(targetAuthenticationStrategy);
- dhac.setProxyAuthenticationStrategy(proxyAuthenticationStrategy);
- return dhac;
- }
+ };
+ client = HttpAsyncClients.custom()
+ .setConnectionManager(connectionManager)
+ .setRedirectStrategy(redirectStrategy)
+ .setDefaultCookieStore(new BasicCookieStore() {
+ private static final long serialVersionUID = 1L;
+ public void addCookie(Cookie cookie) {
+ }
+ })
+ .build();
+ // Start the client thread
+ client.start();
+ }
+ public CloseableHttpAsyncClient createClient(final AsyncHTTPConduit c) throws IOException {
+ if (connectionManager == null) {
+ setupNIOClient(c.getClient());
+ }
+ return client;
+ }
}
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/CXFHttpRequest.java Tue Oct 8 16:27:00 2013
@@ -28,24 +28,26 @@ import org.apache.http.HttpEntityEnclosi
import org.apache.http.HttpVersion;
import org.apache.http.ProtocolVersion;
import org.apache.http.RequestLine;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.Configurable;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.protocol.HTTP;
-public class CXFHttpRequest extends AbstractHttpMessage implements HttpEntityEnclosingRequest {
+public class CXFHttpRequest extends AbstractHttpMessage implements HttpEntityEnclosingRequest, Configurable {
private final String method;
private URI uri;
private HttpEntity entity;
private AsyncWrappedOutputStream out;
+ private RequestConfig config;
public CXFHttpRequest(final String method) {
super();
this.method = method;
}
-
public void setOutputStream(AsyncWrappedOutputStream o) {
out = o;
}
@@ -90,4 +92,13 @@ public class CXFHttpRequest extends Abst
return expect != null && HTTP.EXPECT_CONTINUE.equalsIgnoreCase(expect.getValue());
}
+ @Override
+ public RequestConfig getConfig() {
+ return config;
+ }
+
+ public void setConfig(RequestConfig config) {
+ this.config = config;
+ }
+
}
Modified: cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java?rev=1530340&r1=1530339&r2=1530340&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java (original)
+++ cxf/trunk/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/SharedInputBuffer.java Tue Oct 8 16:27:00 2013
@@ -102,10 +102,10 @@ public class SharedInputBuffer extends E
while ((bytesRead = decoder.read(this.waitingBuffer)) > 0) {
totalRead += bytesRead;
}
- } else {
- while ((bytesRead = decoder.read(this.buffer)) > 0) {
- totalRead += bytesRead;
- }
+ }
+ //read more
+ while ((bytesRead = decoder.read(this.buffer)) > 0) {
+ totalRead += bytesRead;
}
if (bytesRead == -1 || decoder.isCompleted()) {
this.endOfStream = true;