You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2014/05/14 11:56:36 UTC

svn commit: r1594528 - in /httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol: HttpAsyncService.java MessageState.java

Author: olegk
Date: Wed May 14 09:56:36 2014
New Revision: 1594528

URL: http://svn.apache.org/r1594528
Log:
Fixed a race condition in asynchronous response processing in pipelining mode

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java?rev=1594528&r1=1594527&r2=1594528&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java Wed May 14 09:56:36 2014
@@ -383,11 +383,11 @@ public class HttpAsyncService implements
                     conn.suspendOutput();
                     return;
                 }
+                state.setResponseState(MessageState.INIT);
                 final Object result = pipelineEntry.getResult();
                 final HttpRequest request = pipelineEntry.getRequest();
                 final HttpContext context = pipelineEntry.getContext();
                 if (result != null) {
-                    state.setResponseState(MessageState.INIT);
                     final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
                             HttpStatus.SC_OK, context);
                     final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
@@ -404,16 +404,22 @@ public class HttpAsyncService implements
                     state.setOutgoing(new Outgoing(request, error, responseProducer, context));
                 }
             }
-            final Outgoing outgoing = state.getOutgoing();
-            if (outgoing == null) {
-                return;
-            }
-            final HttpResponse response = outgoing.getResponse();
-            final int status = response.getStatusLine().getStatusCode();
-            if (status >= 200) {
-                commitFinalResponse(conn, state);
-            } else {
-                throw new HttpException("Invalid response: " + response.getStatusLine());
+            if (state.getResponseState() == MessageState.INIT) {
+                final Outgoing outgoing;
+                synchronized (state) {
+                    outgoing = state.getOutgoing();
+                    if (outgoing == null) {
+                        conn.suspendOutput();
+                        return;
+                    }
+                }
+                final HttpResponse response = outgoing.getResponse();
+                final int status = response.getStatusLine().getStatusCode();
+                if (status >= 200) {
+                    commitFinalResponse(conn, state);
+                } else {
+                    throw new HttpException("Invalid response: " + response.getStatusLine());
+                }
             }
         }
     }
@@ -604,7 +610,9 @@ public class HttpAsyncService implements
         }
         final Queue<PipelineEntry> pipeline = state.getPipeline();
         pipeline.add(pipelineEntry);
-        conn.requestOutput();
+        if (state.getResponseState() == MessageState.READY) {
+            conn.requestOutput();
+        }
     }
 
     private void commitFinalResponse(
@@ -912,35 +920,34 @@ public class HttpAsyncService implements
 
         @Override
         public void setCallback(final Cancellable cancellable) {
-            synchronized (this) {
-                Asserts.check(!this.completed, "Response already submitted");
-                if (this.state.isTerminated() && cancellable != null) {
-                    cancellable.cancel();
-                } else {
-                    this.state.setCancellable(cancellable);
-                    this.conn.requestInput();
-                }
+            Asserts.check(!this.completed, "Response already submitted");
+            if (this.state.isTerminated() && cancellable != null) {
+                cancellable.cancel();
+            } else {
+                this.state.setCancellable(cancellable);
             }
         }
 
         @Override
         public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
             Args.notNull(responseProducer, "Response producer");
-            synchronized (this) {
-                Asserts.check(!this.completed, "Response already submitted");
-                this.completed = true;
-                if (!this.state.isTerminated()) {
-                    final HttpResponse response = responseProducer.generateResponse();
-                    final Outgoing outgoing = new Outgoing(
-                            this.request, response, responseProducer, this.context);
+            Asserts.check(!this.completed, "Response already submitted");
+            this.completed = true;
+            if (!this.state.isTerminated()) {
+                final HttpResponse response = responseProducer.generateResponse();
+                final Outgoing outgoing = new Outgoing(
+                        this.request, response, responseProducer, this.context);
+
+                synchronized (this.state) {
                     this.state.setOutgoing(outgoing);
                     this.state.setCancellable(null);
                     this.conn.requestOutput();
-                } else {
-                    try {
-                        responseProducer.close();
-                    } catch (final IOException ex) {
-                    }
+                }
+
+            } else {
+                try {
+                    responseProducer.close();
+                } catch (final IOException ex) {
                 }
             }
         }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java?rev=1594528&r1=1594527&r2=1594528&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java Wed May 14 09:56:36 2014
@@ -28,6 +28,6 @@ package org.apache.http.nio.protocol;
 
 enum MessageState {
 
-    READY, INIT, ACK_EXPECTED, ACK, BODY_STREAM, COMPLETED
+    READY, INIT, ACK_EXPECTED, BODY_STREAM, COMPLETED
 
 }