You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2010/09/10 20:22:14 UTC
svn commit: r995918 - in /cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async: AsyncHTTPConduit.java HttpClientController.java
Author: dkulp
Date: Fri Sep 10 18:22:14 2010
New Revision: 995918
URL: http://svn.apache.org/viewvc?rev=995918&view=rev
Log:
Support keep-alives
Modified:
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java?rev=995918&r1=995917&r2=995918&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java Fri Sep 10 18:22:14 2010
@@ -89,7 +89,6 @@ public class AsyncHTTPConduit extends HT
Map<String, List<String>> headers = getSetProtocolHeaders(message);
URL currentURL = setupURL(message);
- MessageUtils.getContextualBoolean(message, "force.http.url.connection", false);
if (MessageUtils.getContextualBoolean(message, "force.http.url.connection", false)
|| "https".equalsIgnoreCase(currentURL.getProtocol())) {
//delegate to the parent for any https things for now
@@ -836,6 +835,14 @@ public class AsyncHTTPConduit extends HT
return i;
}
public void close() throws IOException {
+ if (!decoder.isCompleted()) {
+ ByteBuffer buf = ByteBuffer.allocate(4096);
+ int i = decoder.read(buf);
+ while (i != -1) {
+ buf.clear();
+ i = decoder.read(buf);
+ }
+ }
}
};
this.handleResponseInternal();
Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java?rev=995918&r1=995917&r2=995918&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Fri Sep 10 18:22:14 2010
@@ -21,8 +21,12 @@ package org.apache.cxf.transport.http.as
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.URL;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
@@ -64,9 +68,44 @@ public class HttpClientController implem
NHttpRequestExecutionHandler {
ConnectingIOReactor ioReactor;
+ Map<String, Stack<SessionRequest>> sessions
+ = new ConcurrentHashMap<String, Stack<SessionRequest>>();
+
+
HttpClientController() {
}
+ static final class MessageHolder {
+ private Message msg;
+ private String key;
+ private SessionRequest session;
+
+ public MessageHolder(Message m, String key) {
+ msg = m;
+ this.key = key;
+ }
+ public String getKey() {
+ return key;
+ }
+ public Message get() {
+ return msg;
+ }
+ public Message getAndRemove() {
+ Message m = msg;
+ msg = null;
+ return m;
+ }
+ public void set(Message message) {
+ msg = message;
+ }
+ public void setSession(SessionRequest r) {
+ session = r;
+ }
+ public SessionRequest getSession() {
+ return session;
+ }
+ }
+
public void setUp() {
try {
HttpParams params = new BasicHttpParams();
@@ -93,8 +132,9 @@ public class HttpClientController implem
params) {
protected void handleTimeout(final NHttpConnection conn) {
super.handleTimeout(conn);
- Message m = (Message)conn.getContext().getAttribute("MESSAGE");
- m.get(AsyncHTTPConduit.class).sendException(m, new SocketTimeoutException());
+ MessageHolder m = (MessageHolder)conn.getContext().getAttribute("MESSAGE");
+ m.get().get(AsyncHTTPConduit.class).sendException(m.get(),
+ new SocketTimeoutException());
}
};
@@ -102,8 +142,8 @@ public class HttpClientController implem
final IOEventDispatch ioEventDispatch
= new DefaultClientIOEventDispatch(handler, params) {
protected NHttpClientIOTarget createConnection(IOSession session) {
- Message m = (Message)session.getAttribute(IOSession.ATTACHMENT_KEY);
- HTTPClientPolicy client = (HTTPClientPolicy)m
+ MessageHolder m = (MessageHolder)session.getAttribute(IOSession.ATTACHMENT_KEY);
+ HTTPClientPolicy client = (HTTPClientPolicy)m.get()
.get(HTTPClientPolicy.class.getName() + ".complete");
if (client != null) {
session.setSocketTimeout((int)client.getReceiveTimeout());
@@ -128,19 +168,43 @@ public class HttpClientController implem
ex.printStackTrace();
}
}
+ public SessionRequest findSession(String key) {
+ Stack<SessionRequest> stack = sessions.get(key);
+ if (stack != null && !stack.isEmpty()) {
+ return stack.pop();
+ }
+ return null;
+ }
+
public void execute(AsyncHTTPConduit conduit,
final URL address,
HttpUriRequest request,
final Message message) throws IOException {
+ message.put(HttpUriRequest.class, request);
int port = address.getPort();
if (port == -1) {
port = 80;
}
- InetSocketAddress add = new InetSocketAddress(address.getHost(), port);
- message.put(HttpUriRequest.class, request);
+ String key = address.getHost() + ":" + port;
+ SessionRequest srequest = findSession(key);
+ if (srequest != null && !srequest.getSession().isClosed()) {
+ message.put(SessionRequest.class, srequest);
+ Object o = srequest.getSession().getAttribute("http.connection");
+ ((MessageHolder)srequest.getAttachment()).set(message);
+ NHttpConnection ioc = (NHttpConnection)o;
+ HTTPClientPolicy client = (HTTPClientPolicy)message
+ .get(HTTPClientPolicy.class.getName() + ".complete");
+ if (client != null) {
+ ioc.setSocketTimeout((int)client.getReceiveTimeout());
+ }
+
+ ioc.requestOutput();
+ return;
+ }
+ SocketAddress add = new InetSocketAddress(address.getHost(), port);
synchronized (message) {
SessionRequest req
- = ioReactor.connect(add, null, message,
+ = ioReactor.connect(add, null, new MessageHolder(message, key),
new SessionRequestCallback() {
public void completed(SessionRequest request) {
synchronized (message) {
@@ -185,6 +249,7 @@ public class HttpClientController implem
//ignore
}
message.put(SessionRequest.class, req);
+ srequest = req;
}
}
@@ -222,26 +287,43 @@ public class HttpClientController implem
}
public HttpRequest submitRequest(HttpContext context) {
- Message m = (Message)context.getAttribute("MESSAGE");
+ Message m = ((MessageHolder)context.getAttribute("MESSAGE")).get();
if (m == null) {
return null;
}
- return m.get(HttpUriRequest.class);
+ return (HttpRequest)m.remove(HttpUriRequest.class.getName());
}
public ConsumingNHttpEntity responseEntity(HttpResponse response, HttpContext context)
throws IOException {
- Message m = (Message)context.getAttribute("MESSAGE");
+ MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+ Message m = holder.get();
WrappedOutputStream out = m.get(WrappedOutputStream.class);
out.setResponse(response);
return out;
}
public void handleResponse(HttpResponse response, HttpContext context) throws IOException {
+ MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+ Message m = holder.getAndRemove();
+ SessionRequest r = m.get(SessionRequest.class);
+ holder.setSession(r);
+ String key = holder.getKey();
+ Stack<SessionRequest> srs = sessions.get(key);
+ if (srs == null) {
+ srs = new Stack<SessionRequest>();
+ sessions.put(key, srs);
+ }
+ srs.push(r);
}
public void finalizeContext(HttpContext context) {
- context.removeAttribute("MESSAGE");
+ MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+ String key = holder.getKey();
+ Stack<SessionRequest> srs = sessions.get(key);
+ if (srs != null) {
+ srs.removeElement(holder.getSession());
+ }
}
}
\ No newline at end of file