You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/11/29 15:11:51 UTC
svn commit: r1207880 [1/3] - in
/httpcomponents/httpcore/trunk/httpcore-nio/src:
main/java/org/apache/http/nio/protocol/
test/java/org/apache/http/nio/protocol/
Author: olegk
Date: Tue Nov 29 14:11:49 2011
New Revision: 1207880
URL: http://svn.apache.org/viewvc?rev=1207880&view=rev
Log:
Refactored expectation handling code in HttpAsyncServiceHandler; refactored exception handling code in HttpAsyncServiceHandler and HttpAsyncClientProtocolHandler
Modified:
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java
httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncClientProtocolHandler.java
httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncServiceHandler.java
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011
@@ -130,6 +130,15 @@ public abstract class AbstractAsyncReque
}
}
+ public final synchronized void failed(final Exception ex) {
+ if (this.completed) {
+ return;
+ }
+ this.completed = true;
+ this.ex = ex;
+ releaseResources();
+ }
+
public final synchronized void close() throws IOException {
if (this.completed) {
return;
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java Tue Nov 29 14:11:49 2011
@@ -114,12 +114,17 @@ public class BasicAsyncRequestProducer i
public void requestCompleted(final HttpContext context) {
}
+ public void failed(final Exception ex) {
+ }
+
public synchronized boolean isRepeatable() {
return this.producer == null || this.producer.isRepeatable();
}
public synchronized void resetRequest() throws IOException {
- this.producer.close();
+ if (this.producer != null) {
+ this.producer.close();
+ }
}
public synchronized void close() throws IOException {
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -96,6 +96,9 @@ public class BasicAsyncResponseProducer
public void responseCompleted(final HttpContext context) {
}
+ public void failed(final Exception ex) {
+ }
+
public synchronized void close() throws IOException {
if (this.producer != null) {
this.producer.close();
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -87,6 +87,9 @@ class ErrorResponseProducer implements H
public void responseCompleted(final HttpContext context) {
}
+ public void failed(final Exception ex) {
+ }
+
public void close() throws IOException {
this.contentProducer.close();
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Tue Nov 29 14:11:49 2011
@@ -59,51 +59,53 @@ public class HttpAsyncClientProtocolHand
}
public void connected(final NHttpClientConnection conn, final Object attachment) {
- HttpExchange httpexchange = new HttpExchange();
+ HttpExchangeState state = new HttpExchangeState();
HttpContext context = conn.getContext();
- context.setAttribute(HTTP_EXCHANGE, httpexchange);
+ context.setAttribute(HTTP_EXCHANGE, state);
requestReady(conn);
}
public void closed(final NHttpClientConnection conn) {
- HttpExchange httpexchange = getHttpExchange(conn);
- if (httpexchange != null) {
- httpexchange.clear();
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ state.clear();
}
}
public void exception(final NHttpClientConnection conn, final HttpException ex) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
- if (handler != null) {
- handler.failed(ex);
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ handleProtocolFailure(conn, state, ex);
+ } else {
+ shutdownConnection(conn);
+ onException(ex);
}
- closeConnection(conn);
}
public void exception(final NHttpClientConnection conn, final IOException ex) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
- if (handler != null) {
- handler.failed(ex);
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ handleFailure(conn, state, ex);
+ } else {
+ shutdownConnection(conn);
+ onException(ex);
}
- shutdownConnection(conn);
}
public void requestReady(final NHttpClientConnection conn) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- if (httpexchange.getRequestState() != MessageState.READY) {
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ if (state.getRequestState() != MessageState.READY) {
return;
}
- HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
+ HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
if (handler != null && handler.isDone()) {
- httpexchange.clear();
+ state.clear();
handler = null;
}
if (handler == null) {
handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
HTTP_HANDLER);
- httpexchange.setHandler(handler);
+ state.setHandler(handler);
}
if (handler == null) {
return;
@@ -111,68 +113,64 @@ public class HttpAsyncClientProtocolHand
try {
HttpContext context = handler.getContext();
HttpRequest request = handler.generateRequest();
- httpexchange.setRequest(request);
+ state.setRequest(request);
conn.submitRequest(request);
if (request instanceof HttpEntityEnclosingRequest) {
if (((HttpEntityEnclosingRequest) request).expectContinue()) {
int timeout = conn.getSocketTimeout();
- httpexchange.setTimeout(timeout);
+ state.setTimeout(timeout);
timeout = request.getParams().getIntParameter(
CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
conn.setSocketTimeout(timeout);
- httpexchange.setRequestState(MessageState.ACK_EXPECTED);
+ state.setRequestState(MessageState.ACK_EXPECTED);
} else {
- httpexchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
}
} else {
handler.requestCompleted(context);
- httpexchange.setRequestState(MessageState.COMPLETED);
+ state.setRequestState(MessageState.COMPLETED);
}
+ } catch (HttpException ex) {
+ handleProtocolFailure(conn, state, ex);
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
- handler.failed(ex);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- handler.failed(ex);
- onException(ex);
}
}
public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
try {
- if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
conn.suspendOutput();
return;
}
HttpContext context = handler.getContext();
handler.produceContent(encoder, conn);
- httpexchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
if (encoder.isCompleted()) {
handler.requestCompleted(context);
- httpexchange.setRequestState(MessageState.COMPLETED);
+ state.setRequestState(MessageState.COMPLETED);
}
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
- handler.failed(ex);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- handler.failed(ex);
- onException(ex);
}
}
public void responseReceived(final NHttpClientConnection conn) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
try {
HttpResponse response = conn.getHttpResponse();
- HttpRequest request = httpexchange.getRequest();
+ HttpRequest request = state.getRequest();
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode < HttpStatus.SC_OK) {
@@ -181,94 +179,86 @@ public class HttpAsyncClientProtocolHand
throw new ProtocolException(
"Unexpected response: " + response.getStatusLine());
}
- if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
- int timeout = httpexchange.getTimeout();
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ int timeout = state.getTimeout();
conn.setSocketTimeout(timeout);
conn.requestOutput();
- httpexchange.setRequestState(MessageState.ACK);
+ state.setRequestState(MessageState.ACK);
}
return;
}
- httpexchange.setResponse(response);
- if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
- int timeout = httpexchange.getTimeout();
+ state.setResponse(response);
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ int timeout = state.getTimeout();
conn.setSocketTimeout(timeout);
conn.resetOutput();
- httpexchange.setRequestState(MessageState.COMPLETED);
- } else if (httpexchange.getRequestState() == MessageState.BODY_STREAM) {
+ state.setRequestState(MessageState.COMPLETED);
+ } else if (state.getRequestState() == MessageState.BODY_STREAM) {
// Early response
conn.resetOutput();
conn.suspendOutput();
- httpexchange.setRequestState(MessageState.COMPLETED);
- httpexchange.invalidate();
+ state.setRequestState(MessageState.COMPLETED);
+ state.invalidate();
}
handler.responseReceived(response);
- httpexchange.setResponseState(MessageState.BODY_STREAM);
+ state.setResponseState(MessageState.BODY_STREAM);
if (!canResponseHaveBody(request, response)) {
conn.resetInput();
- processResponse(conn, httpexchange, handler);
+ processResponse(conn, state, handler);
}
+ } catch (HttpException ex) {
+ handleProtocolFailure(conn, state, ex);
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
- handler.failed(ex);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- handler.failed(ex);
- onException(ex);
}
}
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
try {
handler.consumeContent(decoder, conn);
- httpexchange.setResponseState(MessageState.BODY_STREAM);
+ state.setResponseState(MessageState.BODY_STREAM);
if (decoder.isCompleted()) {
- processResponse(conn, httpexchange, handler);
+ processResponse(conn, state, handler);
}
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
- handler.failed(ex);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- handler.failed(ex);
- onException(ex);
}
}
public void timeout(final NHttpClientConnection conn) {
- HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
- HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
- if (handler == null) {
- shutdownConnection(conn);
- return;
- }
- try {
- if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
- int timeout = httpexchange.getTimeout();
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ int timeout = state.getTimeout();
conn.setSocketTimeout(timeout);
conn.requestOutput();
- httpexchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
+ return;
} else {
- handler.failed(new SocketTimeoutException());
- if (conn.getStatus() == NHttpConnection.ACTIVE) {
- closeConnection(conn);
- if (conn.getStatus() == NHttpConnection.CLOSING) {
- // Give the connection some grace time to
- // close itself nicely
- conn.setSocketTimeout(250);
- }
- } else {
- shutdownConnection(conn);
+ shutdownHttpExchange(state, new SocketTimeoutException());
+ }
+ }
+ try {
+ if (conn.getStatus() == NHttpConnection.ACTIVE) {
+ conn.close();
+ if (conn.getStatus() == NHttpConnection.CLOSING) {
+ // Give the connection some grace time to
+ // close itself nicely
+ conn.setSocketTimeout(250);
}
+ } else {
+ conn.shutdown();
}
- } catch (RuntimeException ex) {
- shutdownConnection(conn);
- handler.failed(ex);
- throw ex;
+ } catch (IOException ex) {
+ onException(ex);
}
}
@@ -291,11 +281,11 @@ public class HttpAsyncClientProtocolHand
}
}
- private HttpExchange getHttpExchange(final NHttpConnection conn) {
- return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE);
+ private HttpExchangeState getHttpExchange(final NHttpConnection conn) {
+ return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE);
}
- private HttpExchange ensureNotNull(final HttpExchange httpExchange) {
+ private HttpExchangeState ensureNotNull(final HttpExchangeState httpExchange) {
if (httpExchange == null) {
throw new IllegalStateException("HTTP exchange is null");
}
@@ -309,14 +299,48 @@ public class HttpAsyncClientProtocolHand
return handler;
}
+ private void shutdownHttpExchange(
+ final HttpExchangeState state,
+ final Exception ex) {
+ HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
+ if (handler != null) {
+ state.setHandler(null);
+ try {
+ handler.failed(ex);
+ } finally {
+ try {
+ handler.close();
+ } catch (IOException ioex) {
+ onException(ioex);
+ }
+ }
+ }
+ }
+
+ private void handleFailure(
+ final NHttpClientConnection conn,
+ final HttpExchangeState state,
+ final Exception ex) {
+ shutdownConnection(conn);
+ shutdownHttpExchange(state, ex);
+ }
+
+ private void handleProtocolFailure(
+ final NHttpClientConnection conn,
+ final HttpExchangeState state,
+ final HttpException ex) {
+ closeConnection(conn);
+ shutdownHttpExchange(state, ex);
+ }
+
private void processResponse(
final NHttpClientConnection conn,
- final HttpExchange httpexchange,
+ final HttpExchangeState state,
final HttpAsyncClientExchangeHandler<?> handler) throws IOException {
HttpContext context = handler.getContext();
- if (httpexchange.isValid()) {
- HttpRequest request = httpexchange.getRequest();
- HttpResponse response = httpexchange.getResponse();
+ if (state.isValid()) {
+ HttpRequest request = state.getRequest();
+ HttpResponse response = state.getResponse();
String method = request.getRequestLine().getMethod();
int status = response.getStatusLine().getStatusCode();
if (!(method.equalsIgnoreCase("CONNECT") && status < 300)) {
@@ -329,7 +353,7 @@ public class HttpAsyncClientProtocolHand
conn.close();
}
handler.responseCompleted(context);
- httpexchange.reset();
+ state.reset();
}
private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
@@ -351,7 +375,7 @@ public class HttpAsyncClientProtocolHand
static final String HTTP_EXCHANGE = "http.nio.exchange";
- class HttpExchange {
+ class HttpExchangeState {
private volatile HttpAsyncClientExchangeHandler<?> handler;
private volatile MessageState requestState;
@@ -361,7 +385,7 @@ public class HttpAsyncClientProtocolHand
private volatile boolean valid;
private volatile int timeout;
- HttpExchange() {
+ HttpExchangeState() {
super();
this.valid = true;
this.requestState = MessageState.READY;
@@ -373,9 +397,6 @@ public class HttpAsyncClientProtocolHand
}
public void setHandler(final HttpAsyncClientExchangeHandler<?> handler) {
- if (this.handler != null) {
- throw new IllegalStateException("Handler already set");
- }
this.handler = handler;
}
@@ -400,9 +421,6 @@ public class HttpAsyncClientProtocolHand
}
public void setRequest(final HttpRequest request) {
- if (this.request != null) {
- throw new IllegalStateException("Request already set");
- }
this.request = request;
}
@@ -411,9 +429,6 @@ public class HttpAsyncClientProtocolHand
}
public void setResponse(final HttpResponse response) {
- if (this.response != null) {
- throw new IllegalStateException("Response already set");
- }
this.response = response;
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011
@@ -80,6 +80,13 @@ public interface HttpAsyncRequestConsume
void requestCompleted(HttpContext context);
/**
+ * Invoked to signal that the request processing terminated abnormally.
+ *
+ * @param ex exception
+ */
+ void failed(Exception ex);
+
+ /**
* Returns an exception in case of an abnormal termination. This method
* returns <code>null</code> if the request execution is still ongoing
* or if it completed successfully.
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java Tue Nov 29 14:11:49 2011
@@ -92,6 +92,13 @@ public interface HttpAsyncRequestProduce
void requestCompleted(HttpContext context);
/**
+ * Invoked to signal that the response processing terminated abnormally.
+ *
+ * @param ex exception
+ */
+ void failed(Exception ex);
+
+ /**
* Determines whether or not this producer is capable of producing
* HTTP request messages more than once.
*/
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -75,4 +75,11 @@ public interface HttpAsyncResponseProduc
*/
void responseCompleted(HttpContext context);
+ /**
+ * Invoked to signal that the response processing terminated abnormally.
+ *
+ * @param ex exception
+ */
+ void failed(Exception ex);
+
}
Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Tue Nov 29 14:11:49 2011
@@ -28,6 +28,7 @@
package org.apache.http.nio.protocol;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
@@ -107,150 +108,171 @@ public class HttpAsyncServiceHandler imp
}
public void connected(final NHttpServerConnection conn) {
- HttpExchange httpExchange = new HttpExchange();
- conn.getContext().setAttribute(HTTP_EXCHANGE, httpExchange);
+ HttpExchangeState state = new HttpExchangeState();
+ conn.getContext().setAttribute(HTTP_EXCHANGE, state);
}
public void closed(final NHttpServerConnection conn) {
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
- Cancellable asyncProcess = httpExchange.getAsyncProcess();
- httpExchange.clear();
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ Cancellable asyncProcess = state.getAsyncProcess();
+ state.clear();
if (asyncProcess != null) {
asyncProcess.cancel();
}
}
public void exception(final NHttpServerConnection conn, final HttpException httpex) {
- if (conn.isResponseSubmitted()) {
- // There is not much that we can do if a response head
- // has already been submitted
- closeConnection(conn);
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+ if (state != null) {
+ handleProtocolFailure(conn, state, httpex);
+ } else {
+ shutdownConnection(conn);
onException(httpex);
- return;
}
+ }
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
- try {
- HttpAsyncResponseProducer responseProducer = handleException(httpex);
- httpExchange.setResponseProducer(responseProducer);
- commitResponse(conn, httpExchange);
- } catch (RuntimeException ex) {
- shutdownConnection(conn);
- throw ex;
- } catch (Exception ex) {
+ public void exception(final NHttpServerConnection conn, final IOException ex) {
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ handleFailure(conn, state, ex);
+ } else {
shutdownConnection(conn);
onException(ex);
}
}
- public void exception(final NHttpServerConnection conn, final IOException ex) {
- shutdownConnection(conn);
- onException(ex);
- }
-
public void requestReceived(final NHttpServerConnection conn) {
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
try {
HttpRequest request = conn.getHttpRequest();
- HttpContext context = httpExchange.getContext();
+ HttpContext context = state.getContext();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
this.httpProcessor.process(request, context);
- httpExchange.setRequest(request);
+ state.setRequest(request);
HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
- httpExchange.setRequestHandler(requestHandler);
+ state.setRequestHandler(requestHandler);
HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
- httpExchange.setRequestConsumer(consumer);
+ state.setRequestConsumer(consumer);
consumer.requestReceived(request);
if (request instanceof HttpEntityEnclosingRequest) {
if (((HttpEntityEnclosingRequest) request).expectContinue()) {
- httpExchange.setRequestState(MessageState.ACK_EXPECTED);
+ state.setRequestState(MessageState.ACK_EXPECTED);
if (this.expectationVerifier != null) {
conn.suspendInput();
- HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(httpExchange, conn);
+ HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(state, conn);
Cancellable asyncProcess = this.expectationVerifier.verify(request, trigger, context);
- httpExchange.setAsyncProcess(asyncProcess);
+ state.setAsyncProcess(asyncProcess);
} else {
HttpResponse response = create100Continue(request);
conn.submitResponse(response);
- httpExchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
}
} else {
- httpExchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
}
} else {
// No request content is expected.
// Process request right away
conn.suspendInput();
- processRequest(conn, httpExchange);
+ processRequest(conn, state);
}
+ } catch (HttpException ex) {
+ handleProtocolFailure(conn, state, ex);
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- onException(ex);
}
}
public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
try {
- HttpAsyncRequestConsumer<?> consumer = ensureNotNull(httpExchange.getRequestConsumer());
+ HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
consumer.consumeContent(decoder, conn);
- httpExchange.setRequestState(MessageState.BODY_STREAM);
+ state.setRequestState(MessageState.BODY_STREAM);
if (decoder.isCompleted()) {
conn.suspendInput();
- processRequest(conn, httpExchange);
+ processRequest(conn, state);
}
+ } catch (HttpException ex) {
+ handleProtocolFailure(conn, state, ex);
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- onException(ex);
}
}
public void responseReady(final NHttpServerConnection conn) {
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
try {
- if (httpExchange.getRequestState() == MessageState.ACK) {
- conn.requestInput();
- httpExchange.setRequestState(MessageState.BODY_STREAM);
- HttpRequest request = httpExchange.getRequest();
- HttpResponse response = create100Continue(request);
- conn.submitResponse(response);
- } else if (httpExchange.getResponse() == null && httpExchange.getResponseProducer() != null) {
- if (httpExchange.getRequestState() == MessageState.ACK_EXPECTED) {
+ if (state.getResponse() != null) {
+ return;
+ }
+ HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+ if (responseProducer == null) {
+ return;
+ }
+ HttpContext context = state.getContext();
+ HttpResponse response = responseProducer.generateResponse();
+ int status = response.getStatusLine().getStatusCode();
+ if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+ if (status == 100) {
+ state.setResponseProducer(null);
+ try {
+ // Make sure 100 response has no entity
+ response.setEntity(null);
+ conn.requestInput();
+ state.setRequestState(MessageState.BODY_STREAM);
+ conn.submitResponse(response);
+ responseProducer.responseCompleted(context);
+ } finally {
+ responseProducer.close();
+ }
+ } else if (status >= 400) {
conn.resetInput();
- httpExchange.setRequestState(MessageState.COMPLETED);
+ state.setRequestState(MessageState.COMPLETED);
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } else {
+ throw new HttpException("Invalid response: " + response.getStatusLine());
+ }
+ } else {
+ if (status >= 200) {
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } else {
+ throw new HttpException("Invalid response: " + response.getStatusLine());
}
- commitResponse(conn, httpExchange);
}
+ } catch (HttpException ex) {
+ handleProtocolFailure(conn, state, ex);
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- onException(ex);
}
}
public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
- HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+ HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
try {
- HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer();
- HttpContext context = httpExchange.getContext();
- HttpResponse response = httpExchange.getResponse();
+ HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+ HttpContext context = state.getContext();
+ HttpResponse response = state.getResponse();
responseProducer.produceContent(encoder, conn);
- httpExchange.setResponseState(MessageState.BODY_STREAM);
+ state.setResponseState(MessageState.BODY_STREAM);
if (encoder.isCompleted()) {
responseProducer.responseCompleted(context);
if (!this.connStrategy.keepAlive(response, context)) {
@@ -258,18 +280,21 @@ public class HttpAsyncServiceHandler imp
} else {
conn.requestInput();
}
- httpExchange.clear();
+ state.clear();
}
+ } catch (IOException ex) {
+ handleFailure(conn, state, ex);
} catch (RuntimeException ex) {
- shutdownConnection(conn);
+ handleFailure(conn, state, ex);
throw ex;
- } catch (Exception ex) {
- shutdownConnection(conn);
- onException(ex);
}
}
public void timeout(final NHttpServerConnection conn) {
+ HttpExchangeState state = getHttpExchange(conn);
+ if (state != null) {
+ shutdownHttpExchange(state, new SocketTimeoutException());
+ }
try {
if (conn.getStatus() == NHttpConnection.ACTIVE) {
conn.close();
@@ -286,15 +311,15 @@ public class HttpAsyncServiceHandler imp
}
}
- private HttpExchange getHttpExchange(final NHttpConnection conn) {
- return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE);
+ private HttpExchangeState getHttpExchange(final NHttpConnection conn) {
+ return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE);
}
- private HttpExchange ensureNotNull(final HttpExchange httpExchange) {
- if (httpExchange == null) {
+ private HttpExchangeState ensureNotNull(final HttpExchangeState state) {
+ if (state == null) {
throw new IllegalStateException("HTTP exchange is null");
}
- return httpExchange;
+ return state;
}
private HttpAsyncRequestConsumer<Object> ensureNotNull(final HttpAsyncRequestConsumer<Object> requestConsumer) {
@@ -323,6 +348,36 @@ public class HttpAsyncServiceHandler imp
}
}
+ private void shutdownHttpExchange(final HttpExchangeState state, final Exception ex) {
+ HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
+ if (consumer != null) {
+ state.setRequestConsumer(null);
+ try {
+ consumer.failed(ex);
+ } finally {
+ try {
+ consumer.close();
+ } catch (IOException ioex) {
+ onException(ioex);
+ }
+ }
+ }
+ HttpAsyncResponseProducer producer = state.getResponseProducer();
+ if (producer != null) {
+ state.setResponseProducer(null);
+ try {
+ producer.failed(ex);
+ } finally {
+ try {
+ producer.close();
+ } catch (IOException ioex) {
+ onException(ioex);
+ }
+ }
+ }
+ state.setRequestHandler(null);
+ }
+
protected HttpAsyncResponseProducer handleException(final Exception ex) {
int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
if (ex instanceof MethodNotSupportedException) {
@@ -360,44 +415,74 @@ public class HttpAsyncServiceHandler imp
&& status != HttpStatus.SC_RESET_CONTENT;
}
+ private void handleFailure(
+ final NHttpServerConnection conn,
+ final HttpExchangeState state,
+ final Exception ex) {
+ shutdownConnection(conn);
+ shutdownHttpExchange(state, ex);
+ }
+
+ private void handleProtocolFailure(
+ final NHttpServerConnection conn,
+ final HttpExchangeState state,
+ final HttpException httpex) {
+ shutdownHttpExchange(state, httpex);
+ if (conn.isResponseSubmitted() || state.getResponseState() != MessageState.READY) {
+ // There is not much that we can do if a response
+ // has already been submitted
+ closeConnection(conn);
+ } else {
+ HttpAsyncResponseProducer responseProducer = handleException(httpex);
+ state.setResponseProducer(responseProducer);
+ try {
+ HttpResponse response = responseProducer.generateResponse();
+ state.setResponse(response);
+ commitFinalResponse(conn, state);
+ } catch (RuntimeException ex) {
+ handleFailure(conn, state, ex);
+ throw ex;
+ } catch (Exception ex) {
+ handleFailure(conn, state, ex);
+ }
+ }
+ }
+
private void processRequest(
final NHttpServerConnection conn,
- final HttpExchange httpExchange) throws HttpException, IOException {
- HttpAsyncRequestHandler<Object> handler = httpExchange.getRequestHandler();
- HttpContext context = httpExchange.getContext();
- HttpAsyncRequestConsumer<?> consumer = httpExchange.getRequestConsumer();
+ final HttpExchangeState state) throws HttpException, IOException {
+ HttpAsyncRequestHandler<Object> handler = state.getRequestHandler();
+ HttpContext context = state.getContext();
+ HttpAsyncRequestConsumer<?> consumer = state.getRequestConsumer();
consumer.requestCompleted(context);
- httpExchange.setRequestState(MessageState.COMPLETED);
+ state.setRequestState(MessageState.COMPLETED);
Exception exception = consumer.getException();
if (exception != null) {
HttpAsyncResponseProducer responseProducer = handleException(exception);
- httpExchange.setResponseProducer(responseProducer);
+ state.setResponseProducer(responseProducer);
conn.requestOutput();
} else {
Object result = consumer.getResult();
- HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(httpExchange, conn);
+ HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(state, conn);
try {
Cancellable asyncProcess = handler.handle(result, trigger, context);
- httpExchange.setAsyncProcess(asyncProcess);
+ state.setAsyncProcess(asyncProcess);
} catch (HttpException ex) {
HttpAsyncResponseProducer responseProducer = handleException(ex);
- httpExchange.setResponseProducer(responseProducer);
+ state.setResponseProducer(responseProducer);
conn.requestOutput();
}
}
}
- private void commitResponse(
+ private void commitFinalResponse(
final NHttpServerConnection conn,
- final HttpExchange httpExchange) throws IOException, HttpException {
- HttpContext context = httpExchange.getContext();
- HttpRequest request = httpExchange.getRequest();
- HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer();
- HttpResponse response = responseProducer.generateResponse();
- response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
-
- httpExchange.setResponse(response);
+ final HttpExchangeState state) throws IOException, HttpException {
+ HttpContext context = state.getContext();
+ HttpRequest request = state.getRequest();
+ HttpResponse response = state.getResponse();
+ response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
this.httpProcessor.process(response, context);
@@ -410,6 +495,7 @@ public class HttpAsyncServiceHandler imp
conn.submitResponse(response);
if (entity == null) {
+ HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
responseProducer.responseCompleted(context);
if (!this.connStrategy.keepAlive(response, context)) {
conn.close();
@@ -417,9 +503,9 @@ public class HttpAsyncServiceHandler imp
// Ready to process new request
conn.requestInput();
}
- httpExchange.clear();
+ state.clear();
} else {
- httpExchange.setResponseState(MessageState.BODY_STREAM);
+ state.setResponseState(MessageState.BODY_STREAM);
}
}
@@ -436,7 +522,7 @@ public class HttpAsyncServiceHandler imp
return handler;
}
- class HttpExchange {
+ class HttpExchangeState {
private final BasicHttpContext context;
private volatile HttpAsyncRequestHandler<Object> requestHandler;
@@ -448,7 +534,7 @@ public class HttpAsyncServiceHandler imp
private volatile HttpResponse response;
private volatile Cancellable asyncProcess;
- HttpExchange() {
+ HttpExchangeState() {
super();
this.context = new BasicHttpContext();
this.requestState = MessageState.READY;
@@ -464,9 +550,6 @@ public class HttpAsyncServiceHandler imp
}
public void setRequestHandler(final HttpAsyncRequestHandler<Object> requestHandler) {
- if (this.requestHandler != null) {
- throw new IllegalStateException("Request handler already set");
- }
this.requestHandler = requestHandler;
}
@@ -486,15 +569,11 @@ public class HttpAsyncServiceHandler imp
this.responseState = state;
}
-
public HttpAsyncRequestConsumer<Object> getRequestConsumer() {
return this.requestConsumer;
}
public void setRequestConsumer(final HttpAsyncRequestConsumer<Object> requestConsumer) {
- if (this.requestConsumer != null) {
- throw new IllegalStateException("Request consumer already set");
- }
this.requestConsumer = requestConsumer;
}
@@ -503,9 +582,6 @@ public class HttpAsyncServiceHandler imp
}
public void setResponseProducer(final HttpAsyncResponseProducer responseProducer) {
- if (this.responseProducer != null) {
- throw new IllegalStateException("Response producer already set");
- }
this.responseProducer = responseProducer;
}
@@ -514,9 +590,6 @@ public class HttpAsyncServiceHandler imp
}
public void setRequest(final HttpRequest request) {
- if (this.request != null) {
- throw new IllegalStateException("Request already set");
- }
this.request = request;
}
@@ -525,9 +598,6 @@ public class HttpAsyncServiceHandler imp
}
public void setResponse(final HttpResponse response) {
- if (this.response != null) {
- throw new IllegalStateException("Response already set");
- }
this.response = response;
}
@@ -588,14 +658,14 @@ public class HttpAsyncServiceHandler imp
class ResponseTriggerImpl implements HttpAsyncResponseTrigger {
- private final HttpExchange httpExchange;
+ private final HttpExchangeState state;
private final IOControl iocontrol;
private volatile boolean triggered;
- public ResponseTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) {
+ public ResponseTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) {
super();
- this.httpExchange = httpExchange;
+ this.state = state;
this.iocontrol = iocontrol;
}
@@ -607,7 +677,7 @@ public class HttpAsyncServiceHandler imp
throw new IllegalStateException("Response already triggered");
}
this.triggered = true;
- this.httpExchange.setResponseProducer(responseProducer);
+ this.state.setResponseProducer(responseProducer);
this.iocontrol.requestOutput();
}
@@ -619,14 +689,14 @@ public class HttpAsyncServiceHandler imp
class ContinueTriggerImpl implements HttpAsyncContinueTrigger {
- private final HttpExchange httpExchange;
+ private final HttpExchangeState state;
private final IOControl iocontrol;
private volatile boolean triggered;
- public ContinueTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) {
+ public ContinueTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) {
super();
- this.httpExchange = httpExchange;
+ this.state = state;
this.iocontrol = iocontrol;
}
@@ -635,7 +705,8 @@ public class HttpAsyncServiceHandler imp
throw new IllegalStateException("Response already triggered");
}
this.triggered = true;
- this.httpExchange.setRequestState(MessageState.ACK);
+ HttpResponse ack = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_CONTINUE, "Continue");
+ this.state.setResponseProducer(new BasicAsyncResponseProducer(ack));
this.iocontrol.requestOutput();
}
@@ -647,7 +718,7 @@ public class HttpAsyncServiceHandler imp
throw new IllegalStateException("Response already triggered");
}
this.triggered = true;
- this.httpExchange.setResponseProducer(responseProducer);
+ this.state.setResponseProducer(responseProducer);
this.iocontrol.requestOutput();
}