You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2017/04/14 11:13:28 UTC

svn commit: r1791350 [2/2] - in /httpcomponents/httpcore/trunk: httpcore5-h2/src/examples/org/apache/hc/core5/http/examples/ httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/http/ httpcore5/src/examples/org/apache/hc/core5/http/examples/...

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java Fri Apr 14 11:13:28 2017
@@ -50,7 +50,7 @@ import org.apache.hc.core5.http.nio.Asyn
 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
 import org.apache.hc.core5.http.nio.BasicResponseProducer;
-import org.apache.hc.core5.http.nio.ContentDecoder;
+import org.apache.hc.core5.http.nio.CapacityChannel;
 import org.apache.hc.core5.http.nio.DataStreamChannel;
 import org.apache.hc.core5.http.nio.HandlerFactory;
 import org.apache.hc.core5.http.nio.HttpContextAware;
@@ -69,13 +69,12 @@ class ServerHttp1StreamHandler implement
     private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
     private final ConnectionReuseStrategy connectionReuseStrategy;
     private final HttpCoreContext context;
-    private final ByteBuffer inputBuffer;
     private final AtomicBoolean responseCommitted;
     private final AtomicBoolean done;
 
+    private volatile boolean keepAlive;
     private volatile AsyncServerExchangeHandler exchangeHandler;
     private volatile HttpRequest receivedRequest;
-    private volatile HttpResponse committedResponse;
     private volatile MessageState requestState;
     private volatile MessageState responseState;
 
@@ -84,8 +83,7 @@ class ServerHttp1StreamHandler implement
             final HttpProcessor httpProcessor,
             final ConnectionReuseStrategy connectionReuseStrategy,
             final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
-            final HttpCoreContext context,
-            final ByteBuffer inputBuffer) {
+            final HttpCoreContext context) {
         this.outputChannel = outputChannel;
         this.internalDataChannel = new DataStreamChannel() {
 
@@ -97,6 +95,9 @@ class ServerHttp1StreamHandler implement
             @Override
             public void endStream(final List<? extends Header> trailers) throws IOException {
                 outputChannel.complete(trailers);
+                if (!keepAlive) {
+                    outputChannel.close();
+                }
                 responseState = MessageState.COMPLETE;
             }
 
@@ -116,9 +117,9 @@ class ServerHttp1StreamHandler implement
         this.connectionReuseStrategy = connectionReuseStrategy;
         this.exchangeHandlerFactory = exchangeHandlerFactory;
         this.context = context;
-        this.inputBuffer = inputBuffer;
         this.responseCommitted = new AtomicBoolean(false);
         this.done = new AtomicBoolean(false);
+        this.keepAlive = true;
         this.requestState = MessageState.HEADERS;
         this.responseState = MessageState.IDLE;
     }
@@ -157,9 +158,16 @@ class ServerHttp1StreamHandler implement
             httpProcessor.process(response, responseEntityDetails, context);
 
             final boolean endStream = responseEntityDetails == null || method.equalsIgnoreCase("HEAD");
+
+            if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
+                keepAlive = false;
+            }
+
             outputChannel.submit(response, endStream);
-            committedResponse = response;
             if (endStream) {
+                if (!keepAlive) {
+                    outputChannel.close();
+                }
                 responseState = MessageState.COMPLETE;
             } else {
                 responseState = MessageState.BODY;
@@ -189,7 +197,7 @@ class ServerHttp1StreamHandler implement
         outputChannel.activate();
     }
 
-    boolean isResponseCompleted() {
+    boolean isResponseFinal() {
         return responseState == MessageState.COMPLETE;
     }
 
@@ -197,11 +205,6 @@ class ServerHttp1StreamHandler implement
         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
     }
 
-    boolean keepAlive() {
-        return receivedRequest != null && committedResponse != null &&
-                connectionReuseStrategy.keepAlive(receivedRequest, committedResponse, context);
-    }
-
     AsyncResponseProducer handleException(final Exception ex) {
         final int code;
         if (ex instanceof MethodNotSupportedException) {
@@ -303,35 +306,26 @@ class ServerHttp1StreamHandler implement
         }
     }
 
-    int consumeData(final ContentDecoder contentDecoder) throws HttpException, IOException {
+    int consumeData(final ByteBuffer src) throws HttpException, IOException {
         if (done.get() || requestState != MessageState.BODY) {
             throw new ProtocolException("Unexpected message data");
         }
         if (responseState == MessageState.ACK) {
             outputChannel.requestOutput();
         }
-        int total = 0;
-        int byteRead;
-        while ((byteRead = contentDecoder.read(inputBuffer)) > 0) {
-            total += byteRead;
-            inputBuffer.flip();
-            final int capacity = exchangeHandler.consume(inputBuffer);
-            inputBuffer.clear();
-            if (capacity <= 0) {
-                if (!contentDecoder.isCompleted()) {
-                    outputChannel.suspendInput();
-                    exchangeHandler.updateCapacity(outputChannel);
-                }
-                break;
-            }
-        }
-        if (contentDecoder.isCompleted()) {
-            requestState = MessageState.COMPLETE;
-            exchangeHandler.streamEnd(contentDecoder.getTrailers());
-            return total > 0 ? total : -1;
-        } else {
-            return total;
+        return exchangeHandler.consume(src);
+    }
+
+    void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        exchangeHandler.updateCapacity(capacityChannel);
+    }
+
+    void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
+        if (done.get() || requestState != MessageState.BODY) {
+            throw new ProtocolException("Unexpected message data");
         }
+        requestState = MessageState.COMPLETE;
+        exchangeHandler.streamEnd(trailers);
     }
 
     void failed(final Exception cause) {

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/AsyncClientEndpoint.java Fri Apr 14 11:13:28 2017
@@ -114,62 +114,4 @@ public abstract class AsyncClientEndpoin
         return execute(requestProducer, responseConsumer, null, callback);
     }
 
-    /**
-     * Initiates a message exchange using the given request producer and response consumer and
-     * automatically invokes {@link #releaseAndReuse()} upon its successful completion.
-     */
-    public final <T> Future<T> executeAndRelease(
-            final AsyncRequestProducer requestProducer,
-            final AsyncResponseConsumer<T> responseConsumer,
-            final HttpContext context,
-            final FutureCallback<T> callback) {
-        return execute(requestProducer, responseConsumer, context, new FutureCallback<T>() {
-
-            @Override
-            public void completed(final T result) {
-                try {
-                    if (callback != null) {
-                        callback.completed(result);
-                    }
-                } finally {
-                    releaseAndReuse();
-                }
-            }
-
-            @Override
-            public void failed(final Exception ex) {
-                try {
-                    if (callback != null) {
-                        callback.failed(ex);
-                    }
-                } finally {
-                    releaseAndDiscard();
-                }
-            }
-
-            @Override
-            public void cancelled() {
-                try {
-                    if (callback != null) {
-                        callback.cancelled();
-                    }
-                } finally {
-                    releaseAndDiscard();
-                }
-            }
-
-        });
-    }
-
-    /**
-     * Initiates a message exchange using the given request producer and response consumer and
-     * automatically invokes {@link #releaseAndReuse()} upon its successful completion.
-     */
-    public final <T> Future<T> executeAndRelease(
-            final AsyncRequestProducer requestProducer,
-            final AsyncResponseConsumer<T> responseConsumer,
-            final FutureCallback<T> callback) {
-        return executeAndRelease(requestProducer, responseConsumer, null, callback);
-    }
-
 }

Modified: httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java?rev=1791350&r1=1791349&r2=1791350&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicClientExchangeHandler.java Fri Apr 14 11:13:28 2017
@@ -101,6 +101,7 @@ public class BasicClientExchangeHandler<
 
             @Override
             public void completed(final T result) {
+                releaseResources();
                 if (resultCallback != null) {
                     resultCallback.completed(result);
                 }
@@ -108,6 +109,7 @@ public class BasicClientExchangeHandler<
 
             @Override
             public void failed(final Exception ex) {
+                releaseResources();
                 if (resultCallback != null) {
                     resultCallback.failed(ex);
                 }
@@ -115,6 +117,7 @@ public class BasicClientExchangeHandler<
 
             @Override
             public void cancelled() {
+                releaseResources();
                 if (resultCallback != null) {
                     resultCallback.cancelled();
                 }
@@ -125,10 +128,10 @@ public class BasicClientExchangeHandler<
 
     @Override
     public void cancel() {
+        releaseResources();
         if (resultCallback != null) {
             resultCallback.cancelled();
         }
-        releaseResources();
     }
 
     @Override
@@ -148,12 +151,15 @@ public class BasicClientExchangeHandler<
 
     @Override
     public final void failed(final Exception cause) {
-        if (resultCallback != null) {
-            resultCallback.failed(cause);
+        try {
+            requestProducer.failed(cause);
+            responseConsumer.failed(cause);
+        } finally {
+            releaseResources();
+            if (resultCallback != null) {
+                resultCallback.failed(cause);
+            }
         }
-        requestProducer.failed(cause);
-        responseConsumer.failed(cause);
-        releaseResources();
     }
 
     @Override