You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2010/09/10 20:22:06 UTC
svn commit: r995917 - in /cxf/branches/async-client:
api/src/main/java/org/apache/cxf/io/
api/src/main/java/org/apache/cxf/transport/
rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/
rt/transports/http/ rt/transports/http/src/main/java/org/...
Author: dkulp
Date: Fri Sep 10 18:22:05 2010
New Revision: 995917
URL: http://svn.apache.org/viewvc?rev=995917&view=rev
Log:
Some work for using http-client for some async client stuff
Added:
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (with props)
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (with props)
Modified:
cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java
cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java
cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
cxf/branches/async-client/rt/transports/http/pom.xml
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java
cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java
cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java
Modified: cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java (original)
+++ cxf/branches/async-client/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java Fri Sep 10 18:22:05 2010
@@ -35,6 +35,10 @@ public abstract class AbstractWrappedOut
protected AbstractWrappedOutputStream() {
super();
}
+ protected AbstractWrappedOutputStream(OutputStream wrapped) {
+ super();
+ wrappedStream = wrapped;
+ }
@Override
public void write(byte[] b, int off, int len) throws IOException {
Modified: cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java (original)
+++ cxf/branches/async-client/api/src/main/java/org/apache/cxf/transport/TransportURIResolver.java Fri Sep 10 18:22:05 2010
@@ -105,6 +105,7 @@ public class TransportURIResolver extend
Message message = new MessageImpl();
Exchange exch = new ExchangeImpl();
message.setExchange(exch);
+ exch.put(Bus.class, bus);
message.put(Message.HTTP_REQUEST_METHOD, "GET");
c.setMessageObserver(new MessageObserver() {
@@ -116,11 +117,22 @@ public class TransportURIResolver extend
c.close(message);
} catch (IOException e) {
//ignore
+ message.getExchange().put(Exception.class, e);
+ } finally {
+ synchronized (message.getExchange()) {
+ message.getExchange().notifyAll();
+ }
}
}
});
- c.prepare(message);
- c.close(message);
+ synchronized (exch) {
+ c.prepare(message);
+ c.close(message);
+ if (exch.get(InputStream.class) == null
+ && exch.get(Exception.class) == null) {
+ exch.wait();
+ }
+ }
InputStream ins = exch.get(InputStream.class);
resourceOpened.addElement(ins);
InputSource src = new InputSource(ins);
@@ -132,6 +144,7 @@ public class TransportURIResolver extend
}
} catch (Exception e) {
//ignore
+ e.printStackTrace();
}
}
if (is == null
Modified: cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java (original)
+++ cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/AbstractClient.java Fri Sep 10 18:22:05 2010
@@ -312,7 +312,7 @@ public class AbstractClient implements C
protected ResponseBuilder setResponseBuilder(HttpURLConnection conn, Exchange exchange) throws Throwable {
Message inMessage = exchange.getInMessage();
if (conn == null) {
- throw new WebApplicationException();
+ throw new WebApplicationException(Response.noContent().build());
}
Integer responseCode = (Integer)exchange.get(Message.RESPONSE_CODE);
if (responseCode == null) {
@@ -578,6 +578,8 @@ public class AbstractClient implements C
MultivaluedMap<String, String> headers,
URI currentURI) {
Message m = cfg.getConduitSelector().getEndpoint().getBinding().createMessage();
+ m.put("force.http.url.connection", Boolean.TRUE);
+
m.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
m.put(Message.INBOUND_MESSAGE, Boolean.FALSE);
Modified: cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java (original)
+++ cxf/branches/async-client/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java Fri Sep 10 18:22:05 2010
@@ -640,6 +640,8 @@ public class WebClient extends AbstractC
rb.entity(entity);
return rb.build();
+ } catch (WebApplicationException ex) {
+ throw ex;
} catch (Throwable ex) {
throw new WebApplicationException(ex);
}
Modified: cxf/branches/async-client/rt/transports/http/pom.xml
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/pom.xml?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/pom.xml (original)
+++ cxf/branches/async-client/rt/transports/http/pom.xml Fri Sep 10 18:22:05 2010
@@ -68,6 +68,17 @@
<artifactId>${servlet-api.artifact}</artifactId>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore-nio</artifactId>
+ <version>4.1-beta2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.1-alpha2</version>
+ </dependency>
</dependencies>
<build>
Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/ClientOnlyHTTPTransportFactory.java Fri Sep 10 18:22:05 2010
@@ -28,6 +28,7 @@ import org.apache.cxf.common.injection.N
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.ConduitInitiator;
+import org.apache.cxf.transport.http.async.AsyncHTTPConduit;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
@NoJSR250Annotations(unlessNull = "bus")
@@ -60,9 +61,15 @@ public class ClientOnlyHTTPTransportFact
EndpointInfo endpointInfo,
EndpointReferenceType target
) throws IOException {
+ /*
HTTPConduit conduit = target == null
? new HTTPConduit(bus, endpointInfo)
: new HTTPConduit(bus, endpointInfo, target);
+ */
+ HTTPConduit conduit = target == null
+ ? new AsyncHTTPConduit(bus, endpointInfo)
+ : new AsyncHTTPConduit(bus, endpointInfo, target);
+
// Spring configure the conduit.
String address = conduit.getAddress();
if (address != null && address.indexOf('?') != -1) {
Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Cookie.java Fri Sep 10 18:22:05 2010
@@ -26,7 +26,7 @@ import java.util.Map;
* session state.
*
*/
-class Cookie {
+public class Cookie {
public static final String DISCARD_ATTRIBUTE = "discard";
public static final String MAX_AGE_ATTRIBUTE = "max-age";
public static final String PATH_ATTRIBUTE = "path";
Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Sep 10 18:22:05 2010
@@ -154,6 +154,11 @@ public class HTTPConduit
public static final String KEY_HTTP_CONNECTION = "http.connection";
/**
+ * The Logger for this class.
+ */
+ protected static final Logger LOG = LogUtils.getL7dLogger(HTTPConduit.class);
+
+ /**
* This constant is the Message(Map) key for a list of visited URLs that
* is used in redirect loop protection.
*/
@@ -165,10 +170,6 @@ public class HTTPConduit
*/
private static final String KEY_AUTH_URLS = "AuthURLs";
- /**
- * The Logger for this class.
- */
- private static final Logger LOG = LogUtils.getL7dLogger(HTTPConduit.class);
/**
* This constant holds the suffix ".http-conduit" that is appended to the
@@ -189,7 +190,7 @@ public class HTTPConduit
/**
* This field holds a reference to the CXF bus associated this conduit.
*/
- private final Bus bus;
+ protected final Bus bus;
/**
* This field is used for two reasons. First it provides the base name for
@@ -197,16 +198,16 @@ public class HTTPConduit
* address information, should it not be supplied in the Message Map, by the
* Message.ENDPOINT_ADDRESS property.
*/
- private final EndpointInfo endpointInfo;
+ protected final EndpointInfo endpointInfo;
/**
* This field holds the "default" URL for this particular conduit, which
* is created on demand.
*/
- private URL defaultEndpointURL;
- private String defaultEndpointURLString;
- private boolean fromEndpointReferenceType;
+ protected URL defaultEndpointURL;
+ protected String defaultEndpointURLString;
+ protected boolean fromEndpointReferenceType;
// Configurable values
@@ -215,37 +216,46 @@ public class HTTPConduit
* This field is injected via spring configuration based on the conduit
* name.
*/
- private HTTPClientPolicy clientSidePolicy;
+ protected HTTPClientPolicy clientSidePolicy;
/**
* This field holds the password authorization configuration.
* This field is injected via spring configuration based on the conduit
* name.
*/
- private AuthorizationPolicy authorizationPolicy;
+ protected AuthorizationPolicy authorizationPolicy;
/**
* This field holds the password authorization configuration for the
* configured proxy. This field is injected via spring configuration based
* on the conduit name.
*/
- private ProxyAuthorizationPolicy proxyAuthorizationPolicy;
+ protected ProxyAuthorizationPolicy proxyAuthorizationPolicy;
/**
* This field holds the configuration TLS configuration which
* is programmatically configured.
*/
- private TLSClientParameters tlsClientParameters;
+ protected TLSClientParameters tlsClientParameters;
/**
* This field contains the MessageTrustDecider.
*/
- private MessageTrustDecider trustDecider;
+ protected MessageTrustDecider trustDecider;
/**
* This field contains the HttpAuthSupplier.
*/
- private HttpAuthSupplier authSupplier;
+ protected HttpAuthSupplier authSupplier;
+
+
+ /**
+ * Variables for holding session state if sessions are supposed to be maintained
+ */
+ protected Map<String, Cookie> sessionCookies = new ConcurrentHashMap<String, Cookie>();
+ protected boolean maintainSession;
+
+ protected CertConstraints certConstraints;
/**
* This boolean signfies that that finalizeConfig is called, which is
@@ -254,15 +264,7 @@ public class HTTPConduit
* should be handled as such.
*/
private boolean configFinalized;
-
- /**
- * Variables for holding session state if sessions are supposed to be maintained
- */
- private Map<String, Cookie> sessionCookies = new ConcurrentHashMap<String, Cookie>();
- private boolean maintainSession;
- private CertConstraints certConstraints;
-
/**
* Constructor
*
@@ -733,7 +735,7 @@ public class HTTPConduit
*
* @throws MalformedURLException
*/
- private URL setupURL(Message message) throws MalformedURLException {
+ protected URL setupURL(Message message) throws MalformedURLException {
String result = (String)message.get(Message.ENDPOINT_ADDRESS);
String pathInfo = (String)message.get(Message.PATH_INFO);
String queryString = (String)message.get(Message.QUERY_STRING);
@@ -828,7 +830,7 @@ public class HTTPConduit
* @param message The outbound message
* @return The PROTOCOL_HEADERS map
*/
- private Map<String, List<String>> getSetProtocolHeaders(Message message) {
+ protected Map<String, List<String>> getSetProtocolHeaders(Message message) {
Map<String, List<String>> headers =
CastUtils.cast((Map<?, ?>)message.get(Message.PROTOCOL_HEADERS));
if (null == headers) {
@@ -882,7 +884,7 @@ public class HTTPConduit
* @param level The Logging Level.
* @param headers The Message protocol headers.
*/
- private void logProtocolHeaders(
+ protected void logProtocolHeaders(
Level level,
Message message
) {
@@ -946,7 +948,7 @@ public class HTTPConduit
*
* @param exchange The exchange in question
*/
- private boolean isOneway(Exchange exchange) {
+ protected final boolean isOneway(Exchange exchange) {
return exchange != null && exchange.isOneWay();
}
@@ -959,15 +961,16 @@ public class HTTPConduit
*/
protected static InputStream getPartialResponse(
HttpURLConnection connection,
- int responseCode
+ int responseCode,
+ Map<String, List<String>> headers
) throws IOException {
InputStream in = null;
if (responseCode == HttpURLConnection.HTTP_ACCEPTED
|| responseCode == HttpURLConnection.HTTP_OK) {
if (connection.getContentLength() > 0) {
in = connection.getInputStream();
- } else if (hasChunkedResponse(connection)
- || hasEofTerminatedResponse(connection)) {
+ } else if (hasChunkedResponse(headers)
+ || hasEofTerminatedResponse(headers)) {
// ensure chunked or EOF-terminated response is non-empty
in = getNonEmptyContent(connection);
}
@@ -977,22 +980,36 @@ public class HTTPConduit
/**
* @param connection the given HttpURLConnection
- * @return true iff the connection has a chunked response pending
+ * @return true if the connection has a chunked response pending
*/
- private static boolean hasChunkedResponse(HttpURLConnection connection) {
- return HttpHeaderHelper.CHUNKED.equalsIgnoreCase(
- connection.getHeaderField(HttpHeaderHelper.TRANSFER_ENCODING));
+ protected static boolean hasChunkedResponse(Map<String, List<String>> headers) {
+ List<String> s = headers.get(HttpHeaderHelper.TRANSFER_ENCODING);
+ if (s != null) {
+ for (String s2 : s) {
+ if (HttpHeaderHelper.CHUNKED.equalsIgnoreCase(s2)) {
+ return true;
+ }
+ }
+ }
+ return false;
}
/**
* @param connection the given HttpURLConnection
- * @return true iff the connection has a chunked response pending
+ * @return true if the connection should be closed at EOF
*/
- private static boolean hasEofTerminatedResponse(
- HttpURLConnection connection
+ protected static boolean hasEofTerminatedResponse(
+ Map<String, List<String>> headers
) {
- return HttpHeaderHelper.CLOSE.equalsIgnoreCase(
- connection.getHeaderField(HttpHeaderHelper.CONNECTION));
+ List<String> s = headers.get(HttpHeaderHelper.CONNECTION);
+ if (s != null) {
+ for (String s2 : s) {
+ if (HttpHeaderHelper.CLOSE.equalsIgnoreCase(s2)) {
+ return true;
+ }
+ }
+ }
+ return false;
}
/**
@@ -1058,7 +1075,7 @@ public class HTTPConduit
* @param message
* @param headers
*/
- private void setHeadersByAuthorizationPolicy(
+ protected void setHeadersByAuthorizationPolicy(
Message message,
URL url,
Map<String, List<String>> headers
@@ -1129,7 +1146,7 @@ public class HTTPConduit
}
}
}
- private static List<String> createMutableList(String val) {
+ public static List<String> createMutableList(String val) {
return new ArrayList<String>(Arrays.asList(new String[] {val}));
}
/**
@@ -1199,7 +1216,7 @@ public class HTTPConduit
* @param url The URL the message is going to.
* @param headers The headers in the outgoing message.
*/
- private void setHeadersByPolicy(
+ protected void setHeadersByPolicy(
Message message,
URL url,
Map<String, List<String>> headers
@@ -1454,7 +1471,6 @@ public class HTTPConduit
// it is meant to make it to the end. (Too bad that information
// went to every URL along the way, but that's what the user
// wants!
- // TODO: Make this issue a security release note.
setHeadersByAuthorizationPolicy(message, url, headers);
connection = retransmit(
@@ -1471,7 +1487,7 @@ public class HTTPConduit
* @param message The message where the Set of URLs is stored.
* @return The modifiable set of URLs that were visited.
*/
- private Set<String> getSetAuthoriationURLs(Message message) {
+ protected Set<String> getSetAuthoriationURLs(Message message) {
@SuppressWarnings("unchecked")
Set<String> authURLs = (Set<String>) message.get(KEY_AUTH_URLS);
if (authURLs == null) {
@@ -1489,7 +1505,7 @@ public class HTTPConduit
* @param message The message where the Set is stored.
* @return The modifiable set of URLs that were visited.
*/
- private Set<String> getSetVisitedURLs(Message message) {
+ protected Set<String> getSetVisitedURLs(Message message) {
@SuppressWarnings("unchecked")
Set<String> visitedURLs = (Set<String>) message.get(KEY_VISITED_URLS);
if (visitedURLs == null) {
@@ -1528,7 +1544,7 @@ public class HTTPConduit
URL currentURL = connection.getURL();
- String realm = extractAuthorizationRealm(connection.getHeaderFields());
+ String realm = extractAuthorizationRealm(connection.getHeaderFields().get("WWW-Authenticate"));
Set<String> authURLs = getSetAuthoriationURLs(message);
@@ -1660,10 +1676,9 @@ public class HTTPConduit
* @param headers The Http Response Headers
* @return The realm, or null if it is non-existent.
*/
- private String extractAuthorizationRealm(
- Map<String, List<String>> headers
+ protected String extractAuthorizationRealm(
+ List<String> auth
) {
- List<String> auth = headers.get("WWW-Authenticate");
if (auth != null) {
for (String a : auth) {
int idx = a.indexOf("realm=");
@@ -2111,10 +2126,19 @@ public class HTTPConduit
}
+ Map<String, List<String>> origHeaders = connection.getHeaderFields();
+ Map<String, List<String>> headers =
+ new HashMap<String, List<String>>();
+ for (String key : connection.getHeaderFields().keySet()) {
+ if (key != null) {
+ headers.put(HttpHeaderHelper.getHeaderKey(key),
+ origHeaders.get(key));
+ }
+ }
InputStream in = null;
if (isOneway(exchange)) {
- in = getPartialResponse(connection, responseCode);
+ in = getPartialResponse(connection, responseCode, headers);
if (in == null) {
// oneway operation or decoupled MEP without
// partial response
@@ -2133,15 +2157,6 @@ public class HTTPConduit
Message inMessage = new MessageImpl();
inMessage.setExchange(exchange);
- Map<String, List<String>> origHeaders = connection.getHeaderFields();
- Map<String, List<String>> headers =
- new HashMap<String, List<String>>();
- for (String key : connection.getHeaderFields().keySet()) {
- if (key != null) {
- headers.put(HttpHeaderHelper.getHeaderKey(key),
- origHeaders.get(key));
- }
- }
inMessage.put(Message.PROTOCOL_HEADERS, headers);
inMessage.put(Message.RESPONSE_CODE, responseCode);
Added: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java?rev=995917&view=auto
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (added)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java Fri Sep 10 18:22:05 2010
@@ -0,0 +1,857 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.http.async;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PushbackInputStream;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.io.CachedOutputStream;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.http.Cookie;
+import org.apache.cxf.transport.http.DigestAuthSupplier;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transport.https.CertConstraints;
+import org.apache.cxf.transport.https.CertConstraintsInterceptor;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.http.Header;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpHead;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpTrace;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.ContentEncoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.entity.ProducingNHttpEntity;
+
+/**
+ *
+ */
+public class AsyncHTTPConduit extends HTTPConduit {
+
+ /**
+ * Creates the conduit by delegating to the matching HTTPConduit
+ * constructor; no additional state is set up here.
+ *
+ * @param b the bus handed to the HTTPConduit constructor
+ * @param ei the endpoint info handed to the HTTPConduit constructor
+ * @throws IOException if the HTTPConduit constructor throws it
+ */
+ public AsyncHTTPConduit(Bus b, EndpointInfo ei) throws IOException {
+ super(b, ei);
+ }
+ /**
+ * Creates the conduit for an explicit target by delegating to the
+ * matching HTTPConduit constructor; no additional state is set up here.
+ *
+ * @param b the bus handed to the HTTPConduit constructor
+ * @param ei the endpoint info handed to the HTTPConduit constructor
+ * @param t the endpoint reference handed to the HTTPConduit constructor
+ * @throws IOException if the HTTPConduit constructor throws it
+ */
+ public AsyncHTTPConduit(Bus b, EndpointInfo ei,
+ EndpointReferenceType t) throws IOException {
+ super(b, ei, t);
+ }
+
+    /**
+     * Prepares the outbound message for sending through this conduit.
+     * <p>
+     * If the contextual property "force.http.url.connection" is true, or the
+     * target URL uses the https protocol, the work is delegated to
+     * {@link HTTPConduit#prepare(Message)}. Otherwise the client policy,
+     * auth supplier, chunking, session cookie and protocol header settings
+     * are evaluated and a {@link WrappedOutputStream} is installed as the
+     * message's OutputStream content (and under the WrappedOutputStream
+     * class key).
+     *
+     * @param message the outbound message
+     * @throws IOException if setting up the URL fails or the delegated
+     *                     prepare call throws it
+     */
+    public void prepare(Message message) throws IOException {
+        message.put(AsyncHTTPConduit.class, this);
+        Map<String, List<String>> headers = getSetProtocolHeaders(message);
+
+        URL currentURL = setupURL(message);
+        if (MessageUtils.getContextualBoolean(message, "force.http.url.connection", false)
+            || "https".equalsIgnoreCase(currentURL.getProtocol())) {
+            //delegate to the parent for any https things for now
+            super.prepare(message);
+            return;
+        }
+
+        HTTPClientPolicy csPolicy = getClient(message);
+        message.put(HTTPClientPolicy.class.getName() + ".complete", csPolicy);
+
+        // If the HTTP_REQUEST_METHOD is not set, the default is "POST".
+        String httpRequestMethod =
+            (String)message.get(Message.HTTP_REQUEST_METHOD);
+
+        boolean isChunking = false;
+        int chunkThreshold = 0;
+        boolean needToCacheRequest = false;
+        // We must cache the request if we have basic auth supplier
+        // without preemptive basic auth.
+        if (authSupplier != null) {
+            String auth = authSupplier.getPreemptiveAuthorization(
+                    this, currentURL, message);
+            if (auth == null || authSupplier.requiresRequestCaching()) {
+                needToCacheRequest = true;
+                isChunking = false;
+                LOG.log(Level.FINE,
+                        "Auth Supplier, but no Premeptive User Pass or Digest auth (nonce may be stale)"
+                        + " We must cache request.");
+            }
+            message.put("AUTH_VALUE", auth);
+        }
+        if (csPolicy.isAutoRedirect()) {
+            needToCacheRequest = true;
+            LOG.log(Level.FINE, "AutoRedirect is turned on.");
+        }
+        if (csPolicy.getMaxRetransmits() > 0) {
+            needToCacheRequest = true;
+            LOG.log(Level.FINE, "MaxRetransmits is set > 0.");
+        }
+        // DELETE does not work and empty PUTs cause misleading exceptions
+        // if chunking is enabled
+        // TODO : ensure chunking can be enabled for non-empty PUTs - if requested
+        if ((httpRequestMethod == null || "POST".equals(httpRequestMethod))
+            && csPolicy.isAllowChunking()) {
+            //TODO: The chunking mode should be configurable, or at least
+            // be a documented client constant.
+            isChunking = true;
+            chunkThreshold = csPolicy.getChunkingThreshold();
+            if (chunkThreshold <= 0) {
+                // the policy gives no positive threshold: fall back to 64K
+                chunkThreshold = 64 * 1024;
+            }
+        }
+
+        //Do we need to maintain a session?
+        maintainSession = Boolean.TRUE.equals((Boolean)message.get(Message.MAINTAIN_SESSION));
+
+        //If we have any cookies and we are maintaining sessions, then use them
+        if (maintainSession && sessionCookies.size() > 0) {
+            List<String> cookies = null;
+            for (String s : headers.keySet()) {
+                if (HttpHeaderHelper.COOKIE.equalsIgnoreCase(s)) {
+                    // leave the loop right after removing: the key set
+                    // must not be iterated further once it was modified
+                    cookies = headers.remove(s);
+                    break;
+                }
+            }
+            if (cookies == null) {
+                cookies = new ArrayList<String>();
+            } else {
+                cookies = new ArrayList<String>(cookies);
+            }
+            headers.put(HttpHeaderHelper.COOKIE, cookies);
+            for (Cookie c : sessionCookies.values()) {
+                cookies.add(c.requestCookieHeader());
+            }
+        }
+
+
+        if (certConstraints != null) {
+            message.put(CertConstraints.class.getName(), certConstraints);
+            message.getInterceptorChain().add(CertConstraintsInterceptor.INSTANCE);
+        }
+
+        // Set the headers on the message according to configured
+        // client side policy.
+        setHeadersByPolicy(message, currentURL, headers);
+
+        WrappedOutputStream wout = new WrappedOutputStream(currentURL,
+                                                           message,
+                                                           isChunking,
+                                                           chunkThreshold,
+                                                           needToCacheRequest);
+        message.setContent(OutputStream.class, wout);
+        message.put(WrappedOutputStream.class, wout);
+    }
+
+
+ protected class WrappedOutputStream extends OutputStream
+ implements ProducingNHttpEntity, ConsumingNHttpEntity {
+
+ protected Message outMessage;
+ protected URL address;
+ protected String contentType;
+ protected LoadingByteArrayOutputStream outStream;
+ protected CachedOutputStream requestStream;
+ protected boolean written;
+ protected boolean connected;
+ protected boolean closed;
+ protected HttpResponse response;
+ protected InputStream inStream;
+ private int chunkThreshold;
+ private boolean isChunking;
+ private int contentLength = -1;
+ private int responseCode;
+ private String responseMessage;
+ private int nretransmits;
+
+ protected WrappedOutputStream(
+ URL address,
+ Message m,
+ boolean isChunking,
+ int chunkThreshold,
+ boolean needToCacheRequest
+ ) {
+ this.outMessage = m;
+ this.address = address;
+ this.chunkThreshold = chunkThreshold;
+ this.isChunking = isChunking;
+ if (needToCacheRequest || !isChunking) {
+ requestStream = new CachedOutputStream();
+ }
+ if (isChunking) {
+ outStream = new LoadingByteArrayOutputStream(64 * 1024);
+ }
+ }
+ /**
+ * Stores the given response in the response field.
+ *
+ * @param r the response to remember
+ */
+ public void setResponse(HttpResponse r) {
+ response = r;
+ }
+
+        /**
+         * Builds the http-client request for the outbound message and hands
+         * it to the HttpClientController for execution.
+         * <p>
+         * The request type follows the message's HTTP_REQUEST_METHOD (POST
+         * when unset). POST and PUT requests use this stream as their entity;
+         * unknown methods get a generic request reporting the given method
+         * name. The request URI carries only the path, query and fragment of
+         * the target address; the full address is passed to the controller
+         * separately.
+         *
+         * @throws IOException if the address cannot be converted to a URI
+         *                     (the URISyntaxException is kept as the cause),
+         *                     or if setting the headers or executing the
+         *                     request throws it
+         */
+        public void doConnection() throws IOException {
+            connected = true;
+            if (closed) {
+                // everything is already written: the length is known,
+                // so chunking is switched off
+                contentLength = outStream == null ? -1 : outStream.size();
+                isChunking = false;
+            }
+            final String httpRequestMethod =
+                (String)outMessage.get(Message.HTTP_REQUEST_METHOD);
+
+            HttpUriRequest request = null;
+            URI uri2;
+            try {
+                uri2 = address.toURI();
+                // strip scheme and authority, keep path/query/fragment
+                uri2 = new URI(null, null, uri2.getPath(), uri2.getQuery(), uri2.getFragment());
+            } catch (URISyntaxException e) {
+                // Attach the cause via initCause so the original failure is
+                // not lost; this also works on JDKs that lack the
+                // IOException(String, Throwable) constructor.
+                IOException ioe = new IOException(e.getMessage());
+                ioe.initCause(e);
+                throw ioe;
+            }
+
+            final URI uri = uri2;
+            if (httpRequestMethod == null || "POST".equals(httpRequestMethod)) {
+                HttpPost post = new HttpPost(uri);
+                request = post;
+                post.setEntity(this);
+            } else if ("PUT".equals(httpRequestMethod)) {
+                HttpPut put = new HttpPut(uri);
+                request = put;
+                put.setEntity(this);
+            } else if ("GET".equals(httpRequestMethod)) {
+                request = new HttpGet(uri);
+            } else if ("DELETE".equals(httpRequestMethod)) {
+                request = new HttpDelete(uri);
+            } else if ("HEAD".equals(httpRequestMethod)) {
+                request = new HttpHead(uri);
+            } else if ("TRACE".equals(httpRequestMethod)) {
+                request = new HttpTrace(uri);
+            } else if ("OPTIONS".equals(httpRequestMethod)) {
+                request = new HttpOptions(uri);
+            } else {
+                // any other method: generic request reporting that name
+                HttpRequestBase r = new HttpRequestBase() {
+                    public String getMethod() {
+                        return httpRequestMethod;
+                    }
+                };
+                r.setURI(uri);
+                request = r;
+            }
+            setURLRequestHeaders(request);
+
+            HttpClientController.getHttpClientController(outMessage)
+                .execute(AsyncHTTPConduit.this,
+                         address,
+                         request,
+                         outMessage);
+        }
+ /**
+ * Placeholder: the response is not inspected and false is always
+ * returned.
+ * NOTE(review): presumably false means "no retransmit needed" -
+ * confirm against the caller in HttpClientController.
+ *
+ * @param r the response received for the request (currently unused)
+ * @return always false
+ */
+ public boolean handleRetransmits(HttpResponse r) {
+ return false;
+ }
+        /**
+         * Pushes request content into the given encoder.
+         * <p>
+         * When chunking is on, or no cached request stream exists, the
+         * current content of the in-memory buffer is written out, the buffer
+         * is reset and threads waiting on this stream are notified.
+         * Otherwise the cached request stream is copied to the encoder in
+         * 8K blocks. In both cases the encoder is called repeatedly until
+         * the number of bytes it reports reaches the amount to send.
+         *
+         * @param out the encoder receiving the content
+         * @return the current value of the closed flag
+         * @throws IOException if reading the cached request or writing to
+         *                     the encoder throws it
+         */
+        public synchronized boolean writeTo(ContentEncoder out) throws IOException {
+            final boolean fromMemoryBuffer = isChunking || requestStream == null;
+            if (!fromMemoryBuffer) {
+                // replay the cached request block by block
+                InputStream cached = requestStream.getInputStream();
+                byte[] block = new byte[8192];
+                ByteBuffer view = ByteBuffer.wrap(block);
+                try {
+                    for (int read = cached.read(block); read != -1; read = cached.read(block)) {
+                        view.limit(read);
+                        int sent = out.write(view);
+                        while (sent < read) {
+                            sent += out.write(view);
+                        }
+                        view.clear();
+                    }
+                } finally {
+                    cached.close();
+                }
+            } else if (outStream.size() > 0) {
+                ByteBuffer pending = ByteBuffer.wrap(outStream.getRawBytes(),
+                                                     0, outStream.size());
+                int total = outStream.size();
+                int sent = out.write(pending);
+                while (sent < total) {
+                    sent += out.write(pending);
+                }
+
+                outStream.reset();
+                notifyAll();
+            }
+            return closed;
+        }
+
+ /**
+  * Copies the content type and the protocol headers of the out message onto
+  * the request.
+  * <p>
+  * The message encoding is appended as a {@code charset} parameter unless the
+  * content type already has one or is {@code multipart/related}; a missing
+  * content type defaults to {@code text/xml}. Cookie values are added as
+  * separate headers, all other multi-valued headers are joined with commas,
+  * and a {@code User-Agent} header is added when none is present.
+  *
+  * @param request the request to populate
+  * @throws IOException declared for callers; not thrown by the code visible here
+  */
+ private void setURLRequestHeaders(HttpUriRequest request) throws IOException {
+     String ct = (String)outMessage.get(Message.CONTENT_TYPE);
+     String enc = (String)outMessage.get(Message.ENCODING);
+
+     if (ct == null) {
+         ct = enc == null ? "text/xml" : "text/xml; charset=" + enc;
+     } else if (enc != null) {
+         // Media types and parameter names are case-insensitive, so compare
+         // on a locale-independent lower-case copy.
+         String lower = ct.toLowerCase(java.util.Locale.ENGLISH);
+         if (lower.indexOf("charset=") == -1
+             && !lower.contains("multipart/related")) {
+             ct = ct + "; charset=" + enc;
+         }
+     }
+     request.setHeader(HttpHeaderHelper.CONTENT_TYPE, ct);
+     contentType = ct;
+
+     if (LOG.isLoggable(Level.FINE)) {
+         LOG.fine("Sending "
+             + request.getMethod()
+             + " Message with Headers to "
+             + request.getURI()
+             + " Conduit :"
+             + getConduitName()
+             + "\nContent-Type: " + ct + "\n");
+         logProtocolHeaders(Level.FINE, outMessage);
+     }
+     Map<String, List<String>> headers = getSetProtocolHeaders(outMessage);
+     for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
+         String header = entry.getKey();
+         List<String> headerList = entry.getValue();
+         if (HttpHeaderHelper.CONTENT_TYPE.equalsIgnoreCase(header)) {
+             // Already set above from the message content type.
+             continue;
+         }
+         if (HttpHeaderHelper.COOKIE.equalsIgnoreCase(header)) {
+             for (String s : headerList) {
+                 request.addHeader(HttpHeaderHelper.COOKIE, s);
+             }
+         } else {
+             StringBuilder b = new StringBuilder();
+             for (int i = 0; i < headerList.size(); i++) {
+                 if (i > 0) {
+                     b.append(',');
+                 }
+                 b.append(headerList.get(i));
+             }
+             request.addHeader(header, b.toString());
+         }
+     }
+     if (request.getFirstHeader("User-Agent") == null) {
+         request.addHeader("User-Agent", Version.getCompleteVersionString());
+     }
+ }
+
+ /**
+  * Probes the response body for at least one byte.
+  *
+  * @param resp the response whose entity content is inspected
+  * @return a stream positioned at the start of the body, or {@code null}
+  *         when the body is empty or reading it fails
+  */
+ private InputStream getNonEmptyContent(HttpResponse resp) {
+     try {
+         PushbackInputStream peekable =
+             new PushbackInputStream(resp.getEntity().getContent());
+         int first = peekable.read();
+         if (first == -1) {
+             return null;
+         }
+         // Put the probed byte back so the caller sees the complete body.
+         peekable.unread((byte)first);
+         return peekable;
+     } catch (IOException ioe) {
+         //ignore
+         return null;
+     }
+ }
+ /**
+  * Returns the body of a partial response, if there is one.
+  * <p>
+  * Only 200 and 202 responses are considered. A positive
+  * {@code content-length} yields the entity content directly; otherwise a
+  * chunked or EOF-terminated response is probed for at least one byte.
+  *
+  * @param r       the received response
+  * @param headers the mapped response headers
+  * @return the response body, or {@code null} when there is none
+  * @throws IOException if the entity content cannot be obtained
+  */
+ protected InputStream getPartialResponse(HttpResponse r,
+                                          Map<String, List<String>> headers)
+     throws IOException {
+
+     InputStream in = null;
+     if (responseCode == HttpURLConnection.HTTP_ACCEPTED
+         || responseCode == HttpURLConnection.HTTP_OK) {
+         // Parsed as a long so lengths above Integer.MAX_VALUE are accepted.
+         long cl = -1;
+         List<String> cls = headers.get("content-length");
+         if (cls != null && !cls.isEmpty() && cls.get(0) != null) {
+             try {
+                 cl = Long.parseLong(cls.get(0).trim());
+             } catch (NumberFormatException nfe) {
+                 // A malformed header is treated as "length unknown" instead
+                 // of failing with an unchecked exception.
+                 cl = -1;
+             }
+         }
+         if (cl > 0) {
+             in = r.getEntity().getContent();
+         } else if (hasChunkedResponse(headers)
+             || hasEofTerminatedResponse(headers)) {
+             // ensure chunked or EOF-terminated response is non-empty
+             in = getNonEmptyContent(r);
+         }
+     }
+     return in;
+ }
+ /**
+  * Collects the headers of the current response into a multi-valued map
+  * keyed by {@code HttpHeaderHelper.getHeaderKey(name)}.
+  *
+  * @return a new map from header key to the values seen for it, in order
+  */
+ private Map<String, List<String>> mapResponseHeaders() {
+     Map<String, List<String>> mapped = new HashMap<String, List<String>>();
+     for (Header current : response.getAllHeaders()) {
+         String key = HttpHeaderHelper.getHeaderKey(current.getName());
+         List<String> values = mapped.get(key);
+         if (values == null) {
+             values = new ArrayList<String>(2);
+             mapped.put(key, values);
+         }
+         values.add(current.getValue());
+     }
+     return mapped;
+ }
+
+ protected void handleResponseInternal() throws IOException { // turns the received response into an inbound Message and passes it to incomingObserver
+ // Process retransmits until we fall out.
+ responseCode = response.getStatusLine().getStatusCode();
+ responseMessage = response.getStatusLine().getReasonPhrase();
+
+ try {
+ if (handleRetransmits()) { // true means a new request was started for this message; nothing more to do for this response
+ return;
+ }
+ } catch (IOException ioex) {
+ //need to propagate the exception
+ Exception ex = new Fault(ioex);
+ outMessage.getExchange().put(Exception.class, ex);
+ Message inMessage = new MessageImpl();
+ inMessage.setExchange(outMessage.getExchange());
+ inMessage.setContent(Exception.class, ex);
+ incomingObserver.onMessage(inMessage);
+ response.getEntity().getContent().close();
+ return;
+ }
+ if (requestStream != null) {
+ //don't need to retransmit, make sure it gets cleaned up
+ requestStream.close();
+ }
+
+ Exchange exchange = outMessage.getExchange();
+ synchronized (this) {
+ notifyAll(); // responseCode is set now: wakes threads waiting on this monitor, e.g. close() on a one-way exchange
+ }
+ if (outMessage != null && exchange != null) {
+ exchange.put(Message.RESPONSE_CODE, responseCode);
+ }
+
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Response Code: "
+ + responseCode
+ + " Conduit: " + getConduitName());
+ Header[] headerFields = response.getAllHeaders();
+ if (null != headerFields) {
+ StringBuilder buf = new StringBuilder();
+ buf.append("Header fields: ");
+ buf.append(System.getProperty("line.separator"));
+ for (Header h : headerFields) {
+ buf.append(" ");
+ buf.append(h.getName());
+ buf.append(": ");
+ buf.append(h.getValue());
+ buf.append(System.getProperty("line.separator"));
+ }
+ LOG.fine(buf.toString());
+ }
+ }
+ Message inMessage = new MessageImpl();
+ inMessage.setExchange(exchange);
+
+ if (responseCode == HttpURLConnection.HTTP_NOT_FOUND // a 404 is reported as a Fault unless the no_io_exceptions property is true
+ && !MessageUtils.isTrue(outMessage.getContextualProperty(
+ "org.apache.cxf.http.no_io_exceptions"))) {
+
+ Exception ex = new Fault(new IOException("HTTP response '" + responseCode + ": "
+ + response.getStatusLine().getReasonPhrase() + "'"));
+ exchange.put(Exception.class, ex);
+ inMessage.setContent(Exception.class, ex);
+ incomingObserver.onMessage(inMessage);
+ response.getEntity().getContent().close();
+ return;
+ }
+
+ Map<String, List<String>> headers = mapResponseHeaders();
+ InputStream in = null;
+ if (isOneway(exchange)) {
+ in = getPartialResponse(response, headers);
+ if (in == null) {
+ // oneway operation or decoupled MEP without
+ // partial response
+ response.getEntity().getContent().close();
+ return;
+ }
+ } else {
+ //not going to be resending or anything, clear out the stuff in the out message
+ //to free memory
+ /*
+ outMessage.removeContent(OutputStream.class);
+ if (cachingForRetransmission && cachedStream != null) {
+ cachedStream.close();
+ }
+ cachedStream = null;
+ */
+ }
+
+
+
+ inMessage.put(Message.PROTOCOL_HEADERS, headers);
+ inMessage.put(Message.RESPONSE_CODE, responseCode);
+ List<String> ctList = headers.get("content-type");
+ if (ctList != null) {
+ String ct = null;
+ for (String s : ctList) { // several content-type values are joined with "; "
+ if (ct == null) {
+ ct = s;
+ } else {
+ ct += "; " + s;
+ }
+ }
+ inMessage.put(Message.CONTENT_TYPE, ct);
+ String charset = HttpHeaderHelper.findCharset(ct);
+ String normalizedEncoding = HttpHeaderHelper.mapCharset(charset);
+ if (normalizedEncoding == null) { // mapCharset returned nothing for this charset: log and fail the response
+ String m = new org.apache.cxf.common.i18n.Message("INVALID_ENCODING_MSG",
+ LOG, charset).toString();
+ LOG.log(Level.WARNING, m);
+ throw new IOException(m);
+ }
+ inMessage.put(Message.ENCODING, normalizedEncoding);
+ }
+
+ if (maintainSession) {
+ List<String> cookies = headers.get("Set-Cookie");
+ Cookie.handleSetCookie(sessionCookies, cookies);
+ }
+ if (responseCode != HttpURLConnection.HTTP_NOT_FOUND
+ && in == null) {
+ in = response.getEntity().getContent();
+ }
+ // if (in == null) : it's perfectly ok for non-soap http services
+ // have no response body : those interceptors which do need it will check anyway
+ inMessage.setContent(InputStream.class, in);
+ incomingObserver.onMessage(inMessage);
+ }
+
+ /**
+  * Decides whether the current response should trigger a retransmission and,
+  * if so, starts it.
+  * <p>
+  * Nothing is retransmitted without a cached request stream. A policy limit
+  * of 0 disables retransmits, -1 (also used when there is no policy) means
+  * unlimited.
+  *
+  * @return {@code true} when a retransmission was started
+  * @throws IOException if starting the retransmission fails
+  */
+ private boolean handleRetransmits() throws IOException {
+     if (requestStream == null) {
+         return false;
+     }
+     HTTPClientPolicy policy = getClient(outMessage);
+     int limit = -1;
+     if (policy != null) {
+         limit = policy.getMaxRetransmits();
+     }
+
+     boolean disabled = limit == 0;
+     boolean exhausted = limit != -1 && nretransmits >= limit;
+     if (disabled || exhausted) {
+         return false;
+     }
+     if (!processRetransmit(outMessage)) {
+         return false;
+     }
+     ++nretransmits;
+     return true;
+ }
+ /**
+  * Records the response code on the exchange and dispatches to the redirect
+  * or authorization handling that matches it.
+  *
+  * @param message the out message whose exchange receives the response code
+  * @return {@code true} when a retransmission was started
+  * @throws IOException if the selected handler fails
+  */
+ private boolean processRetransmit(Message message) throws IOException {
+     Exchange target = message == null ? null : message.getExchange();
+     if (target != null) {
+         target.put(Message.RESPONSE_CODE, responseCode);
+     }
+
+     // Redirects (301/302) are checked first, then 401.
+     if (responseCode == HttpURLConnection.HTTP_MOVED_PERM
+         || responseCode == HttpURLConnection.HTTP_MOVED_TEMP) {
+         return redirectRetransmit();
+     }
+     if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
+         return authorizationRetransmit();
+     }
+     return false;
+ }
+ /**
+  * Answers a 401 response by obtaining credentials from the auth supplier and
+  * resending the request with an {@code Authorization} header.
+  * <p>
+  * Without a configured supplier only {@code Digest} challenges are handled.
+  * Each URL/realm pair is attempted once per message; a second challenge for
+  * the same pair is reported as an authorization loop.
+  *
+  * @return {@code true} when the request was resent, {@code false} when no
+  *         challenge or no credentials are available
+  * @throws IOException if an authorization loop is detected or resending fails
+  */
+ private boolean authorizationRetransmit() throws IOException {
+     // A 401 without a WWW-Authenticate challenge cannot be answered.
+     Header challenge = response.getFirstHeader("WWW-Authenticate");
+     if (challenge == null) {
+         return false;
+     }
+     String challengeValue = challenge.getValue();
+
+     // If we don't have a dynamic supply of user pass, then
+     // we don't retransmit. We just die with a Http 401 response.
+     if (authSupplier == null) {
+         if (challengeValue != null && challengeValue.startsWith("Digest ")) {
+             authSupplier = new DigestAuthSupplier();
+         } else {
+             return false;
+         }
+     }
+
+     URL currentURL = address;
+
+     Header heads[] = response.getHeaders("WWW-Authenticate");
+     List<String> auth = new ArrayList<String>();
+     if (heads != null) {
+         for (Header h : heads) {
+             auth.add(h.getValue());
+         }
+     }
+     String realm = extractAuthorizationRealm(auth);
+
+     Set<String> authURLs = getSetAuthoriationURLs(outMessage);
+
+     // If we have been here (URL & Realm) before for this particular message
+     // retransmit, it means we have already supplied information
+     // which must have been wrong, or we wouldn't be here again.
+     // Otherwise, the server may be 401 looping us around the realms.
+     if (authURLs.contains(currentURL.toString() + realm)) {
+         String loopMessage = "Authorization loop detected on Conduit \""
+             + getConduitName()
+             + "\" on URL \""
+             + currentURL
+             + "\" with realm \""
+             + realm
+             + "\"";
+         if (LOG.isLoggable(Level.INFO)) {
+             LOG.log(Level.INFO, loopMessage);
+         }
+         throw new IOException(loopMessage);
+     }
+
+     String up =
+         authSupplier.getAuthorizationForRealm(
+             AsyncHTTPConduit.this, currentURL, outMessage,
+             realm, challengeValue);
+
+     // No user pass combination. We give up.
+     if (up == null) {
+         return false;
+     }
+
+     // Register that we have been here before we go.
+     authURLs.add(currentURL.toString() + realm);
+
+     Map<String, List<String>> headers = getSetProtocolHeaders(outMessage);
+     headers.put("Authorization",
+                 createMutableList(up));
+     // Drop the stream of the rejected response before resending.
+     inStream = null;
+     doConnection();
+     return true;
+ }
+ /**
+  * Follows a 301/302 redirect when the client policy allows it.
+  * <p>
+  * The {@code Location} value is resolved against the current address, so
+  * relative redirects are followed as well. Every address visited for this
+  * message is remembered so that a redirect loop can be detected.
+  *
+  * @return {@code true} when the request was resent to the new location,
+  *         {@code false} when redirects are disabled or no location was sent
+  * @throws IOException if a redirect loop is detected, the location is not a
+  *                     valid URL, or resending fails
+  */
+ private boolean redirectRetransmit() throws IOException {
+     // If we are not redirecting by policy, then we don't.
+     HTTPClientPolicy policy = getClient(outMessage);
+     if (policy == null || !policy.isAutoRedirect()) {
+         return false;
+     }
+     // We keep track of the redirections for redirect loop protection.
+     Set<String> visitedURLs = getSetVisitedURLs(outMessage);
+     visitedURLs.add(address.toString());
+
+     Header location = response.getFirstHeader("Location");
+     String newURL = location == null ? null : location.getValue();
+     if (newURL == null) {
+         return false;
+     }
+
+     // An absolute location replaces the address, a relative one is resolved
+     // against it.
+     URL target = new URL(address, newURL);
+     if (visitedURLs.contains(newURL) || visitedURLs.contains(target.toString())) {
+         throw new IOException("Redirect loop detected on Conduit \""
+                               + getConduitName()
+                               + "\" on '"
+                               + newURL
+                               + "'");
+     }
+     inStream = null;
+
+     address = target;
+     // We are going to redirect.
+     // Remove any Server Authentication Information for the previous
+     // URL.
+     Map<String, List<String>> headers =
+         getSetProtocolHeaders(outMessage);
+     headers.remove("Authorization");
+     headers.remove("Proxy-Authorization");
+
+     // If user configured this Conduit with preemptive authorization
+     // it is meant to make it to the end. (Too bad that information
+     // went to every URL along the way, but that's what the user
+     // wants!
+     setHeadersByAuthorizationPolicy(outMessage, address, headers);
+     doConnection();
+     return true;
+ }
+
+ public boolean isRepeatable() { // this entity always reports itself as non-repeatable
+ return false;
+ }
+
+ public boolean isChunked() { // reflects isChunking, which doConnection() clears when the stream was closed before connecting
+ return this.isChunking;
+ }
+
+ public long getContentLength() { // value of the contentLength field, which doConnection() assigns when the stream is already closed
+ return contentLength;
+ }
+
+ public Header getContentType() { // new header on every call, carrying the content type computed in setURLRequestHeaders()
+ return new BasicHeader(Message.CONTENT_TYPE, contentType);
+ }
+
+ public Header getContentEncoding() { // no content encoding is reported for this entity
+ return null;
+ }
+
+ public InputStream getContent() throws IOException, IllegalStateException { // response stream created by consumeContent(ContentDecoder, IOControl); the retransmit paths reset it to null
+ return inStream;
+ }
+
+ public boolean isStreaming() { // always reports streaming content
+ return true;
+ }
+
+ public void consumeContent() throws IOException { // no-op; the response is consumed through consumeContent(ContentDecoder, IOControl)
+ }
+
+ public void produceContent(final ContentEncoder encoder, // writes the available request bytes to the encoder
+ IOControl ioctrl) throws IOException {
+ if (writeTo(encoder)) { // writeTo returns the closed flag, so the encoder is completed once the stream has been closed
+ encoder.complete();
+ }
+ }
+
+ public void finish() throws IOException { // no-op
+ }
+
+ public synchronized void write(byte[] b, int off, int len) throws IOException {
+ if (!connected
+ && isChunking
+ && ((outStream.size() + len) > this.chunkThreshold)) {
+ doConnection();
+ }
+ written = true;
+ if (isChunking) {
+ if (outStream.size() > 0
+ && ((outStream.size() + len) > (64 * 1024))) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ outStream.write(b, off, len);
+ }
+ if (requestStream != null) {
+ requestStream.write(b, off, len);
+ }
+ }
+ public synchronized void write(int b) throws IOException {
+ if (!connected
+ && isChunking
+ && ((outStream.size() + 1) > this.chunkThreshold)) {
+ doConnection();
+ }
+ written = true;
+ if (isChunking) {
+ outStream.write(b);
+ }
+ if (requestStream != null) {
+ requestStream.write(b);
+ }
+ }
+ public synchronized void close() throws IOException {
+ closed = true;
+ if (!connected) {
+ doConnection();
+ }
+ if (isOneway(outMessage.getExchange())) {
+ //for a one way, we at least need to wait for the response code
+ while (responseCode == 0) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (responseCode != HttpURLConnection.HTTP_ACCEPTED
+ && responseCode != HttpURLConnection.HTTP_OK) {
+ throw new IOException(responseMessage);
+ }
+ }
+ }
+
+ /**
+  * Wraps the content decoder in an {@link InputStream} and starts response
+  * processing. Only the first call has an effect; later calls return
+  * immediately because the stream already exists.
+  *
+  * @param decoder source of the response body
+  * @param ioctrl  I/O control of the connection (unused)
+  * @throws IOException if response processing fails
+  */
+ public void consumeContent(final ContentDecoder decoder, IOControl ioctrl) throws IOException {
+     if (inStream != null) {
+         return;
+     }
+     inStream = new InputStream() {
+         public int read(byte b[], int off, int len) throws IOException {
+             // InputStream contract: a zero-length read returns 0. Without
+             // this guard the retry loop below would never terminate, since
+             // the decoder cannot put bytes into an empty buffer.
+             if (len == 0) {
+                 return 0;
+             }
+             if (decoder.isCompleted()) {
+                 return -1;
+             }
+             ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+             int i = decoder.read(buf);
+             // Retry until the decoder delivers data or signals end of stream.
+             while (i == 0) {
+                 i = decoder.read(buf);
+             }
+             return i;
+         }
+
+         public int read() throws IOException {
+             if (decoder.isCompleted()) {
+                 return -1;
+             }
+             byte[] bytes = new byte[1];
+             int i = read(bytes, 0, 1);
+             if (i == -1) {
+                 return -1;
+             }
+             // Return the byte as an unsigned value in 0..255.
+             i = bytes[0];
+             i &= 0xFF;
+             return i;
+         }
+         public void close() throws IOException {
+         }
+     };
+     this.handleResponseInternal();
+ }
+ public void writeTo(OutputStream outstream) throws IOException { // blocking variant is unsupported; the body is written via writeTo(ContentEncoder)
+ throw new UnsupportedOperationException("Not used for NIO");
+ }
+ }
+
+
+ /**
+  * Reports a failure for the given message: the exception is stored on the
+  * exchange and delivered to the incoming observer as the content of a new
+  * message on that exchange.
+  *
+  * @param m         message whose exchange the failure belongs to
+  * @param exception the failure to report
+  */
+ protected void sendException(Message m, Exception exception) {
+     Exchange exchange = m.getExchange();
+     Message failure = new MessageImpl();
+     failure.setExchange(exchange);
+     failure.setContent(Exception.class, exception);
+     exchange.put(Exception.class, exception);
+     this.incomingObserver.onMessage(failure);
+ }
+
+}
Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java?rev=995917&view=auto
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (added)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Fri Sep 10 18:22:05 2010
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.http.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.http.async.AsyncHTTPConduit.WrappedOutputStream;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.version.Version;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.nio.NHttpClientIOTarget;
+import org.apache.http.nio.NHttpConnection;
+import org.apache.http.nio.entity.ConsumingNHttpEntity;
+import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
+import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.nio.reactor.IOEventDispatch;
+import org.apache.http.nio.reactor.IOSession;
+import org.apache.http.nio.reactor.SessionRequest;
+import org.apache.http.nio.reactor.SessionRequestCallback;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.params.CoreProtocolPNames;
+import org.apache.http.params.HttpParams;
+import org.apache.http.protocol.BasicHttpProcessor;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.RequestConnControl;
+import org.apache.http.protocol.RequestContent;
+import org.apache.http.protocol.RequestExpectContinue;
+import org.apache.http.protocol.RequestTargetHost;
+import org.apache.http.protocol.RequestUserAgent;
+
+/**
+ * Per-bus holder of the HttpCore NIO connecting reactor used to run
+ * asynchronous HTTP requests.
+ * <p>
+ * One instance is registered as an extension on each {@link Bus}. It opens
+ * connections for {@link AsyncHTTPConduit} and, as the request execution
+ * handler, supplies the request and the response entity for each connection
+ * from the CXF {@link Message} attached to it.
+ */
+public class HttpClientController implements BusLifeCycleListener,
+    NHttpRequestExecutionHandler {
+    private static final Logger LOG = Logger.getLogger(HttpClientController.class.getName());
+
+    /** Name of the HttpContext attribute holding the CXF message. */
+    private static final String MESSAGE_ATTRIBUTE = "MESSAGE";
+
+    ConnectingIOReactor ioReactor;
+
+    HttpClientController() {
+    }
+
+    /**
+     * Creates the I/O reactor and starts a daemon thread that runs its event
+     * loop. Failures are logged; {@link #execute} then rejects requests
+     * because no reactor is available.
+     */
+    public void setUp() {
+        try {
+            HttpParams params = new BasicHttpParams();
+            params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 60000)
+                .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000)
+                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
+                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
+                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
+                .setParameter(CoreProtocolPNames.USER_AGENT, Version.getCompleteVersionString());
+            ioReactor = new DefaultConnectingIOReactor(4, params);
+
+            BasicHttpProcessor httpproc = new BasicHttpProcessor();
+            httpproc.addInterceptor(new RequestContent());
+            httpproc.addInterceptor(new RequestTargetHost());
+            httpproc.addInterceptor(new RequestConnControl());
+            httpproc.addInterceptor(new RequestUserAgent());
+            httpproc.addInterceptor(new RequestExpectContinue());
+
+            AsyncNHttpClientHandler handler = new AsyncNHttpClientHandler(
+                 httpproc,
+                 this,
+                 new DefaultConnectionReuseStrategy(),
+                 params) {
+                protected void handleTimeout(final NHttpConnection conn) {
+                    super.handleTimeout(conn);
+                    // Report the timeout to the conduit of the message bound
+                    // to this connection, if both are still known.
+                    Message m = (Message)conn.getContext().getAttribute(MESSAGE_ATTRIBUTE);
+                    AsyncHTTPConduit conduit = m == null ? null : m.get(AsyncHTTPConduit.class);
+                    if (conduit != null) {
+                        conduit.sendException(m, new SocketTimeoutException());
+                    }
+                }
+            };
+
+            final IOEventDispatch ioEventDispatch
+                = new DefaultClientIOEventDispatch(handler, params) {
+                    protected NHttpClientIOTarget createConnection(IOSession session) {
+                        // Apply the per-message receive timeout, when a
+                        // client policy is attached to the message.
+                        Message m = (Message)session.getAttribute(IOSession.ATTACHMENT_KEY);
+                        HTTPClientPolicy client = m == null ? null : (HTTPClientPolicy)m
+                            .get(HTTPClientPolicy.class.getName() + ".complete");
+                        if (client != null) {
+                            session.setSocketTimeout((int)client.getReceiveTimeout());
+                        }
+                        return super.createConnection(session);
+                    }
+                };
+
+            Thread t = new Thread(new Runnable() {
+                public void run() {
+                    try {
+                        ioReactor.execute(ioEventDispatch);
+                    } catch (Exception e) {
+                        LOG.log(Level.SEVERE, "HTTP client I/O reactor terminated abnormally", e);
+                    }
+                }
+            });
+            t.setDaemon(true);
+            t.start();
+
+        } catch (Exception ex) {
+            LOG.log(Level.SEVERE, "Could not start the HTTP client I/O reactor", ex);
+        }
+    }
+
+    /**
+     * Opens a connection to {@code address} for the given request and blocks
+     * until the connection attempt has finished.
+     * <p>
+     * The request is stored on the message so that {@link #submitRequest} can
+     * hand it to the connection; on success the session request is stored on
+     * the message as well.
+     *
+     * @param conduit the conduit issuing the request (unused)
+     * @param address target of the connection; the protocol default port is
+     *                used when it names none (80 if the protocol has none)
+     * @param request the request to send once connected
+     * @param message the out message attached to the connection
+     * @throws IOException if the reactor is not available, the connection
+     *                     attempt fails, times out or is cancelled, or the
+     *                     calling thread is interrupted while waiting
+     */
+    public void execute(AsyncHTTPConduit conduit,
+                        final URL address,
+                        HttpUriRequest request,
+                        final Message message) throws IOException {
+        if (ioReactor == null) {
+            throw new IOException("HTTP client I/O reactor is not available");
+        }
+        int port = address.getPort();
+        if (port == -1) {
+            port = address.getDefaultPort();
+        }
+        if (port == -1) {
+            port = 80;
+        }
+        InetSocketAddress add = new InetSocketAddress(address.getHost(), port);
+        message.put(HttpUriRequest.class, request);
+
+        // Outcome of this attempt. Both are written and read while holding
+        // the message monitor, so the waiter sees a consistent state and is
+        // immune to spurious or unrelated wake-ups.
+        final boolean[] done = new boolean[1];
+        final IOException[] failure = new IOException[1];
+
+        synchronized (message) {
+            SessionRequest req
+                = ioReactor.connect(add, null, message,
+                    new SessionRequestCallback() {
+                        public void completed(SessionRequest sessionRequest) {
+                            finish(null);
+                        }
+
+                        public void failed(SessionRequest sessionRequest) {
+                            finish(connectFailure(address, sessionRequest.getException()));
+                        }
+
+                        public void timeout(SessionRequest sessionRequest) {
+                            finish(connectFailure(address, null));
+                        }
+
+                        public void cancelled(SessionRequest sessionRequest) {
+                            finish(connectFailure(address, null));
+                        }
+
+                        private void finish(IOException problem) {
+                            synchronized (message) {
+                                if (problem != null) {
+                                    failure[0] = problem;
+                                    message.put(IOException.class, problem);
+                                }
+                                done[0] = true;
+                                message.notifyAll();
+                            }
+                        }
+                    });
+            HTTPClientPolicy client
+                = (HTTPClientPolicy)message.get(HTTPClientPolicy.class.getName() + ".complete");
+            if (client != null) {
+                req.setConnectTimeout((int)client.getConnectionTimeout());
+            }
+
+            try {
+                while (!done[0]) {
+                    message.wait();
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt status, abandon the pending connect
+                // and report the interruption to the caller.
+                Thread.currentThread().interrupt();
+                req.cancel();
+                InterruptedIOException iioe
+                    = new InterruptedIOException("Interrupted while connecting to " + address);
+                iioe.initCause(e);
+                throw iioe;
+            }
+            if (failure[0] != null) {
+                throw failure[0];
+            }
+            message.put(SessionRequest.class, req);
+        }
+    }
+
+    /** Builds the exception reported for an unsuccessful connection attempt. */
+    private static IOException connectFailure(URL address, IOException cause) {
+        ConnectException ex = new ConnectException("Failed to connect to " + address);
+        if (cause != null) {
+            ex.initCause(cause);
+        }
+        return ex;
+    }
+
+    /**
+     * Returns the controller of the bus the message belongs to (or of the
+     * thread default bus), creating and registering it on first use.
+     *
+     * @param message the message whose exchange identifies the bus
+     * @return the controller registered on that bus
+     */
+    public static HttpClientController getHttpClientController(Message message) {
+        Bus bus = message.getExchange() == null ? null : message.getExchange().getBus();
+        if (bus == null) {
+            bus = BusFactory.getThreadDefaultBus();
+        }
+        HttpClientController stuff = bus.getExtension(HttpClientController.class);
+        if (stuff == null) {
+            stuff = registerHttpClient(bus);
+        }
+        return stuff;
+    }
+
+    static synchronized HttpClientController registerHttpClient(Bus bus) {
+        // Re-check under the lock so only one controller is created per bus.
+        HttpClientController stuff = bus.getExtension(HttpClientController.class);
+        if (stuff == null) {
+            stuff = new HttpClientController();
+            stuff.setUp();
+            bus.setExtension(stuff, HttpClientController.class);
+            BusLifeCycleManager lifeCycle = bus.getExtension(BusLifeCycleManager.class);
+            if (lifeCycle != null) {
+                lifeCycle.registerLifeCycleListener(stuff);
+            }
+        }
+        return stuff;
+    }
+
+    public void initComplete() {
+    }
+    public void preShutdown() {
+    }
+
+    /** Stops the I/O reactor when the bus has shut down. */
+    public void postShutdown() {
+        if (ioReactor != null) {
+            try {
+                ioReactor.shutdown();
+            } catch (IOException e) {
+                LOG.log(Level.WARNING, "Error shutting down the HTTP client I/O reactor", e);
+            }
+        }
+    }
+
+    /** Binds the message attached to the connection to its HTTP context. */
+    public void initalizeContext(HttpContext context, Object attachment) {
+        context.setAttribute(MESSAGE_ATTRIBUTE, attachment);
+    }
+
+    /**
+     * Supplies the request stored on the message bound to the context, or
+     * {@code null} when no message is bound.
+     */
+    public HttpRequest submitRequest(HttpContext context) {
+        Message m = (Message)context.getAttribute(MESSAGE_ATTRIBUTE);
+        if (m == null) {
+            return null;
+        }
+        return m.get(HttpUriRequest.class);
+    }
+
+    /**
+     * Hands the response to the message's {@link WrappedOutputStream}, which
+     * also acts as the entity consuming the response body.
+     */
+    public ConsumingNHttpEntity responseEntity(HttpResponse response, HttpContext context)
+        throws IOException {
+        Message m = (Message)context.getAttribute(MESSAGE_ATTRIBUTE);
+        WrappedOutputStream out = m.get(WrappedOutputStream.class);
+        out.setResponse(response);
+        return out;
+    }
+
+    public void handleResponse(HttpResponse response, HttpContext context) throws IOException {
+    }
+
+    /** Unbinds the message from the HTTP context. */
+    public void finalizeContext(HttpContext context) {
+        context.removeAttribute(MESSAGE_ATTRIBUTE);
+    }
+
+}
\ No newline at end of file
Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java (original)
+++ cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java Fri Sep 10 18:22:05 2010
@@ -379,8 +379,11 @@ public class ClientServerTest extends Ab
long before = System.currentTimeMillis();
long delay = 3000;
+ //System.out.println(System.currentTimeMillis());
Response<GreetMeLaterResponse> r1 = greeter.greetMeLaterAsync(delay);
+ //System.out.println(System.currentTimeMillis());
Response<GreetMeLaterResponse> r2 = greeter.greetMeLaterAsync(delay);
+ //System.out.println(System.currentTimeMillis());
long after = System.currentTimeMillis();
@@ -403,6 +406,7 @@ public class ClientServerTest extends Ab
}
waited += 500;
}
+ //Thread.sleep(100000000);
assertTrue("Response is not available.", r1.isDone());
assertTrue("Response is not available.", r2.isDone());
}
@@ -680,7 +684,10 @@ public class ClientServerTest extends Ab
BindingProvider bp = (BindingProvider)greeter;
Map<String, Object> responseContext = bp.getResponseContext();
String contentType = (String) responseContext.get(Message.CONTENT_TYPE);
- assertEquals("text/xml;charset=utf-8", contentType.toLowerCase());
+
+
+ assertTrue(contentType.toLowerCase().contains("text/xml"));
+ assertTrue(contentType.toLowerCase().contains("charset=utf-8"));
Integer responseCode = (Integer) responseContext.get(Message.RESPONSE_CODE);
assertEquals(500, responseCode.intValue());
assertNotNull(brlf.getFaultInfo());
Modified: cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java (original)
+++ cxf/branches/async-client/systests/jaxws/src/test/java/org/apache/cxf/systest/swa/ClientServerSwaTest.java Fri Sep 10 18:22:05 2010
@@ -127,6 +127,7 @@ public class ClientServerSwaTest extends
String string = IOUtils.newStringFromBytes(b);
assertEquals("testfoobar", string);
assertEquals("Hi", textHolder.value);
+ bis.close();
}
@Test
@@ -157,6 +158,7 @@ public class ClientServerSwaTest extends
assertEquals("testfoobar", string);
assertEquals("Hi", textHolder.value);
assertEquals("Header", headerHolder.value);
+ bis.close();
}
@Test
@@ -184,6 +186,7 @@ public class ClientServerSwaTest extends
bis.read(b, 0, 10);
String string = IOUtils.newStringFromBytes(b);
assertEquals("testfoobar", string);
+ bis.close();
}
@Test
@@ -224,10 +227,14 @@ public class ClientServerSwaTest extends
attach5);
assertNotNull(response);
- Map<?, ?> map = CastUtils.cast((Map<?, ?>)((BindingProvider)port).getResponseContext()
+ Map<String, DataHandler> map
+ = CastUtils.cast((Map<?, ?>)((BindingProvider)port).getResponseContext()
.get(MessageContext.INBOUND_MESSAGE_ATTACHMENTS));
assertNotNull(map);
assertEquals(5, map.size());
+ for (Map.Entry<String, DataHandler> ent : map.entrySet()) {
+ ent.getValue().getInputStream().close();
+ }
}
@Test
Modified: cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java (original)
+++ cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/factory_pattern/MultiplexClientServerTest.java Fri Sep 10 18:22:05 2010
@@ -31,6 +31,7 @@ import org.apache.cxf.factory_pattern.Nu
import org.apache.cxf.factory_pattern.NumberFactory;
import org.apache.cxf.factory_pattern.NumberFactoryService;
import org.apache.cxf.factory_pattern.NumberService;
+import org.apache.cxf.frontend.ClientProxy;
import org.apache.cxf.jaxws.ServiceImpl;
import org.apache.cxf.jaxws.support.ServiceDelegateAccessor;
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
@@ -100,10 +101,14 @@ public class MultiplexClientServerTest e
Number num = (Number)serviceImpl.getPort(numberTwoRef, Number.class);
assertTrue("20 is even", num.isEven().isEven());
+ ClientProxy.getClient(num).destroy();
W3CEndpointReference numberTwentyThreeRef = factory.create("23");
num = (Number)serviceImpl.getPort(numberTwentyThreeRef, Number.class);
assertTrue("23 is not even", !num.isEven().isEven());
+ ClientProxy.getClient(num).destroy();
+
+ ClientProxy.getClient(factory).destroy();
}
@Test
@@ -132,10 +137,14 @@ public class MultiplexClientServerTest e
assertTrue("match on exception message " + expected.getMessage(),
expected.getMessage().indexOf("999") != -1);
}
+ ClientProxy.getClient(num).destroy();
ref = factory.create("37");
assertNotNull("reference", ref);
num = (Number)serviceImpl.getPort(ref, Number.class);
assertTrue("37 is not even", !num.isEven().isEven());
+ ClientProxy.getClient(num).destroy();
+
+ ClientProxy.getClient(factory).destroy();
}
}
Modified: cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java?rev=995917&r1=995916&r2=995917&view=diff
==============================================================================
--- cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java (original)
+++ cxf/branches/async-client/systests/uncategorized/src/test/java/org/apache/cxf/systest/mtom/MtomServerTest.java Fri Sep 10 18:22:05 2010
@@ -91,6 +91,8 @@ public class MtomServerTest extends Abst
conduit.setMessageObserver(obs);
Message m = new MessageImpl();
+ m.setExchange(new ExchangeImpl());
+ m.getExchange().put(Bus.class, getBus());
String ct = "multipart/related; type=\"application/xop+xml\"; "
+ "start=\"<so...@xfire.codehaus.org>\"; "
+ "start-info=\"text/xml\"; "
@@ -157,6 +159,8 @@ public class MtomServerTest extends Abst
conduit.setMessageObserver(obs);
Message m = new MessageImpl();
+ m.setExchange(new ExchangeImpl());
+ m.getExchange().put(Bus.class, getBus());
String ct = "multipart/related; type=\"application/xop+xml\"; "
+ "start=\"<so...@xfire.codehaus.org>\"; "
+ "start-info=\"text/xml; charset=utf-8\"; "
@@ -182,6 +186,8 @@ public class MtomServerTest extends Abst
byte[] res = obs.getResponseStream().toByteArray();
MessageImpl resMsg = new MessageImpl();
+ resMsg.setExchange(new ExchangeImpl());
+ resMsg.getExchange().put(Bus.class, getBus());
resMsg.setContent(InputStream.class, new ByteArrayInputStream(res));
resMsg.put(Message.CONTENT_TYPE, obs.getResponseContentType());
resMsg.setExchange(new ExchangeImpl());