You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2011/12/06 21:50:20 UTC

svn commit: r1211116 - in /httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol: HttpAsyncClientProtocolHandler.java HttpAsyncServiceHandler.java

Author: olegk
Date: Tue Dec  6 20:50:19 2011
New Revision: 1211116

URL: http://svn.apache.org/viewvc?rev=1211116&view=rev
Log:
Synchronize on exchange state object

Modified:
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
    httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java?rev=1211116&r1=1211115&r2=1211116&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncClientProtocolHandler.java Tue Dec  6 20:50:19 2011
@@ -70,8 +70,10 @@ public class HttpAsyncClientProtocolHand
     public void closed(final NHttpClientConnection conn) {
         State state = getState(conn);
         if (state != null) {
-            closeHandler(state);
-            state.reset();
+            synchronized (state) {
+                closeHandler(state);
+                state.reset();
+            }
         }
     }
 
@@ -80,8 +82,10 @@ public class HttpAsyncClientProtocolHand
         shutdownConnection(conn);
         State state = getState(conn);
         if (state != null) {
-            closeHandler(state, cause);
-            state.reset();
+            synchronized (state) {
+                closeHandler(state, cause);
+                state.reset();
+            }
         } else {
             log(cause);
         }
@@ -90,43 +94,45 @@ public class HttpAsyncClientProtocolHand
     public void requestReady(
             final NHttpClientConnection conn) throws IOException, HttpException {
         State state = ensureNotNull(getState(conn));
-        if (state.getRequestState() != MessageState.READY) {
-            return;
-        }
-        HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
-        if (handler != null && handler.isDone()) {
-            closeHandler(state);
-            state.reset();
-            handler = null;
-        }
-        if (handler == null) {
-            handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
-                    HTTP_HANDLER);
-            state.setHandler(handler);
-        }
-        if (handler == null) {
-            return;
-        }
-        HttpContext context = handler.getContext();
-        HttpRequest request = handler.generateRequest();
-        state.setRequest(request);
-
-        conn.submitRequest(request);
-
-        if (request instanceof HttpEntityEnclosingRequest) {
-            if (((HttpEntityEnclosingRequest) request).expectContinue()) {
-                int timeout = conn.getSocketTimeout();
-                state.setTimeout(timeout);
-                timeout = request.getParams().getIntParameter(
-                        CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
-                conn.setSocketTimeout(timeout);
-                state.setRequestState(MessageState.ACK_EXPECTED);
+        synchronized (state) {
+            if (state.getRequestState() != MessageState.READY) {
+                return;
+            }
+            HttpAsyncClientExchangeHandler<?> handler = state.getHandler();
+            if (handler != null && handler.isDone()) {
+                closeHandler(state);
+                state.reset();
+                handler = null;
+            }
+            if (handler == null) {
+                handler = (HttpAsyncClientExchangeHandler<?>) conn.getContext().removeAttribute(
+                        HTTP_HANDLER);
+                state.setHandler(handler);
+            }
+            if (handler == null) {
+                return;
+            }
+            HttpContext context = handler.getContext();
+            HttpRequest request = handler.generateRequest();
+            state.setRequest(request);
+
+            conn.submitRequest(request);
+
+            if (request instanceof HttpEntityEnclosingRequest) {
+                if (((HttpEntityEnclosingRequest) request).expectContinue()) {
+                    int timeout = conn.getSocketTimeout();
+                    state.setTimeout(timeout);
+                    timeout = request.getParams().getIntParameter(
+                            CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
+                    conn.setSocketTimeout(timeout);
+                    state.setRequestState(MessageState.ACK_EXPECTED);
+                } else {
+                    state.setRequestState(MessageState.BODY_STREAM);
+                }
             } else {
-                state.setRequestState(MessageState.BODY_STREAM);
+                handler.requestCompleted(context);
+                state.setRequestState(MessageState.COMPLETED);
             }
-        } else {
-            handler.requestCompleted(context);
-            state.setRequestState(MessageState.COMPLETED);
         }
     }
 
@@ -134,60 +140,64 @@ public class HttpAsyncClientProtocolHand
             final NHttpClientConnection conn,
             final ContentEncoder encoder) throws IOException {
         State state = ensureNotNull(getState(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
-        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
-            conn.suspendOutput();
-            return;
-        }
-        HttpContext context = handler.getContext();
-        handler.produceContent(encoder, conn);
-        state.setRequestState(MessageState.BODY_STREAM);
-        if (encoder.isCompleted()) {
-            handler.requestCompleted(context);
-            state.setRequestState(MessageState.COMPLETED);
+        synchronized (state) {
+            HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                conn.suspendOutput();
+                return;
+            }
+            HttpContext context = handler.getContext();
+            handler.produceContent(encoder, conn);
+            state.setRequestState(MessageState.BODY_STREAM);
+            if (encoder.isCompleted()) {
+                handler.requestCompleted(context);
+                state.setRequestState(MessageState.COMPLETED);
+            }
         }
     }
 
     public void responseReceived(
             final NHttpClientConnection conn) throws HttpException, IOException {
         State state = ensureNotNull(getState(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
-        HttpResponse response = conn.getHttpResponse();
-        HttpRequest request = state.getRequest();
-
-        int statusCode = response.getStatusLine().getStatusCode();
-        if (statusCode < HttpStatus.SC_OK) {
-            // 1xx intermediate response
-            if (statusCode != HttpStatus.SC_CONTINUE) {
-                throw new ProtocolException(
-                        "Unexpected response: " + response.getStatusLine());
+        synchronized (state) {
+            HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+            HttpResponse response = conn.getHttpResponse();
+            HttpRequest request = state.getRequest();
+
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode < HttpStatus.SC_OK) {
+                // 1xx intermediate response
+                if (statusCode != HttpStatus.SC_CONTINUE) {
+                    throw new ProtocolException(
+                            "Unexpected response: " + response.getStatusLine());
+                }
+                if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                    int timeout = state.getTimeout();
+                    conn.setSocketTimeout(timeout);
+                    conn.requestOutput();
+                    state.setRequestState(MessageState.ACK);
+                }
+                return;
             }
+            state.setResponse(response);
             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
                 int timeout = state.getTimeout();
                 conn.setSocketTimeout(timeout);
-                conn.requestOutput();
-                state.setRequestState(MessageState.ACK);
+                conn.resetOutput();
+                state.setRequestState(MessageState.COMPLETED);
+            } else if (state.getRequestState() == MessageState.BODY_STREAM) {
+                // Early response
+                conn.resetOutput();
+                conn.suspendOutput();
+                state.setRequestState(MessageState.COMPLETED);
+                state.invalidate();
+            }
+            handler.responseReceived(response);
+            state.setResponseState(MessageState.BODY_STREAM);
+            if (!canResponseHaveBody(request, response)) {
+                conn.resetInput();
+                processResponse(conn, state, handler);
             }
-            return;
-        }
-        state.setResponse(response);
-        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
-            int timeout = state.getTimeout();
-            conn.setSocketTimeout(timeout);
-            conn.resetOutput();
-            state.setRequestState(MessageState.COMPLETED);
-        } else if (state.getRequestState() == MessageState.BODY_STREAM) {
-            // Early response
-            conn.resetOutput();
-            conn.suspendOutput();
-            state.setRequestState(MessageState.COMPLETED);
-            state.invalidate();
-        }
-        handler.responseReceived(response);
-        state.setResponseState(MessageState.BODY_STREAM);
-        if (!canResponseHaveBody(request, response)) {
-            conn.resetInput();
-            processResponse(conn, state, handler);
         }
     }
 
@@ -195,11 +205,13 @@ public class HttpAsyncClientProtocolHand
             final NHttpClientConnection conn,
             final ContentDecoder decoder) throws IOException {
         State state = ensureNotNull(getState(conn));
-        HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
-        handler.consumeContent(decoder, conn);
-        state.setResponseState(MessageState.BODY_STREAM);
-        if (decoder.isCompleted()) {
-            processResponse(conn, state, handler);
+        synchronized (state) {
+            HttpAsyncClientExchangeHandler<?> handler = ensureNotNull(state.getHandler());
+            handler.consumeContent(decoder, conn);
+            state.setResponseState(MessageState.BODY_STREAM);
+            if (decoder.isCompleted()) {
+                processResponse(conn, state, handler);
+            }
         }
     }
 
@@ -207,14 +219,16 @@ public class HttpAsyncClientProtocolHand
             final NHttpClientConnection conn) throws IOException {
         State state = getState(conn);
         if (state != null) {
-            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
-                int timeout = state.getTimeout();
-                conn.setSocketTimeout(timeout);
-                conn.requestOutput();
-                state.setRequestState(MessageState.BODY_STREAM);
-                return;
-            } else {
-                closeHandler(state, new SocketTimeoutException());
+            synchronized (state) {
+                if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                    int timeout = state.getTimeout();
+                    conn.setSocketTimeout(timeout);
+                    conn.requestOutput();
+                    state.setRequestState(MessageState.BODY_STREAM);
+                    return;
+                } else {
+                    closeHandler(state, new SocketTimeoutException());
+                }
             }
         }
         if (conn.getStatus() == NHttpConnection.ACTIVE) {

Modified: httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java?rev=1211116&r1=1211115&r2=1211116&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java (original)
+++ httpcomponents/httpcore/trunk/httpcore-nio/src/main/java/org/apache/http/nio/protocol/HttpAsyncServiceHandler.java Tue Dec  6 20:50:19 2011
@@ -118,12 +118,14 @@ public class HttpAsyncServiceHandler imp
     public void closed(final NHttpServerConnection conn) {
         State state = getState(conn);
         if (state != null) {
-            closeHandlers(state);
-            Cancellable asyncProcess = state.getAsyncProcess();
-            if (asyncProcess != null) {
-                asyncProcess.cancel();
+            synchronized (state) {
+                closeHandlers(state);
+                Cancellable asyncProcess = state.getAsyncProcess();
+                if (asyncProcess != null) {
+                    asyncProcess.cancel();
+                }
+                state.reset();
             }
-            state.reset();
         }
     }
 
@@ -131,34 +133,38 @@ public class HttpAsyncServiceHandler imp
             final NHttpServerConnection conn, final Exception cause) {
         State state = ensureNotNull(getState(conn));
         if (state != null) {
-            closeHandlers(state, cause);
-            if (cause instanceof HttpException) {
-                if (conn.isResponseSubmitted() || state.getResponseState() != MessageState.READY) {
-                    // There is not much that we can do if a response
-                    // has already been submitted
-                    closeConnection(conn);
-                } else {
-                    HttpContext context = state.getContext();
-                    HttpAsyncResponseProducer responseProducer = handleException(cause, context);
-                    state.setResponseProducer(responseProducer);
-                    try {
-                        HttpResponse response = responseProducer.generateResponse();
-                        state.setResponse(response);
-                        commitFinalResponse(conn, state);
-                    } catch (Exception ex) {
-                        shutdownConnection(conn);
-                        closeHandlers(state);
-                        state.reset();
-                        if (ex instanceof RuntimeException) {
-                            throw (RuntimeException) ex;
-                        } else {
-                            log(ex);
+            synchronized (state) {
+                closeHandlers(state, cause);
+                if (cause instanceof HttpException) {
+                    if (conn.isResponseSubmitted()
+                            || state.getResponseState() != MessageState.READY) {
+                        // There is not much that we can do if a response
+                        // has already been submitted
+                        closeConnection(conn);
+                    } else {
+                        HttpContext context = state.getContext();
+                        HttpAsyncResponseProducer responseProducer = handleException(
+                                cause, context);
+                        state.setResponseProducer(responseProducer);
+                        try {
+                            HttpResponse response = responseProducer.generateResponse();
+                            state.setResponse(response);
+                            commitFinalResponse(conn, state);
+                        } catch (Exception ex) {
+                            shutdownConnection(conn);
+                            closeHandlers(state);
+                            state.reset();
+                            if (ex instanceof RuntimeException) {
+                                throw (RuntimeException) ex;
+                            } else {
+                                log(ex);
+                            }
                         }
                     }
+                } else {
+                    shutdownConnection(conn);
+                    state.reset();
                 }
-            } else {
-                shutdownConnection(conn);
-                state.reset();
             }
         } else {
             shutdownConnection(conn);
@@ -169,45 +175,47 @@ public class HttpAsyncServiceHandler imp
     public void requestReceived(
             final NHttpServerConnection conn) throws IOException, HttpException {
         State state = ensureNotNull(getState(conn));
-        HttpRequest request = conn.getHttpRequest();
-        HttpContext context = state.getContext();
-        request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
-
-        context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
-        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
-        this.httpProcessor.process(request, context);
-
-        state.setRequest(request);
-        HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
-        state.setRequestHandler(requestHandler);
-        HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
-        state.setRequestConsumer(consumer);
-
-        consumer.requestReceived(request);
-
-        if (request instanceof HttpEntityEnclosingRequest) {
-            if (((HttpEntityEnclosingRequest) request).expectContinue()) {
-                state.setRequestState(MessageState.ACK_EXPECTED);
-                HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
-                        HttpStatus.SC_CONTINUE, context);
-                if (this.expectationVerifier != null) {
-                    conn.suspendInput();
-                    HttpAsyncServiceExchange httpex = new Exchange(
-                            request, ack, state, conn);
-                    Cancellable asyncProcess = this.expectationVerifier.verify(httpex, context);
-                    state.setAsyncProcess(asyncProcess);
+        synchronized (state) {
+            HttpRequest request = conn.getHttpRequest();
+            HttpContext context = state.getContext();
+            request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
+
+            context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+            context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
+            this.httpProcessor.process(request, context);
+
+            state.setRequest(request);
+            HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
+            state.setRequestHandler(requestHandler);
+            HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
+            state.setRequestConsumer(consumer);
+
+            consumer.requestReceived(request);
+
+            if (request instanceof HttpEntityEnclosingRequest) {
+                if (((HttpEntityEnclosingRequest) request).expectContinue()) {
+                    state.setRequestState(MessageState.ACK_EXPECTED);
+                    HttpResponse ack = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1,
+                            HttpStatus.SC_CONTINUE, context);
+                    if (this.expectationVerifier != null) {
+                        conn.suspendInput();
+                        HttpAsyncServiceExchange httpex = new Exchange(
+                                request, ack, state, conn);
+                        Cancellable asyncProcess = this.expectationVerifier.verify(httpex, context);
+                        state.setAsyncProcess(asyncProcess);
+                    } else {
+                        conn.submitResponse(ack);
+                        state.setRequestState(MessageState.BODY_STREAM);
+                    }
                 } else {
-                    conn.submitResponse(ack);
                     state.setRequestState(MessageState.BODY_STREAM);
                 }
             } else {
-                state.setRequestState(MessageState.BODY_STREAM);
+                // No request content is expected.
+                // Process request right away
+                conn.suspendInput();
+                processRequest(conn, state);
             }
-        } else {
-            // No request content is expected.
-            // Process request right away
-            conn.suspendInput();
-            processRequest(conn, state);
         }
     }
 
@@ -215,55 +223,59 @@ public class HttpAsyncServiceHandler imp
             final NHttpServerConnection conn,
             final ContentDecoder decoder) throws IOException, HttpException {
         State state = ensureNotNull(getState(conn));
-        HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
-        consumer.consumeContent(decoder, conn);
-        state.setRequestState(MessageState.BODY_STREAM);
-        if (decoder.isCompleted()) {
-            conn.suspendInput();
-            processRequest(conn, state);
+        synchronized (state) {
+            HttpAsyncRequestConsumer<?> consumer = ensureNotNull(state.getRequestConsumer());
+            consumer.consumeContent(decoder, conn);
+            state.setRequestState(MessageState.BODY_STREAM);
+            if (decoder.isCompleted()) {
+                conn.suspendInput();
+                processRequest(conn, state);
+            }
         }
     }
 
     public void responseReady(
             final NHttpServerConnection conn) throws IOException, HttpException {
         State state = ensureNotNull(getState(conn));
-        if (state.getResponse() != null) {
-            return;
-        }
-        HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
-        if (responseProducer == null) {
-            return;
-        }
-        HttpContext context = state.getContext();
-        HttpResponse response = responseProducer.generateResponse();
-        int status = response.getStatusLine().getStatusCode();
-        if (state.getRequestState() == MessageState.ACK_EXPECTED) {
-            if (status == 100) {
-                state.setResponseProducer(null);
-                try {
-                    // Make sure 100 response has no entity
-                    response.setEntity(null);
-                    conn.requestInput();
-                    state.setRequestState(MessageState.BODY_STREAM);
-                    conn.submitResponse(response);
-                    responseProducer.responseCompleted(context);
-                } finally {
-                    responseProducer.close();
-                }
-            } else if (status >= 400) {
-                conn.resetInput();
-                state.setRequestState(MessageState.COMPLETED);
-                state.setResponse(response);
-                commitFinalResponse(conn, state);
-            } else {
-                throw new HttpException("Invalid response: " + response.getStatusLine());
+        synchronized (state) {
+            if (state.getResponse() != null) {
+                return;
             }
-        } else {
-            if (status >= 200) {
-                state.setResponse(response);
-                commitFinalResponse(conn, state);
+            HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+            if (responseProducer == null) {
+                return;
+            }
+            HttpContext context = state.getContext();
+            HttpResponse response = responseProducer.generateResponse();
+            int status = response.getStatusLine().getStatusCode();
+            if (state.getRequestState() == MessageState.ACK_EXPECTED) {
+                if (status == 100) {
+                    try {
+                        // Make sure 100 response has no entity
+                        response.setEntity(null);
+                        conn.requestInput();
+                        state.setRequestState(MessageState.BODY_STREAM);
+                        conn.submitResponse(response);
+                        responseProducer.responseCompleted(context);
+                    } finally {
+                        state.setResponseProducer(null);
+                        responseProducer.close();
+                    }
+                } else if (status >= 400) {
+                    conn.resetInput();
+                    state.setRequestState(MessageState.COMPLETED);
+                    state.setResponse(response);
+                    commitFinalResponse(conn, state);
+                } else {
+                    throw new HttpException("Invalid response: " + response.getStatusLine());
+                }
             } else {
-                throw new HttpException("Invalid response: " + response.getStatusLine());
+                if (status >= 200) {
+                    state.setResponse(response);
+                    commitFinalResponse(conn, state);
+                } else {
+                    throw new HttpException("Invalid response: " + response.getStatusLine());
+                }
             }
         }
     }
@@ -272,28 +284,32 @@ public class HttpAsyncServiceHandler imp
             final NHttpServerConnection conn,
             final ContentEncoder encoder) throws IOException {
         State state = ensureNotNull(getState(conn));
-        HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
-        HttpContext context = state.getContext();
-        HttpResponse response = state.getResponse();
+        synchronized (state) {
+            HttpAsyncResponseProducer responseProducer = state.getResponseProducer();
+            HttpContext context = state.getContext();
+            HttpResponse response = state.getResponse();
 
-        responseProducer.produceContent(encoder, conn);
-        state.setResponseState(MessageState.BODY_STREAM);
-        if (encoder.isCompleted()) {
-            responseProducer.responseCompleted(context);
-            if (!this.connStrategy.keepAlive(response, context)) {
-                conn.close();
-            } else {
-                conn.requestInput();
+            responseProducer.produceContent(encoder, conn);
+            state.setResponseState(MessageState.BODY_STREAM);
+            if (encoder.isCompleted()) {
+                responseProducer.responseCompleted(context);
+                if (!this.connStrategy.keepAlive(response, context)) {
+                    conn.close();
+                } else {
+                    conn.requestInput();
+                }
+                closeHandlers(state);
+                state.reset();
             }
-            closeHandlers(state);
-            state.reset();
         }
     }
 
     public void timeout(final NHttpServerConnection conn) throws IOException {
         State state = getState(conn);
         if (state != null) {
-            closeHandlers(state, new SocketTimeoutException());
+            synchronized (state) {
+                closeHandlers(state, new SocketTimeoutException());
+            }
         }
         if (conn.getStatus() == NHttpConnection.ACTIVE) {
             conn.close();
@@ -626,8 +642,6 @@ public class HttpAsyncServiceHandler imp
         private final State state;
         private final NHttpServerConnection conn;
 
-        private volatile boolean completed;
-
         public Exchange(
                 final HttpRequest request,
                 final HttpResponse response,
@@ -648,16 +662,17 @@ public class HttpAsyncServiceHandler imp
             return this.response;
         }
 
-        public synchronized void submitResponse(final HttpAsyncResponseProducer responseProducer) {
+        public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
             if (responseProducer == null) {
                 throw new IllegalArgumentException("Response producer may not be null");
             }
-            if (this.completed) {
-                throw new IllegalStateException("Response already submitted");
+            synchronized (this.state) {
+                if (this.state.getResponseProducer() != null) {
+                    throw new IllegalStateException("Response already submitted");
+                }
+                this.state.setResponseProducer(responseProducer);
+                this.conn.requestOutput();
             }
-            this.completed = true;
-            this.state.setResponseProducer(responseProducer);
-            this.conn.requestOutput();
         }
 
         public void submitResponse() {
@@ -665,7 +680,9 @@ public class HttpAsyncServiceHandler imp
         }
 
         public boolean isCompleted() {
-            return this.completed;
+            synchronized (this.state) {
+                return this.state.getResponseProducer() != null;
+            }
         }
 
     }