You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2014/05/14 11:56:36 UTC
svn commit: r1594528 - in
/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol:
HttpAsyncService.java MessageState.java
Author: olegk
Date: Wed May 14 09:56:36 2014
New Revision: 1594528
URL: http://svn.apache.org/r1594528
Log:
Fixed race condition in async response processing in pipelining mode
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java?rev=1594528&r1=1594527&r2=1594528&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncService.java Wed May 14 09:56:36 2014
@@ -383,11 +383,11 @@ public class HttpAsyncService implements
conn.suspendOutput();
return;
}
+ state.setResponseState(MessageState.INIT);
final Object result = pipelineEntry.getResult();
final HttpRequest request = pipelineEntry.getRequest();
final HttpContext context = pipelineEntry.getContext();
if (result != null) {
- state.setResponseState(MessageState.INIT);
final HttpResponse response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
HttpStatus.SC_OK, context);
final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
@@ -404,16 +404,22 @@ public class HttpAsyncService implements
state.setOutgoing(new Outgoing(request, error, responseProducer, context));
}
}
- final Outgoing outgoing = state.getOutgoing();
- if (outgoing == null) {
- return;
- }
- final HttpResponse response = outgoing.getResponse();
- final int status = response.getStatusLine().getStatusCode();
- if (status >= 200) {
- commitFinalResponse(conn, state);
- } else {
- throw new HttpException("Invalid response: " + response.getStatusLine());
+ if (state.getResponseState() == MessageState.INIT) {
+ final Outgoing outgoing;
+ synchronized (state) {
+ outgoing = state.getOutgoing();
+ if (outgoing == null) {
+ conn.suspendOutput();
+ return;
+ }
+ }
+ final HttpResponse response = outgoing.getResponse();
+ final int status = response.getStatusLine().getStatusCode();
+ if (status >= 200) {
+ commitFinalResponse(conn, state);
+ } else {
+ throw new HttpException("Invalid response: " + response.getStatusLine());
+ }
}
}
}
@@ -604,7 +610,9 @@ public class HttpAsyncService implements
}
final Queue<PipelineEntry> pipeline = state.getPipeline();
pipeline.add(pipelineEntry);
- conn.requestOutput();
+ if (state.getResponseState() == MessageState.READY) {
+ conn.requestOutput();
+ }
}
private void commitFinalResponse(
@@ -912,35 +920,34 @@ public class HttpAsyncService implements
@Override
public void setCallback(final Cancellable cancellable) {
- synchronized (this) {
- Asserts.check(!this.completed, "Response already submitted");
- if (this.state.isTerminated() && cancellable != null) {
- cancellable.cancel();
- } else {
- this.state.setCancellable(cancellable);
- this.conn.requestInput();
- }
+ Asserts.check(!this.completed, "Response already submitted");
+ if (this.state.isTerminated() && cancellable != null) {
+ cancellable.cancel();
+ } else {
+ this.state.setCancellable(cancellable);
}
}
@Override
public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
Args.notNull(responseProducer, "Response producer");
- synchronized (this) {
- Asserts.check(!this.completed, "Response already submitted");
- this.completed = true;
- if (!this.state.isTerminated()) {
- final HttpResponse response = responseProducer.generateResponse();
- final Outgoing outgoing = new Outgoing(
- this.request, response, responseProducer, this.context);
+ Asserts.check(!this.completed, "Response already submitted");
+ this.completed = true;
+ if (!this.state.isTerminated()) {
+ final HttpResponse response = responseProducer.generateResponse();
+ final Outgoing outgoing = new Outgoing(
+ this.request, response, responseProducer, this.context);
+
+ synchronized (this.state) {
this.state.setOutgoing(outgoing);
this.state.setCancellable(null);
this.conn.requestOutput();
- } else {
- try {
- responseProducer.close();
- } catch (final IOException ex) {
- }
+ }
+
+ } else {
+ try {
+ responseProducer.close();
+ } catch (final IOException ex) {
}
}
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java?rev=1594528&r1=1594527&r2=1594528&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/MessageState.java Wed May 14 09:56:36 2014
@@ -28,6 +28,6 @@ package org.apache.http.nio.protocol;
enum MessageState {
- READY, INIT, ACK_EXPECTED, ACK, BODY_STREAM, COMPLETED
+ READY, INIT, ACK_EXPECTED, BODY_STREAM, COMPLETED
}