You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2010/09/10 20:22:14 UTC

svn commit: r995918 - in /cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async: AsyncHTTPConduit.java HttpClientController.java

Author: dkulp
Date: Fri Sep 10 18:22:14 2010
New Revision: 995918

URL: http://svn.apache.org/viewvc?rev=995918&view=rev
Log:
Support keep-alives

Modified:
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
    cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java

Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java?rev=995918&r1=995917&r2=995918&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/AsyncHTTPConduit.java Fri Sep 10 18:22:14 2010
@@ -89,7 +89,6 @@ public class AsyncHTTPConduit extends HT
         Map<String, List<String>> headers = getSetProtocolHeaders(message);
         
         URL currentURL = setupURL(message);
-        MessageUtils.getContextualBoolean(message, "force.http.url.connection", false);
         if (MessageUtils.getContextualBoolean(message, "force.http.url.connection", false)
             || "https".equalsIgnoreCase(currentURL.getProtocol())) {
             //delegate to the parent for any https things for now
@@ -836,6 +835,14 @@ public class AsyncHTTPConduit extends HT
                     return i;
                 }
                 public void close() throws IOException {
+                    if (!decoder.isCompleted()) {
+                        ByteBuffer buf = ByteBuffer.allocate(4096);
+                        int i = decoder.read(buf);
+                        while (i != -1) {
+                            buf.clear();
+                            i = decoder.read(buf);
+                        }
+                    }
                 }
             };
             this.handleResponseInternal();

Modified: cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java
URL: http://svn.apache.org/viewvc/cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java?rev=995918&r1=995917&r2=995918&view=diff
==============================================================================
--- cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java (original)
+++ cxf/branches/async-client/rt/transports/http/src/main/java/org/apache/cxf/transport/http/async/HttpClientController.java Fri Sep 10 18:22:14 2010
@@ -21,8 +21,12 @@ package org.apache.cxf.transport.http.as
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URL;
+import java.util.Map;
+import java.util.Stack;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -64,9 +68,44 @@ public class HttpClientController implem
     NHttpRequestExecutionHandler {
     ConnectingIOReactor ioReactor;
     
+    Map<String, Stack<SessionRequest>> sessions 
+        = new ConcurrentHashMap<String, Stack<SessionRequest>>();
+    
+    
     HttpClientController() {
     }
     
+    static final class MessageHolder {
+        private Message msg;
+        private String key;
+        private SessionRequest session;
+        
+        public MessageHolder(Message m, String key) {
+            msg = m;
+            this.key = key;
+        }
+        public String getKey() {
+            return key;
+        }
+        public Message get() {
+            return msg;
+        }
+        public Message getAndRemove() {
+            Message m = msg;
+            msg = null;
+            return m;
+        }
+        public void set(Message message) {
+            msg = message;
+        }
+        public void setSession(SessionRequest r) {
+            session = r;
+        }
+        public SessionRequest getSession() {
+            return session;
+        }
+    }
+    
     public void setUp() {
         try {
             HttpParams params = new BasicHttpParams();
@@ -93,8 +132,9 @@ public class HttpClientController implem
                 params) {
                 protected void handleTimeout(final NHttpConnection conn) {
                     super.handleTimeout(conn);
-                    Message m = (Message)conn.getContext().getAttribute("MESSAGE");
-                    m.get(AsyncHTTPConduit.class).sendException(m, new SocketTimeoutException());
+                    MessageHolder m = (MessageHolder)conn.getContext().getAttribute("MESSAGE");
+                    m.get().get(AsyncHTTPConduit.class).sendException(m.get(),
+                                                                      new SocketTimeoutException());
                 }
             };
 
@@ -102,8 +142,8 @@ public class HttpClientController implem
             final IOEventDispatch ioEventDispatch 
                 = new DefaultClientIOEventDispatch(handler, params) {
                     protected NHttpClientIOTarget createConnection(IOSession session) {
-                        Message m = (Message)session.getAttribute(IOSession.ATTACHMENT_KEY);
-                        HTTPClientPolicy client = (HTTPClientPolicy)m
+                        MessageHolder m = (MessageHolder)session.getAttribute(IOSession.ATTACHMENT_KEY);
+                        HTTPClientPolicy client = (HTTPClientPolicy)m.get()
                             .get(HTTPClientPolicy.class.getName() + ".complete");
                         if (client != null) {
                             session.setSocketTimeout((int)client.getReceiveTimeout());
@@ -128,19 +168,43 @@ public class HttpClientController implem
             ex.printStackTrace();
         }
     }
+    public SessionRequest findSession(String key) {
+        Stack<SessionRequest> stack = sessions.get(key);
+        if (stack != null && !stack.isEmpty()) {
+            return stack.pop();
+        }
+        return null;
+    }
+    
     public void execute(AsyncHTTPConduit conduit, 
                         final URL address, 
                         HttpUriRequest request, 
                         final Message message) throws IOException {
+        message.put(HttpUriRequest.class, request);
         int port = address.getPort();
         if (port == -1) {
             port = 80;
         }
-        InetSocketAddress add = new InetSocketAddress(address.getHost(), port);
-        message.put(HttpUriRequest.class, request);
+        String key = address.getHost() + ":" + port;
+        SessionRequest srequest = findSession(key);
+        if (srequest != null && !srequest.getSession().isClosed()) {
+            message.put(SessionRequest.class, srequest);
+            Object o = srequest.getSession().getAttribute("http.connection");
+            ((MessageHolder)srequest.getAttachment()).set(message);
+            NHttpConnection ioc = (NHttpConnection)o;
+            HTTPClientPolicy client = (HTTPClientPolicy)message
+                .get(HTTPClientPolicy.class.getName() + ".complete");
+            if (client != null) {
+                ioc.setSocketTimeout((int)client.getReceiveTimeout());
+            }
+
+            ioc.requestOutput();
+            return;
+        }
+        SocketAddress add = new InetSocketAddress(address.getHost(), port);
         synchronized (message) {
             SessionRequest req 
-                = ioReactor.connect(add, null, message,
+                = ioReactor.connect(add, null, new MessageHolder(message, key),
                     new SessionRequestCallback() {
                         public void completed(SessionRequest request) {
                             synchronized (message) {
@@ -185,6 +249,7 @@ public class HttpClientController implem
                 //ignore
             }
             message.put(SessionRequest.class, req);
+            srequest = req;
         }
     }
     
@@ -222,26 +287,43 @@ public class HttpClientController implem
     }
 
     public HttpRequest submitRequest(HttpContext context) {
-        Message m = (Message)context.getAttribute("MESSAGE");
+        Message m = ((MessageHolder)context.getAttribute("MESSAGE")).get();
         if (m == null) {
             return null;
         }
-        return m.get(HttpUriRequest.class);
+        return (HttpRequest)m.remove(HttpUriRequest.class.getName());
     }
 
     public ConsumingNHttpEntity responseEntity(HttpResponse response, HttpContext context)
         throws IOException {
-        Message m = (Message)context.getAttribute("MESSAGE");
+        MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+        Message m = holder.get();
         WrappedOutputStream out = m.get(WrappedOutputStream.class);
         out.setResponse(response);
         return out;
     }
 
     public void handleResponse(HttpResponse response, HttpContext context) throws IOException {
+        MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+        Message m = holder.getAndRemove();
+        SessionRequest r = m.get(SessionRequest.class);
+        holder.setSession(r);
+        String key = holder.getKey();
+        Stack<SessionRequest> srs = sessions.get(key);
+        if (srs == null) {
+            srs = new Stack<SessionRequest>();
+            sessions.put(key, srs);
+        }
+        srs.push(r);
     }
 
     public void finalizeContext(HttpContext context) {
-        context.removeAttribute("MESSAGE");
+        MessageHolder holder = (MessageHolder)context.getAttribute("MESSAGE");
+        String key = holder.getKey();
+        Stack<SessionRequest> srs = sessions.get(key);
+        if (srs != null) {
+            srs.removeElement(holder.getSession());
+        }        
     }
     
 }
\ No newline at end of file