You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/12/06 21:50:20 UTC
svn commit: r1211116 - in
/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol:
HttpAsyncClientProtocolHandler.java HttpAsyncServiceHandler.java
Author: olegk
Date: Tue Dec 6 20:50:19 2011
New Revision: 1211116
URL: http://svn.apache.org/viewvc?rev=1211116&view=rev
Log:
Synchronize on exchange state object
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1211116&r1=1211115&r2=1211116&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Tue Dec 6 20:50:19 2011
@@ -70,8 +70,10 @@ public class HttpAsyncClientProtocolHand
public void closed(final NHttpClientConnection conn) {
State state = getState(conn);
if (state != null) {
- closeHandler(state);
- state.reset();
+ synchronized (state) {
+ closeHandler(state);
+ state.reset();
+ }
}
}
@@ -80,8 +82,10 @@ public class HttpAsyncClientProtocolHand
shutdownConnection(conn);
State state = getState(conn);
if (state != null) {
- closeHandler(state, cause);
- state.reset();
+ synchronized (state) {
+ closeHandler(state, cause);
+ state.reset();
+ }
} else {
log(cause);
}
@@ -90,43 +94,45 @@ public class HttpAsyncClientProtocolHand
public void requestReady(
final NHttpClientConnection conn) throws IOException, HttpException {
State state = ensureNotNull(getState(conn));
- if (state.getRequestState() != MessageState.READY) {
- return;
- }
- HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
- if (handler != null && handler.isDone()) {
- closeHandler(state);
- state.reset();
- handler = null;
- }
- if (handler == null) {
- handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
- HTTP_HANDLER);
- state.setHandler(handler);
- }
- if (handler == null) {
- return;
- }
- HttpContext context = handler.getContext();
- HttpRequest request = handler.generateRequest();
- state.setRequest(request);
-
- conn.submitRequest(request);
-
- if (request instanceof HttpEntityEnclosingRequest) {
- if (((HttpEntityEnclosingRequest) request).expectContinue()) {
- int timeout = conn.getSocketTimeout();
- state.setTimeout(timeout);
- timeout = request.getParams().getIntParameter(
- CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
- conn.setSocketTimeout(timeout);
- state.setRequestState(MessageState.ACK_EXPECTED);
+ synchronized (state) {
+ if (state.getRequestState() != MessageState.READY) {
+ return;
+ }
+ HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
+ if (handler != null && handler.isDone()) {
+ closeHandler(state);
+ state.reset();
+ handler = null;
+ }
+ if (handler == null) {
+ handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
+ HTTP_HANDLER);
+ state.setHandler(handler);
+ }
+ if (handler == null) {
+ return;
+ }
+ HttpContext context = handler.getContext();
+ HttpRequest request = handler.generateRequest();
+ state.setRequest(request);
+
+ conn.submitRequest(request);
+
+ if (request instanceof HttpEntityEnclosingRequest) {
+ if (((HttpEntityEnclosingRequest) request).expectContinue()) {
+ int timeout = conn.getSocketTimeout();
+ state.setTimeout(timeout);
+ timeout = request.getParams().getIntParameter(
+ CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
+ conn.setSocketTimeout(timeout);
+ state.setRequestState(MessageState.ACK_EXPECTED);
+ } else {
+ state.setRequestState(MessageState.BODY_STREAM);
+ }
} else {
- state.setRequestState(MessageState.BODY_STREAM);
+ handler.requestCompleted(context);
+ state.setRequestState(MessageState.COMPLETED);
}
- } else {
- handler.requestCompleted(context);
- state.setRequestState(MessageState.COMPLETED);
}
}
@@ -134,60 +140,64 @@ public class HttpAsyncClientProtocolHand
final NHttpClientConnection conn,
final ContentEncoder encoder) throws IOException {
State state = ensureNotNull(getState(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
- if (state.getRequestState() == MessageState.ACK_EXPECTED) {
- conn.suspendOutput();
- return;
- }
- HttpContext context = handler.getContext();
- handler.produceContent(encoder, conn);
- state.setRequestState(MessageState.BODY_STREAM);
- if (encoder.isCompleted()) {
- handler.requestCompleted(context);
- state.setRequestState(MessageState.COMPLETED);
+ synchronized (state) {
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ conn.suspendOutput();
+ return;
+ }
+ HttpContext context = handler.getContext();
+ handler.produceContent(encoder, conn);
+ state.setRequestState(MessageState.BODY_STREAM);
+ if (encoder.isCompleted()) {
+ handler.requestCompleted(context);
+ state.setRequestState(MessageState.COMPLETED);
+ }
}
}
public void responseReceived(
final NHttpClientConnection conn) throws HttpException, IOException {
State state = ensureNotNull(getState(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
- HttpResponse response = conn.getHttpResponse();
- HttpRequest request = state.getRequest();
-
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode < HttpStatus.SC_OK) {
- // 1xx intermediate response
- if (statusCode != HttpStatus.SC_CONTINUE) {
- throw new ProtocolException(
- "Unexpected response: " + response.getStatusLine());
+ synchronized (state) {
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+ HttpResponse response = conn.getHttpResponse();
+ HttpRequest request = state.getRequest();
+
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode < HttpStatus.SC_OK) {
+ // 1xx intermediate response
+ if (statusCode != HttpStatus.SC_CONTINUE) {
+ throw new ProtocolException(
+ "Unexpected response: " + response.getStatusLine());
+ }
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ int timeout = state.getTimeout();
+ conn.setSocketTimeout(timeout);
+ conn.requestOutput();
+ state.setRequestState(MessageState.ACK);
+ }
+ return;
}
+ state.setResponse(response);
if (state.getRequestState() == MessageState.ACK_EXPECTED) {
int timeout = state.getTimeout();
conn.setSocketTimeout(timeout);
- conn.requestOutput();
- state.setRequestState(MessageState.ACK);
+ conn.resetOutput();
+ state.setRequestState(MessageState.COMPLETED);
+ } else if (state.getRequestState() == MessageState.BODY_STREAM) {
+ // Early response
+ conn.resetOutput();
+ conn.suspendOutput();
+ state.setRequestState(MessageState.COMPLETED);
+ state.invalidate();
+ }
+ handler.responseReceived(response);
+ state.setResponseState(MessageState.BODY_STREAM);
+ if (!canResponseHaveBody(request, response)) {
+ conn.resetInput();
+ processResponse(conn, state, handler);
}
- return;
- }
- state.setResponse(response);
- if (state.getRequestState() == MessageState.ACK_EXPECTED) {
- int timeout = state.getTimeout();
- conn.setSocketTimeout(timeout);
- conn.resetOutput();
- state.setRequestState(MessageState.COMPLETED);
- } else if (state.getRequestState() == MessageState.BODY_STREAM) {
- // Early response
- conn.resetOutput();
- conn.suspendOutput();
- state.setRequestState(MessageState.COMPLETED);
- state.invalidate();
- }
- handler.responseReceived(response);
- state.setResponseState(MessageState.BODY_STREAM);
- if (!canResponseHaveBody(request, response)) {
- conn.resetInput();
- processResponse(conn, state, handler);
}
}
@@ -195,11 +205,13 @@ public class HttpAsyncClientProtocolHand
final NHttpClientConnection conn,
final ContentDecoder decoder) throws IOException {
State state = ensureNotNull(getState(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
- handler.consumeContent(decoder, conn);
- state.setResponseState(MessageState.BODY_STREAM);
- if (decoder.isCompleted()) {
- processResponse(conn, state, handler);
+ synchronized (state) {
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+ handler.consumeContent(decoder, conn);
+ state.setResponseState(MessageState.BODY_STREAM);
+ if (decoder.isCompleted()) {
+ processResponse(conn, state, handler);
+ }
}
}
@@ -207,14 +219,16 @@ public class HttpAsyncClientProtocolHand
final NHttpClientConnection conn) throws IOException {
State state = getState(conn);
if (state != null) {
- if (state.getRequestState() == MessageState.ACK_EXPECTED) {
- int timeout = state.getTimeout();
- conn.setSocketTimeout(timeout);
- conn.requestOutput();
- state.setRequestState(MessageState.BODY_STREAM);
- return;
- } else {
- closeHandler(state, new SocketTimeoutException());
+ synchronized (state) {
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ int timeout = state.getTimeout();
+ conn.setSocketTimeout(timeout);
+ conn.requestOutput();
+ state.setRequestState(MessageState.BODY_STREAM);
+ return;
+ } else {
+ closeHandler(state, new SocketTimeoutException());
+ }
}
}
if (conn.getStatus() == NHttpConnection.ACTIVE) {
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1211116&r1=1211115&r2=1211116&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Tue Dec 6 20:50:19 2011
@@ -118,12 +118,14 @@ public class HttpAsyncServiceHandler imp
public void closed(final NHttpServerConnection conn) {
State state = getState(conn);
if (state != null) {
- closeHandlers(state);
- Cancellable asyncProcess = state.getAsyncProcess();
- if (asyncProcess != null) {
- asyncProcess.cancel();
+ synchronized (state) {
+ closeHandlers(state);
+ Cancellable asyncProcess = state.getAsyncProcess();
+ if (asyncProcess != null) {
+ asyncProcess.cancel();
+ }
+ state.reset();
}
- state.reset();
}
}
@@ -131,34 +133,38 @@ public class HttpAsyncServiceHandler imp
final NHttpServerConnection conn, final Exception cause) {
State state = ensureNotNull(getState(conn));
if (state != null) {
- closeHandlers(state, cause);
- if (cause instanceof HttpException) {
- if (conn.isResponseSubmitted() || state.getResponseState() != MessageState.READY) {
- // There is not much that we can do if a response
- // has already been submitted
- closeConnection(conn);
- } else {
- HttpContext context = state.getContext();
- HttpAsyncResponseProducer responseProducer = handleException(cause, context);
- state.setResponseProducer(responseProducer);
- try {
- HttpResponse response = responseProducer.generateResponse();
- state.setResponse(response);
- commitFinalResponse(conn, state);
- } catch (Exception ex) {
- shutdownConnection(conn);
- closeHandlers(state);
- state.reset();
- if (ex instanceof RuntimeException) {
- throw (RuntimeException) ex;
- } else {
- log(ex);
+ synchronized (state) {
+ closeHandlers(state, cause);
+ if (cause instanceof HttpException) {
+ if (conn.isResponseSubmitted()
+ || state.getResponseState() != MessageState.READY) {
+ // There is not much that we can do if a response
+ // has already been submitted
+ closeConnection(conn);
+ } else {
+ HttpContext context = state.getContext();
+ HttpAsyncResponseProducer responseProducer = handleException(
+ cause, context);
+ state.setResponseProducer(responseProducer);
+ try {
+ HttpResponse response = responseProducer.generateResponse();
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } catch (Exception ex) {
+ shutdownConnection(conn);
+ closeHandlers(state);
+ state.reset();
+ if (ex instanceof RuntimeException) {
+ throw (RuntimeException) ex;
+ } else {
+ log(ex);
+ }
}
}
+ } else {
+ shutdownConnection(conn);
+ state.reset();
}
- } else {
- shutdownConnection(conn);
- state.reset();
}
} else {
shutdownConnection(conn);
@@ -169,45 +175,47 @@ public class HttpAsyncServiceHandler imp
public void requestReceived(
final NHttpServerConnection conn) throws IOException, HttpException {
State state = ensureNotNull(getState(conn));
- HttpRequest request = conn.getHttpRequest();
- HttpContext context = state.getContext();
- request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
-
- context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
- context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
- this.httpProcessor.process(request, context);
-
- state.setRequest(request);
- HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
- state.setRequestHandler(requestHandler);
- HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
- state.setRequestConsumer(consumer);
-
- consumer.requestReceived(request);
-
- if (request instanceof HttpEntityEnclosingRequest) {
- if (((HttpEntityEnclosingRequest) request).expectContinue()) {
- state.setRequestState(MessageState.ACK_EXPECTED);
- HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
- HttpStatus.SC_CONTINUE, context);
- if (this.expectationVerifier != null) {
- conn.suspendInput();
- HttpAsyncServiceExchange httpex = new Exchange(
- request, ack, state, conn);
- Cancellable asyncProcess = this.expectationVerifier.verify(httpex, context);
- state.setAsyncProcess(asyncProcess);
+ synchronized (state) {
+ HttpRequest request = conn.getHttpRequest();
+ HttpContext context = state.getContext();
+ request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
+
+ context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
+ this.httpProcessor.process(request, context);
+
+ state.setRequest(request);
+ HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
+ state.setRequestHandler(requestHandler);
+ HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
+ state.setRequestConsumer(consumer);
+
+ consumer.requestReceived(request);
+
+ if (request instanceof HttpEntityEnclosingRequest) {
+ if (((HttpEntityEnclosingRequest) request).expectContinue()) {
+ state.setRequestState(MessageState.ACK_EXPECTED);
+ HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
+ HttpStatus.SC_CONTINUE, context);
+ if (this.expectationVerifier != null) {
+ conn.suspendInput();
+ HttpAsyncServiceExchange httpex = new Exchange(
+ request, ack, state, conn);
+ Cancellable asyncProcess = this.expectationVerifier.verify(httpex, context);
+ state.setAsyncProcess(asyncProcess);
+ } else {
+ conn.submitResponse(ack);
+ state.setRequestState(MessageState.BODY_STREAM);
+ }
} else {
- conn.submitResponse(ack);
state.setRequestState(MessageState.BODY_STREAM);
}
} else {
- state.setRequestState(MessageState.BODY_STREAM);
+ // No request content is expected.
+ // Process request right away
+ conn.suspendInput();
+ processRequest(conn, state);
}
- } else {
- // No request content is expected.
- // Process request right away
- conn.suspendInput();
- processRequest(conn, state);
}
}
@@ -215,55 +223,59 @@ public class HttpAsyncServiceHandler imp
final NHttpServerConnection conn,
final ContentDecoder decoder) throws IOException, HttpException {
State state = ensureNotNull(getState(conn));
- HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
- consumer.consumeContent(decoder, conn);
- state.setRequestState(MessageState.BODY_STREAM);
- if (decoder.isCompleted()) {
- conn.suspendInput();
- processRequest(conn, state);
+ synchronized (state) {
+ HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
+ consumer.consumeContent(decoder, conn);
+ state.setRequestState(MessageState.BODY_STREAM);
+ if (decoder.isCompleted()) {
+ conn.suspendInput();
+ processRequest(conn, state);
+ }
}
}
public void responseReady(
final NHttpServerConnection conn) throws IOException, HttpException {
State state = ensureNotNull(getState(conn));
- if (state.getResponse() != null) {
- return;
- }
- HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
- if (responseProducer == null) {
- return;
- }
- HttpContext context = state.getContext();
- HttpResponse response = responseProducer.generateResponse();
- int status = response.getStatusLine().getStatusCode();
- if (state.getRequestState() == MessageState.ACK_EXPECTED) {
- if (status == 100) {
- state.setResponseProducer(null);
- try {
- // Make sure 100 response has no entity
- response.setEntity(null);
- conn.requestInput();
- state.setRequestState(MessageState.BODY_STREAM);
- conn.submitResponse(response);
- responseProducer.responseCompleted(context);
- } finally {
- responseProducer.close();
- }
- } else if (status >= 400) {
- conn.resetInput();
- state.setRequestState(MessageState.COMPLETED);
- state.setResponse(response);
- commitFinalResponse(conn, state);
- } else {
- throw new HttpException("Invalid response: " + response.getStatusLine());
+ synchronized (state) {
+ if (state.getResponse() != null) {
+ return;
}
- } else {
- if (status >= 200) {
- state.setResponse(response);
- commitFinalResponse(conn, state);
+ HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+ if (responseProducer == null) {
+ return;
+ }
+ HttpContext context = state.getContext();
+ HttpResponse response = responseProducer.generateResponse();
+ int status = response.getStatusLine().getStatusCode();
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ if (status == 100) {
+ try {
+ // Make sure 100 response has no entity
+ response.setEntity(null);
+ conn.requestInput();
+ state.setRequestState(MessageState.BODY_STREAM);
+ conn.submitResponse(response);
+ responseProducer.responseCompleted(context);
+ } finally {
+ state.setResponseProducer(null);
+ responseProducer.close();
+ }
+ } else if (status >= 400) {
+ conn.resetInput();
+ state.setRequestState(MessageState.COMPLETED);
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } else {
+ throw new HttpException("Invalid response: " + response.getStatusLine());
+ }
} else {
- throw new HttpException("Invalid response: " + response.getStatusLine());
+ if (status >= 200) {
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } else {
+ throw new HttpException("Invalid response: " + response.getStatusLine());
+ }
}
}
}
@@ -272,28 +284,32 @@ public class HttpAsyncServiceHandler imp
final NHttpServerConnection conn,
final ContentEncoder encoder) throws IOException {
State state = ensureNotNull(getState(conn));
- HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
- HttpContext context = state.getContext();
- HttpResponse response = state.getResponse();
+ synchronized (state) {
+ HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+ HttpContext context = state.getContext();
+ HttpResponse response = state.getResponse();
- responseProducer.produceContent(encoder, conn);
- state.setResponseState(MessageState.BODY_STREAM);
- if (encoder.isCompleted()) {
- responseProducer.responseCompleted(context);
- if (!this.connStrategy.keepAlive(response, context)) {
- conn.close();
- } else {
- conn.requestInput();
+ responseProducer.produceContent(encoder, conn);
+ state.setResponseState(MessageState.BODY_STREAM);
+ if (encoder.isCompleted()) {
+ responseProducer.responseCompleted(context);
+ if (!this.connStrategy.keepAlive(response, context)) {
+ conn.close();
+ } else {
+ conn.requestInput();
+ }
+ closeHandlers(state);
+ state.reset();
}
- closeHandlers(state);
- state.reset();
}
}
public void timeout(final NHttpServerConnection conn) throws IOException {
State state = getState(conn);
if (state != null) {
- closeHandlers(state, new SocketTimeoutException());
+ synchronized (state) {
+ closeHandlers(state, new SocketTimeoutException());
+ }
}
if (conn.getStatus() == NHttpConnection.ACTIVE) {
conn.close();
@@ -626,8 +642,6 @@ public class HttpAsyncServiceHandler imp
private final State state;
private final NHttpServerConnection conn;
- private volatile boolean completed;
-
public Exchange(
final HttpRequest request,
final HttpResponse response,
@@ -648,16 +662,17 @@ public class HttpAsyncServiceHandler imp
return this.response;
}
- public synchronized void submitResponse(final HttpAsyncResponseProducer responseProducer) {
+ public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
if (responseProducer == null) {
throw new IllegalArgumentException("Response producer may not be null");
}
- if (this.completed) {
- throw new IllegalStateException("Response already submitted");
+ synchronized (this.state) {
+ if (this.state.getResponseProducer() != null) {
+ throw new IllegalStateException("Response already submitted");
+ }
+ this.state.setResponseProducer(responseProducer);
+ this.conn.requestOutput();
}
- this.completed = true;
- this.state.setResponseProducer(responseProducer);
- this.conn.requestOutput();
}
public void submitResponse() {
@@ -665,7 +680,9 @@ public class HttpAsyncServiceHandler imp
}
public boolean isCompleted() {
- return this.completed;
+ synchronized (this.state) {
+ return this.state.getResponseProducer() != null;
+ }
}
}