You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/11/29 15:11:51 UTC

svn commit: r1207880 [1/3] - in /httpcomponents/httpcore/trunk/httpcore-nio/src: main/java/org/apache/http/nio/protocol/ test/java/org/apache/http/nio/protocol/

Author: olegk
Date: Tue Nov 29 14:11:49 2011
New Revision: 1207880

URL: http://svn.apache.org/viewvc?rev=1207880&view=rev
Log:
Refactored expectation handling code in HttpAsyncServiceHandler; refactored exception handling code in HttpAsyncServiceHandler and HttpAsyncClientProtocolHandler

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncClientProtocolHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/test/java/org/apache/http/nio/protocol/TestHttpAsyncServiceHandler.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/AbstractAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011
@@ -130,6 +130,15 @@ public abstract class AbstractAsyncReque
         }
     }
 
+    public final synchronized void failed(final Exception ex) {
+        if (this.completed) {
+            return;
+        }
+        this.completed = true;
+        this.ex = ex;
+        releaseResources();
+    }
+
     public final synchronized void close() throws IOException {
         if (this.completed) {
             return;

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncRequestProducer.java Tue Nov 29 14:11:49 2011
@@ -114,12 +114,17 @@ public class BasicAsyncRequestProducer i
     public void requestCompleted(final HttpContext context) {
     }
 
+    public void failed(final Exception ex) {
+    }
+
     public synchronized boolean isRepeatable() {
         return this.producer == null || this.producer.isRepeatable();
     }
 
     public synchronized void resetRequest() throws IOException {
-        this.producer.close();
+        if (this.producer != null) {
+            this.producer.close();
+        }
     }
 
     public synchronized void close() throws IOException {

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/BasicAsyncResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -96,6 +96,9 @@ public class BasicAsyncResponseProducer 
     public void responseCompleted(final HttpContext context) {
     }
 
+    public void failed(final Exception ex) {
+    }
+
     public synchronized void close() throws IOException {
         if (this.producer != null) {
             this.producer.close();

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/ErrorResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -87,6 +87,9 @@ class ErrorResponseProducer implements H
     public void responseCompleted(final HttpContext context) {
     }
 
+    public void failed(final Exception ex) {
+    }
+
     public void close() throws IOException {
         this.contentProducer.close();
     }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Tue Nov 29 14:11:49 2011
@@ -59,51 +59,53 @@ public class HttpAsyncClientProtocolHand
     }
 
     public void connected(final NHttpClientConnection conn, final Object attachment) {
-        HttpExchange httpexchange = new HttpExchange();
+        HttpExchangeState state = new HttpExchangeState();
         HttpContext context = conn.getContext();
-        context.setAttribute(HTTP_EXCHANGE, httpexchange);
+        context.setAttribute(HTTP_EXCHANGE, state);
         requestReady(conn);
     }
 
     public void closed(final NHttpClientConnection conn) {
-        HttpExchange httpexchange = getHttpExchange(conn);
-        if (httpexchange != null) {
-            httpexchange.clear();
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            state.clear();
         }
     }
 
     public void exception(final NHttpClientConnection conn, final HttpException ex) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
-        if (handler != null) {
-            handler.failed(ex);
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            handleProtocolFailure(conn, state, ex);
+        } else {
+            shutdownConnection(conn);
+            onException(ex);
         }
-        closeConnection(conn);
     }
 
     public void exception(final NHttpClientConnection conn, final IOException ex) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
-        if (handler != null) {
-            handler.failed(ex);
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            handleFailure(conn, state, ex);
+        } else {
+            shutdownConnection(conn);
+            onException(ex);
         }
-        shutdownConnection(conn);
     }
 
     public void requestReady(final NHttpClientConnection conn) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        if (httpexchange.getRequestState() != MessageState.READY) {
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        if (state.getRequestState() != MessageState.READY) {
             return;
         }
-        HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
+        HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
         if (handler != null && handler.isDone()) {
-            httpexchange.clear();
+            state.clear();
             handler = null;
         }
         if (handler == null) {
             handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
                     HTTP_HANDLER);
-            httpexchange.setHandler(handler);
+            state.setHandler(handler);
         }
         if (handler == null) {
             return;
@@ -111,68 +113,64 @@ public class HttpAsyncClientProtocolHand
         try {
             HttpContext context = handler.getContext();
             HttpRequest request = handler.generateRequest();
-            httpexchange.setRequest(request);
+            state.setRequest(request);
 
             conn.submitRequest(request);
 
             if (request instanceof HttpEntityEnclosingRequest) {
                 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
                     int timeout = conn.getSocketTimeout();
-                    httpexchange.setTimeout(timeout);
+                    state.setTimeout(timeout);
                     timeout = request.getParams().getIntParameter(
                             CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
                     conn.setSocketTimeout(timeout);
-                    httpexchange.setRequestState(MessageState.ACK_EXPECTED);
+                    state.setRequestState(MessageState.ACK_EXPECTED);
                 } else {
-                    httpexchange.setRequestState(MessageState.BODY_STREAM);
+                    state.setRequestState(MessageState.BODY_STREAM);
                 }
             } else {
                 handler.requestCompleted(context);
-                httpexchange.setRequestState(MessageState.COMPLETED);
+                state.setRequestState(MessageState.COMPLETED);
             }
+        } catch (HttpException ex) {
+            handleProtocolFailure(conn, state, ex);
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
-            onException(ex);
         }
     }
 
     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
         try {
-            if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
                 conn.suspendOutput();
                 return;
             }
             HttpContext context = handler.getContext();
             handler.produceContent(encoder, conn);
-            httpexchange.setRequestState(MessageState.BODY_STREAM);
+            state.setRequestState(MessageState.BODY_STREAM);
             if (encoder.isCompleted()) {
                 handler.requestCompleted(context);
-                httpexchange.setRequestState(MessageState.COMPLETED);
+                state.setRequestState(MessageState.COMPLETED);
             }
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
-            onException(ex);
         }
     }
 
     public void responseReceived(final NHttpClientConnection conn) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
         try {
             HttpResponse response = conn.getHttpResponse();
-            HttpRequest request = httpexchange.getRequest();
+            HttpRequest request = state.getRequest();
 
             int statusCode = response.getStatusLine().getStatusCode();
             if (statusCode < HttpStatus.SC_OK) {
@@ -181,94 +179,86 @@ public class HttpAsyncClientProtocolHand
                     throw new ProtocolException(
                             "Unexpected response: " + response.getStatusLine());
                 }
-                if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
-                    int timeout = httpexchange.getTimeout();
+                if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                    int timeout = state.getTimeout();
                     conn.setSocketTimeout(timeout);
                     conn.requestOutput();
-                    httpexchange.setRequestState(MessageState.ACK);
+                    state.setRequestState(MessageState.ACK);
                 }
                 return;
             }
-            httpexchange.setResponse(response);
-            if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
-                int timeout = httpexchange.getTimeout();
+            state.setResponse(response);
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                int timeout = state.getTimeout();
                 conn.setSocketTimeout(timeout);
                 conn.resetOutput();
-                httpexchange.setRequestState(MessageState.COMPLETED);
-            } else if (httpexchange.getRequestState() == MessageState.BODY_STREAM) {
+                state.setRequestState(MessageState.COMPLETED);
+            } else if (state.getRequestState() == MessageState.BODY_STREAM) {
                 // Early response
                 conn.resetOutput();
                 conn.suspendOutput();
-                httpexchange.setRequestState(MessageState.COMPLETED);
-                httpexchange.invalidate();
+                state.setRequestState(MessageState.COMPLETED);
+                state.invalidate();
             }
             handler.responseReceived(response);
-            httpexchange.setResponseState(MessageState.BODY_STREAM);
+            state.setResponseState(MessageState.BODY_STREAM);
             if (!canResponseHaveBody(request, response)) {
                 conn.resetInput();
-                processResponse(conn, httpexchange, handler);
+                processResponse(conn, state, handler);
             }
+        } catch (HttpException ex) {
+            handleProtocolFailure(conn, state, ex);
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
-            onException(ex);
         }
     }
 
     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(httpexchange.getHandler());
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
         try {
             handler.consumeContent(decoder, conn);
-            httpexchange.setResponseState(MessageState.BODY_STREAM);
+            state.setResponseState(MessageState.BODY_STREAM);
             if (decoder.isCompleted()) {
-                processResponse(conn, httpexchange, handler);
+                processResponse(conn, state, handler);
             }
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
-            onException(ex);
         }
     }
 
     public void timeout(final NHttpClientConnection conn) {
-        HttpExchange httpexchange = ensureNotNull(getHttpExchange(conn));
-        HttpAsyncClientExchangeHandler<?> handler = httpexchange.getHandler();
-        if (handler == null) {
-            shutdownConnection(conn);
-            return;
-        }
-        try {
-            if (httpexchange.getRequestState() == MessageState.ACK_EXPECTED) {
-                int timeout = httpexchange.getTimeout();
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                int timeout = state.getTimeout();
                 conn.setSocketTimeout(timeout);
                 conn.requestOutput();
-                httpexchange.setRequestState(MessageState.BODY_STREAM);
+                state.setRequestState(MessageState.BODY_STREAM);
+                return;
             } else {
-                handler.failed(new SocketTimeoutException());
-                if (conn.getStatus() == NHttpConnection.ACTIVE) {
-                    closeConnection(conn);
-                    if (conn.getStatus() == NHttpConnection.CLOSING) {
-                        // Give the connection some grace time to
-                        // close itself nicely
-                        conn.setSocketTimeout(250);
-                    }
-                } else {
-                    shutdownConnection(conn);
+                shutdownHttpExchange(state, new SocketTimeoutException());
+            }
+        }
+        try {
+            if (conn.getStatus() == NHttpConnection.ACTIVE) {
+                conn.close();
+                if (conn.getStatus() == NHttpConnection.CLOSING) {
+                    // Give the connection some grace time to
+                    // close itself nicely
+                    conn.setSocketTimeout(250);
                 }
+            } else {
+                conn.shutdown();
             }
-        } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            handler.failed(ex);
-            throw ex;
+        } catch (IOException ex) {
+            onException(ex);
         }
     }
 
@@ -291,11 +281,11 @@ public class HttpAsyncClientProtocolHand
         }
     }
 
-    private HttpExchange getHttpExchange(final NHttpConnection conn) {
-        return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE);
+    private HttpExchangeState getHttpExchange(final NHttpConnection conn) {
+        return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE);
     }
 
-    private HttpExchange ensureNotNull(final HttpExchange httpExchange) {
+    private HttpExchangeState ensureNotNull(final HttpExchangeState httpExchange) {
         if (httpExchange == null) {
             throw new IllegalStateException("HTTP exchange is null");
         }
@@ -309,14 +299,48 @@ public class HttpAsyncClientProtocolHand
         return handler;
     }
 
+    private void shutdownHttpExchange(
+            final HttpExchangeState state,
+            final Exception ex) {
+        HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
+        if (handler != null) {
+            state.setHandler(null);
+            try {
+                handler.failed(ex);
+            } finally {
+                try {
+                    handler.close();
+                } catch (IOException ioex) {
+                    onException(ioex);
+                }
+            }
+        }
+    }
+
+    private void handleFailure(
+            final NHttpClientConnection conn,
+            final HttpExchangeState state,
+            final Exception ex) {
+        shutdownConnection(conn);
+        shutdownHttpExchange(state, ex);
+    }
+
+    private void handleProtocolFailure(
+            final NHttpClientConnection conn,
+            final HttpExchangeState state,
+            final HttpException ex) {
+        closeConnection(conn);
+        shutdownHttpExchange(state, ex);
+    }
+
     private void processResponse(
             final NHttpClientConnection conn,
-            final HttpExchange httpexchange,
+            final HttpExchangeState state,
             final HttpAsyncClientExchangeHandler<?> handler) throws IOException {
         HttpContext context = handler.getContext();
-        if (httpexchange.isValid()) {
-            HttpRequest request = httpexchange.getRequest();
-            HttpResponse response = httpexchange.getResponse();
+        if (state.isValid()) {
+            HttpRequest request = state.getRequest();
+            HttpResponse response = state.getResponse();
             String method = request.getRequestLine().getMethod();
             int status = response.getStatusLine().getStatusCode();
             if (!(method.equalsIgnoreCase("CONNECT") && status < 300)) {
@@ -329,7 +353,7 @@ public class HttpAsyncClientProtocolHand
             conn.close();
         }
         handler.responseCompleted(context);
-        httpexchange.reset();
+        state.reset();
     }
 
     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
@@ -351,7 +375,7 @@ public class HttpAsyncClientProtocolHand
 
     static final String HTTP_EXCHANGE = "http.nio.exchange";
 
-    class HttpExchange {
+    class HttpExchangeState {
 
         private volatile HttpAsyncClientExchangeHandler<?> handler;
         private volatile MessageState requestState;
@@ -361,7 +385,7 @@ public class HttpAsyncClientProtocolHand
         private volatile boolean valid;
         private volatile int timeout;
 
-        HttpExchange() {
+        HttpExchangeState() {
             super();
             this.valid = true;
             this.requestState = MessageState.READY;
@@ -373,9 +397,6 @@ public class HttpAsyncClientProtocolHand
         }
 
         public void setHandler(final HttpAsyncClientExchangeHandler<?> handler) {
-            if (this.handler != null) {
-                throw new IllegalStateException("Handler already set");
-            }
             this.handler = handler;
         }
 
@@ -400,9 +421,6 @@ public class HttpAsyncClientProtocolHand
         }
 
         public void setRequest(final HttpRequest request) {
-            if (this.request != null) {
-                throw new IllegalStateException("Request already set");
-            }
             this.request = request;
         }
 
@@ -411,9 +429,6 @@ public class HttpAsyncClientProtocolHand
         }
 
         public void setResponse(final HttpResponse response) {
-            if (this.response != null) {
-                throw new IllegalStateException("Response already set");
-            }
             this.response = response;
         }
 

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestConsumer.java Tue Nov 29 14:11:49 2011
@@ -80,6 +80,13 @@ public interface HttpAsyncRequestConsume
     void requestCompleted(HttpContext context);
 
     /**
+     * Invoked to signal that the response processing terminated abnormally.
+     *
+     * @param ex exception
+     */
+    void failed(Exception ex);
+
+    /**
      * Returns an exception in case of an abnormal termination. This method
      * returns <code>null</code> if the request execution is still ongoing
      * or if it completed successfully.

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncRequestProducer.java Tue Nov 29 14:11:49 2011
@@ -92,6 +92,13 @@ public interface HttpAsyncRequestProduce
     void requestCompleted(HttpContext context);
 
     /**
+     * Invoked to signal that the response processing terminated abnormally.
+     *
+     * @param ex exception
+     */
+    void failed(Exception ex);
+
+    /**
      * Determines whether or not this producer is capable of producing
      * HTTP request messages more than once.
      */

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncResponseProducer.java Tue Nov 29 14:11:49 2011
@@ -75,4 +75,11 @@ public interface HttpAsyncResponseProduc
      */
     void responseCompleted(HttpContext context);
 
+    /**
+     * Invoked to signal that the response processing terminated abnormally.
+     *
+     * @param ex exception
+     */
+    void failed(Exception ex);
+
 }

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1207880&r1=1207879&r2=1207880&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Tue Nov 29 14:11:49 2011
@@ -28,6 +28,7 @@
 package org.apache.http.nio.protocol;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import org.apache.http.ConnectionReuseStrategy;
 import org.apache.http.HttpEntity;
@@ -107,150 +108,171 @@ public class HttpAsyncServiceHandler imp
     }
 
     public void connected(final NHttpServerConnection conn) {
-        HttpExchange httpExchange = new HttpExchange();
-        conn.getContext().setAttribute(HTTP_EXCHANGE, httpExchange);
+        HttpExchangeState state = new HttpExchangeState();
+        conn.getContext().setAttribute(HTTP_EXCHANGE, state);
     }
 
     public void closed(final NHttpServerConnection conn) {
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
-        Cancellable asyncProcess = httpExchange.getAsyncProcess();
-        httpExchange.clear();
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        Cancellable asyncProcess = state.getAsyncProcess();
+        state.clear();
         if (asyncProcess != null) {
             asyncProcess.cancel();
         }
     }
 
     public void exception(final NHttpServerConnection conn, final HttpException httpex) {
-        if (conn.isResponseSubmitted()) {
-            // There is not much that we can do if a response head
-            // has already been submitted
-            closeConnection(conn);
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
+        if (state != null) {
+            handleProtocolFailure(conn, state, httpex);
+        } else {
+            shutdownConnection(conn);
             onException(httpex);
-            return;
         }
+    }
 
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
-        try {
-            HttpAsyncResponseProducer responseProducer = handleException(httpex);
-            httpExchange.setResponseProducer(responseProducer);
-            commitResponse(conn, httpExchange);
-        } catch (RuntimeException ex) {
-            shutdownConnection(conn);
-            throw ex;
-        } catch (Exception ex) {
+    public void exception(final NHttpServerConnection conn, final IOException ex) {
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            handleFailure(conn, state, ex);
+        } else {
             shutdownConnection(conn);
             onException(ex);
         }
     }
 
-    public void exception(final NHttpServerConnection conn, final IOException ex) {
-        shutdownConnection(conn);
-        onException(ex);
-    }
-
     public void requestReceived(final NHttpServerConnection conn) {
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
         try {
             HttpRequest request = conn.getHttpRequest();
-            HttpContext context = httpExchange.getContext();
+            HttpContext context = state.getContext();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
 
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
             context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
             this.httpProcessor.process(request, context);
 
-            httpExchange.setRequest(request);
+            state.setRequest(request);
             HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
-            httpExchange.setRequestHandler(requestHandler);
+            state.setRequestHandler(requestHandler);
             HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
-            httpExchange.setRequestConsumer(consumer);
+            state.setRequestConsumer(consumer);
 
             consumer.requestReceived(request);
 
             if (request instanceof HttpEntityEnclosingRequest) {
                 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
-                    httpExchange.setRequestState(MessageState.ACK_EXPECTED);
+                    state.setRequestState(MessageState.ACK_EXPECTED);
                     if (this.expectationVerifier != null) {
                         conn.suspendInput();
-                        HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(httpExchange, conn);
+                        HttpAsyncContinueTrigger trigger = new ContinueTriggerImpl(state, conn);
                         Cancellable asyncProcess = this.expectationVerifier.verify(request, trigger, context);
-                        httpExchange.setAsyncProcess(asyncProcess);
+                        state.setAsyncProcess(asyncProcess);
                     } else {
                         HttpResponse response = create100Continue(request);
                         conn.submitResponse(response);
-                        httpExchange.setRequestState(MessageState.BODY_STREAM);
+                        state.setRequestState(MessageState.BODY_STREAM);
                     }
                 } else {
-                    httpExchange.setRequestState(MessageState.BODY_STREAM);
+                    state.setRequestState(MessageState.BODY_STREAM);
                 }
             } else {
                 // No request content is expected.
                 // Process request right away
                 conn.suspendInput();
-                processRequest(conn, httpExchange);
+                processRequest(conn, state);
             }
+        } catch (HttpException ex) {
+            handleProtocolFailure(conn, state, ex);
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            onException(ex);
         }
     }
 
     public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
         try {
-            HttpAsyncRequestConsumer<?> consumer = ensureNotNull(httpExchange.getRequestConsumer());
+            HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
             consumer.consumeContent(decoder, conn);
-            httpExchange.setRequestState(MessageState.BODY_STREAM);
+            state.setRequestState(MessageState.BODY_STREAM);
             if (decoder.isCompleted()) {
                 conn.suspendInput();
-                processRequest(conn, httpExchange);
+                processRequest(conn, state);
             }
+        } catch (HttpException ex) {
+            handleProtocolFailure(conn, state, ex);
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            onException(ex);
         }
     }
 
     public void responseReady(final NHttpServerConnection conn) {
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
         try {
-            if (httpExchange.getRequestState() == MessageState.ACK) {
-                conn.requestInput();
-                httpExchange.setRequestState(MessageState.BODY_STREAM);
-                HttpRequest request = httpExchange.getRequest();
-                HttpResponse response = create100Continue(request);
-                conn.submitResponse(response);
-            } else if (httpExchange.getResponse() == null && httpExchange.getResponseProducer() != null) {
-                if (httpExchange.getRequestState() == MessageState.ACK_EXPECTED) {
+            if (state.getResponse() != null) {
+                return;
+            }
+            HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+            if (responseProducer == null) {
+                return;
+            }
+            HttpContext context = state.getContext();
+            HttpResponse response = responseProducer.generateResponse();
+            int status = response.getStatusLine().getStatusCode();
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                if (status == 100) {
+                    state.setResponseProducer(null);
+                    try {
+                        // Make sure 100 response has no entity
+                        response.setEntity(null);
+                        conn.requestInput();
+                        state.setRequestState(MessageState.BODY_STREAM);
+                        conn.submitResponse(response);
+                        responseProducer.responseCompleted(context);
+                    } finally {
+                        responseProducer.close();
+                    }
+                } else if (status >= 400) {
                     conn.resetInput();
-                    httpExchange.setRequestState(MessageState.COMPLETED);
+                    state.setRequestState(MessageState.COMPLETED);
+                    state.setResponse(response);
+                    commitFinalResponse(conn, state);
+                } else {
+                    throw new HttpException("Invalid response: " + response.getStatusLine());
+                }
+            } else {
+                if (status >= 200) {
+                    state.setResponse(response);
+                    commitFinalResponse(conn, state);
+                } else {
+                    throw new HttpException("Invalid response: " + response.getStatusLine());
                 }
-                commitResponse(conn, httpExchange);
             }
+        } catch (HttpException ex) {
+            handleProtocolFailure(conn, state, ex);
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            onException(ex);
         }
     }
 
     public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
-        HttpExchange httpExchange = ensureNotNull(getHttpExchange(conn));
+        HttpExchangeState state = ensureNotNull(getHttpExchange(conn));
         try {
-            HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer();
-            HttpContext context = httpExchange.getContext();
-            HttpResponse response = httpExchange.getResponse();
+            HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+            HttpContext context = state.getContext();
+            HttpResponse response = state.getResponse();
 
             responseProducer.produceContent(encoder, conn);
-            httpExchange.setResponseState(MessageState.BODY_STREAM);
+            state.setResponseState(MessageState.BODY_STREAM);
             if (encoder.isCompleted()) {
                 responseProducer.responseCompleted(context);
                 if (!this.connStrategy.keepAlive(response, context)) {
@@ -258,18 +280,21 @@ public class HttpAsyncServiceHandler imp
                 } else {
                     conn.requestInput();
                 }
-                httpExchange.clear();
+                state.clear();
             }
+        } catch (IOException ex) {
+            handleFailure(conn, state, ex);
         } catch (RuntimeException ex) {
-            shutdownConnection(conn);
+            handleFailure(conn, state, ex);
             throw ex;
-        } catch (Exception ex) {
-            shutdownConnection(conn);
-            onException(ex);
         }
     }
 
     public void timeout(final NHttpServerConnection conn) {
+        HttpExchangeState state = getHttpExchange(conn);
+        if (state != null) {
+            shutdownHttpExchange(state, new SocketTimeoutException());
+        }
         try {
             if (conn.getStatus() == NHttpConnection.ACTIVE) {
                 conn.close();
@@ -286,15 +311,15 @@ public class HttpAsyncServiceHandler imp
         }
     }
 
-    private HttpExchange getHttpExchange(final NHttpConnection conn) {
-        return (HttpExchange) conn.getContext().getAttribute(HTTP_EXCHANGE);
+    private HttpExchangeState getHttpExchange(final NHttpConnection conn) {
+        return (HttpExchangeState) conn.getContext().getAttribute(HTTP_EXCHANGE);
     }
 
-    private HttpExchange ensureNotNull(final HttpExchange httpExchange) {
-        if (httpExchange == null) {
+    private HttpExchangeState ensureNotNull(final HttpExchangeState state) {
+        if (state == null) {
             throw new IllegalStateException("HTTP exchange is null");
         }
-        return httpExchange;
+        return state;
     }
 
     private HttpAsyncRequestConsumer<Object> ensureNotNull(final HttpAsyncRequestConsumer<Object> requestConsumer) {
@@ -323,6 +348,36 @@ public class HttpAsyncServiceHandler imp
         }
     }
 
+    private void shutdownHttpExchange(final HttpExchangeState state, final Exception ex) {
+        HttpAsyncRequestConsumer<Object> consumer = state.getRequestConsumer();
+        if (consumer != null) {
+            state.setRequestConsumer(null);
+            try {
+                consumer.failed(ex);
+            } finally {
+                try {
+                    consumer.close();
+                } catch (IOException ioex) {
+                    onException(ioex);
+                }
+            }
+        }
+        HttpAsyncResponseProducer producer = state.getResponseProducer();
+        if (producer != null) {
+            state.setResponseProducer(null);
+            try {
+                producer.failed(ex);
+            } finally {
+                try {
+                    producer.close();
+                } catch (IOException ioex) {
+                    onException(ioex);
+                }
+            }
+        }
+        state.setRequestHandler(null);
+    }
+
     protected HttpAsyncResponseProducer handleException(final Exception ex) {
         int code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
         if (ex instanceof MethodNotSupportedException) {
@@ -360,44 +415,74 @@ public class HttpAsyncServiceHandler imp
             && status != HttpStatus.SC_RESET_CONTENT;
     }
 
+    private void handleFailure(
+            final NHttpServerConnection conn,
+            final HttpExchangeState state,
+            final Exception ex) {
+        shutdownConnection(conn);
+        shutdownHttpExchange(state, ex);
+    }
+
+    private void handleProtocolFailure(
+            final NHttpServerConnection conn,
+            final HttpExchangeState state,
+            final HttpException httpex) {
+        shutdownHttpExchange(state, httpex);
+        if (conn.isResponseSubmitted() || state.getResponseState() != MessageState.READY) {
+            // There is not much that we can do if a response
+            // has already been submitted
+            closeConnection(conn);
+        } else {
+            HttpAsyncResponseProducer responseProducer = handleException(httpex);
+            state.setResponseProducer(responseProducer);
+            try {
+                HttpResponse response = responseProducer.generateResponse();
+                state.setResponse(response);
+                commitFinalResponse(conn, state);
+            } catch (RuntimeException ex) {
+                handleFailure(conn, state, ex);
+                throw ex;
+            } catch (Exception ex) {
+                handleFailure(conn, state, ex);
+            }
+        }
+    }
+
     private void processRequest(
             final NHttpServerConnection conn,
-            final HttpExchange httpExchange) throws HttpException, IOException {
-        HttpAsyncRequestHandler<Object> handler = httpExchange.getRequestHandler();
-        HttpContext context = httpExchange.getContext();
-        HttpAsyncRequestConsumer<?> consumer = httpExchange.getRequestConsumer();
+            final HttpExchangeState state) throws HttpException, IOException {
+        HttpAsyncRequestHandler<Object> handler = state.getRequestHandler();
+        HttpContext context = state.getContext();
+        HttpAsyncRequestConsumer<?> consumer = state.getRequestConsumer();
         consumer.requestCompleted(context);
-        httpExchange.setRequestState(MessageState.COMPLETED);
+        state.setRequestState(MessageState.COMPLETED);
         Exception exception = consumer.getException();
         if (exception != null) {
             HttpAsyncResponseProducer responseProducer = handleException(exception);
-            httpExchange.setResponseProducer(responseProducer);
+            state.setResponseProducer(responseProducer);
             conn.requestOutput();
         } else {
             Object result = consumer.getResult();
-            HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(httpExchange, conn);
+            HttpAsyncResponseTrigger trigger = new ResponseTriggerImpl(state, conn);
             try {
                 Cancellable asyncProcess = handler.handle(result, trigger, context);
-                httpExchange.setAsyncProcess(asyncProcess);
+                state.setAsyncProcess(asyncProcess);
             } catch (HttpException ex) {
                 HttpAsyncResponseProducer responseProducer = handleException(ex);
-                httpExchange.setResponseProducer(responseProducer);
+                state.setResponseProducer(responseProducer);
                 conn.requestOutput();
             }
         }
     }
 
-    private void commitResponse(
+    private void commitFinalResponse(
             final NHttpServerConnection conn,
-            final HttpExchange httpExchange) throws IOException, HttpException {
-        HttpContext context = httpExchange.getContext();
-        HttpRequest request = httpExchange.getRequest();
-        HttpAsyncResponseProducer responseProducer = httpExchange.getResponseProducer();
-        HttpResponse response = responseProducer.generateResponse();
-        response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
-
-        httpExchange.setResponse(response);
+            final HttpExchangeState state) throws IOException, HttpException {
+        HttpContext context = state.getContext();
+        HttpRequest request = state.getRequest();
+        HttpResponse response = state.getResponse();
 
+        response.setParams(new DefaultedHttpParams(response.getParams(), this.params));
         context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
         this.httpProcessor.process(response, context);
 
@@ -410,6 +495,7 @@ public class HttpAsyncServiceHandler imp
         conn.submitResponse(response);
 
         if (entity == null) {
+            HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
             responseProducer.responseCompleted(context);
             if (!this.connStrategy.keepAlive(response, context)) {
                 conn.close();
@@ -417,9 +503,9 @@ public class HttpAsyncServiceHandler imp
                 // Ready to process new request
                 conn.requestInput();
             }
-            httpExchange.clear();
+            state.clear();
         } else {
-            httpExchange.setResponseState(MessageState.BODY_STREAM);
+            state.setResponseState(MessageState.BODY_STREAM);
         }
     }
 
@@ -436,7 +522,7 @@ public class HttpAsyncServiceHandler imp
         return handler;
     }
 
-    class HttpExchange {
+    class HttpExchangeState {
 
         private final BasicHttpContext context;
         private volatile HttpAsyncRequestHandler<Object> requestHandler;
@@ -448,7 +534,7 @@ public class HttpAsyncServiceHandler imp
         private volatile HttpResponse response;
         private volatile Cancellable asyncProcess;
 
-        HttpExchange() {
+        HttpExchangeState() {
             super();
             this.context = new BasicHttpContext();
             this.requestState = MessageState.READY;
@@ -464,9 +550,6 @@ public class HttpAsyncServiceHandler imp
         }
 
         public void setRequestHandler(final HttpAsyncRequestHandler<Object> requestHandler) {
-            if (this.requestHandler != null) {
-                throw new IllegalStateException("Request handler already set");
-            }
             this.requestHandler = requestHandler;
         }
 
@@ -486,15 +569,11 @@ public class HttpAsyncServiceHandler imp
             this.responseState = state;
         }
 
-
         public HttpAsyncRequestConsumer<Object> getRequestConsumer() {
             return this.requestConsumer;
         }
 
         public void setRequestConsumer(final HttpAsyncRequestConsumer<Object> requestConsumer) {
-            if (this.requestConsumer != null) {
-                throw new IllegalStateException("Request consumer already set");
-            }
             this.requestConsumer = requestConsumer;
         }
 
@@ -503,9 +582,6 @@ public class HttpAsyncServiceHandler imp
         }
 
         public void setResponseProducer(final HttpAsyncResponseProducer responseProducer) {
-            if (this.responseProducer != null) {
-                throw new IllegalStateException("Response producer already set");
-            }
             this.responseProducer = responseProducer;
         }
 
@@ -514,9 +590,6 @@ public class HttpAsyncServiceHandler imp
         }
 
         public void setRequest(final HttpRequest request) {
-            if (this.request != null) {
-                throw new IllegalStateException("Request already set");
-            }
             this.request = request;
         }
 
@@ -525,9 +598,6 @@ public class HttpAsyncServiceHandler imp
         }
 
         public void setResponse(final HttpResponse response) {
-            if (this.response != null) {
-                throw new IllegalStateException("Response already set");
-            }
             this.response = response;
         }
 
@@ -588,14 +658,14 @@ public class HttpAsyncServiceHandler imp
 
     class ResponseTriggerImpl implements HttpAsyncResponseTrigger {
 
-        private final HttpExchange httpExchange;
+        private final HttpExchangeState state;
         private final IOControl iocontrol;
 
         private volatile boolean triggered;
 
-        public ResponseTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) {
+        public ResponseTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) {
             super();
-            this.httpExchange = httpExchange;
+            this.state = state;
             this.iocontrol = iocontrol;
         }
 
@@ -607,7 +677,7 @@ public class HttpAsyncServiceHandler imp
                 throw new IllegalStateException("Response already triggered");
             }
             this.triggered = true;
-            this.httpExchange.setResponseProducer(responseProducer);
+            this.state.setResponseProducer(responseProducer);
             this.iocontrol.requestOutput();
         }
 
@@ -619,14 +689,14 @@ public class HttpAsyncServiceHandler imp
 
     class ContinueTriggerImpl implements HttpAsyncContinueTrigger {
 
-        private final HttpExchange httpExchange;
+        private final HttpExchangeState state;
         private final IOControl iocontrol;
 
         private volatile boolean triggered;
 
-        public ContinueTriggerImpl(final HttpExchange httpExchange, final IOControl iocontrol) {
+        public ContinueTriggerImpl(final HttpExchangeState state, final IOControl iocontrol) {
             super();
-            this.httpExchange = httpExchange;
+            this.state = state;
             this.iocontrol = iocontrol;
         }
 
@@ -635,7 +705,8 @@ public class HttpAsyncServiceHandler imp
                 throw new IllegalStateException("Response already triggered");
             }
             this.triggered = true;
-            this.httpExchange.setRequestState(MessageState.ACK);
+            HttpResponse ack = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_CONTINUE, "Continue");
+            this.state.setResponseProducer(new BasicAsyncResponseProducer(ack));
             this.iocontrol.requestOutput();
         }
 
@@ -647,7 +718,7 @@ public class HttpAsyncServiceHandler imp
                 throw new IllegalStateException("Response already triggered");
             }
             this.triggered = true;
-            this.httpExchange.setResponseProducer(responseProducer);
+            this.state.setResponseProducer(responseProducer);
             this.iocontrol.requestOutput();
         }