You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2007/06/04 11:32:23 UTC

svn commit: r544101 - in /jakarta/httpcomponents/httpcore/trunk: module-nio/src/main/java/org/apache/http/nio/protocol/ module-nio/src/main/java/org/apache/http/nio/util/ module-niossl/src/test/java/org/apache/http/impl/nio/reactor/

Author: olegk
Date: Mon Jun  4 02:32:10 2007
New Revision: 544101

URL: http://svn.apache.org/viewvc?view=rev&rev=544101
Log:
A complete re-write of the ThrottlingHttpServiceHandler

Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
    jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java Mon Jun  4 02:32:10 2007
@@ -172,35 +172,12 @@
         }
     }
     
-    public void exception(final NHttpServerConnection conn, final HttpException httpex) {
-
-        final HttpContext context = conn.getContext();
-        final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
+    public void exception(final NHttpServerConnection conn, final HttpException ex) {
+        shutdownConnection(conn);
         
-        this.executor.execute(new Runnable() {
-            
-            public void run() {
-                try {
-
-                    HttpContext context = new HttpExecutionContext(conn.getContext());
-                    context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
-                    handleException(connState, httpex, context);
-                    commitResponse(connState, conn);
-                    
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                    if (eventListener != null) {
-                        eventListener.fatalIOException(ex, conn);
-                    }
-                } catch (HttpException ex) {
-                    shutdownConnection(conn);
-                    if (eventListener != null) {
-                        eventListener.fatalProtocolException(ex, conn);
-                    }
-                }
-            }
-            
-        });        
+        if (this.eventListener != null) {
+            this.eventListener.fatalProtocolException(ex, conn);
+        }
     }
 
     public void exception(final NHttpServerConnection conn, final IOException ex) {
@@ -222,39 +199,51 @@
     public void requestReceived(final NHttpServerConnection conn) {
         HttpContext context = conn.getContext();
         
-        HttpRequest request = conn.getHttpRequest();
-        HttpParamsLinker.link(request, this.params);
-
+        final HttpRequest request = conn.getHttpRequest();
         final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
 
-        connState.resetInput();
-        connState.setRequest(request);
-        connState.setInputState(ServerConnState.REQUEST_RECEIVED);
+        synchronized (connState) {
+            connState.setRequest(request);
+            connState.setInputState(ServerConnState.REQUEST_RECEIVED);
 
-        this.executor.execute(new Runnable() {
-            
-            public void run() {
-                try {
-                    HttpContext context = new HttpExecutionContext(conn.getContext());
-                    context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
-                    handleRequest(connState, context);
-                    commitResponse(connState, conn);
-                    
-                } catch (IOException ex) {
-                    shutdownConnection(conn);
-                    if (eventListener != null) {
-                        eventListener.fatalIOException(ex, conn);
-                    }
-                } catch (HttpException ex) {
-                    shutdownConnection(conn);
-                    if (eventListener != null) {
-                        eventListener.fatalProtocolException(ex, conn);
-                    }
+            boolean contentExpected = false;
+            if (request instanceof HttpEntityEnclosingRequest) {
+                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
+                if (entity != null) {
+                    contentExpected = true;
                 }
             }
             
-        });
+            if (!contentExpected) {
+                conn.suspendInput();
+            }
+            
+            this.executor.execute(new Runnable() {
+                
+                public void run() {
+                    try {
 
+                        HttpContext context = new HttpExecutionContext(conn.getContext());
+                        handleRequest(connState, conn, request, context);
+                        
+                    } catch (IOException ex) {
+                        shutdownConnection(conn);
+                        if (eventListener != null) {
+                            eventListener.fatalIOException(ex, conn);
+                        }
+                    } catch (HttpException ex) {
+                        shutdownConnection(conn);
+                        if (eventListener != null) {
+                            eventListener.fatalProtocolException(ex, conn);
+                        }
+                    }
+                }
+                
+            });
+        
+            connState.notifyAll();
+        }
+        
     }
 
     public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
@@ -263,14 +252,17 @@
         ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
         ContentInputBuffer buffer = connState.getInbuffer();
 
-        // Update connection state
-        connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
-        
         try {
-            
-            buffer.consumeContent(decoder);
-            if (decoder.isCompleted()) {
-                connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
+
+            synchronized (connState) {
+                buffer.consumeContent(decoder);
+                if (decoder.isCompleted()) {
+                    connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
+                } else {
+                    connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
+                }
+                
+                connState.notifyAll();
             }
             
         } catch (IOException ex) {
@@ -279,6 +271,7 @@
                 this.eventListener.fatalIOException(ex, conn);
             }
         }
+        
     }
     
     public void responseReady(final NHttpServerConnection conn) {
@@ -286,16 +279,30 @@
 
         ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
         HttpResponse response = connState.getResponse();
-        if (connState.getOutputState() != ServerConnState.RESPONSE_SENT 
+        if (connState.getOutputState() == ServerConnState.READY 
                 && response != null 
                 && !conn.isResponseSubmitted()) {
             try {
                 conn.submitResponse(response);
 
-                // Notify the worker thread of the connection state
-                // change
                 synchronized (connState) {
-                    connState.setOutputState(ServerConnState.RESPONSE_SENT);
+                    int statusCode = response.getStatusLine().getStatusCode();
+                    HttpEntity entity = response.getEntity();
+
+                    if (statusCode >= 200 && entity == null) {
+                        connState.resetOutput();
+                        connState.resetInput();
+
+                        if (!this.connStrategy.keepAlive(response, context)) {
+                            conn.close();
+                        } else {
+                            // Ready for new request
+                            conn.requestInput();
+                        }
+                    } else {
+                        connState.setOutputState(ServerConnState.RESPONSE_SENT);
+                    }
+                    
                     connState.notifyAll();
                 }
                 
@@ -315,29 +322,31 @@
 
     public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
         HttpContext context = conn.getContext();
-        HttpResponse response = conn.getHttpResponse();
 
         ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
         ContentOutputBuffer buffer = connState.getOutbuffer();
         
-        // Update connection state
-        connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
-        
         try {
 
-            buffer.produceContent(encoder);
-            if (encoder.isCompleted()) {
-
-                // Notify the worker thread of the connection state
-                // change
-                synchronized (connState) {
-                    connState.setOutputState(ServerConnState.RESPONSE_BODY_DONE);
-                    connState.notifyAll();
-                }
+            synchronized (connState) {
+                HttpResponse response = connState.getResponse();
                 
-                if (!this.connStrategy.keepAlive(response, context)) {
-                    conn.close();
+                buffer.produceContent(encoder);
+                if (encoder.isCompleted()) {
+                    connState.resetOutput();
+                    connState.resetInput();
+
+                    if (!this.connStrategy.keepAlive(response, context)) {
+                        conn.close();
+                    } else {
+                        // Ready for new request
+                        conn.requestInput();
+                    }
+                } else {
+                    connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
                 }
+                
+                connState.notifyAll();
             }
             
         } catch (IOException ex) {
@@ -362,7 +371,7 @@
         }
     }
     
-    private void waitForOutput(
+    private void waitForOutputState(
             final ServerConnState connState, 
             int expectedState) throws InterruptedIOException {
         synchronized (connState) {
@@ -383,12 +392,15 @@
         }
     }
     
-    private void handleException(
+    private HttpResponse handleException(
             final ServerConnState connState,
+            final NHttpServerConnection conn,
             final HttpException ex,
             final HttpContext context) throws HttpException, IOException {
 
         HttpRequest request = connState.getRequest();
+
+        context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
         context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
         
         HttpVersion ver;
@@ -411,23 +423,26 @@
                 code, 
                 context);
 
+        HttpParamsLinker.link(response, this.params);
+        
         byte[] msg = EncodingUtils.getAsciiBytes(ex.getMessage());
         ByteArrayEntity entity = new ByteArrayEntity(msg);
         entity.setContentType("text/plain; charset=US-ASCII");
         response.setEntity(entity);
-
-        context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
-
-        this.httpProcessor.process(response, context);
-        
-        connState.setResponse(response);
+        return response;
     }
     
     private void handleRequest(
             final ServerConnState connState,
+            final NHttpServerConnection conn,
+            final HttpRequest request,
             final HttpContext context) throws HttpException, IOException {
 
-        HttpRequest request = connState.getRequest();
+        waitForOutputState(connState, ServerConnState.READY);
+        
+        HttpParamsLinker.link(request, this.params);
+        
+        context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
         context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
 
         HttpVersion ver = request.getRequestLine().getHttpVersion();
@@ -437,7 +452,7 @@
             ver = HttpVersion.HTTP_1_1;
         }
 
-        HttpResponse response;
+        HttpResponse response = null;
 
         if (request instanceof HttpEntityEnclosingRequest) {
             HttpEntityEnclosingRequest entityReq = (HttpEntityEnclosingRequest) request;
@@ -452,27 +467,27 @@
                     try {
                         this.expectationVerifier.verify(request, response, context);
                     } catch (HttpException ex) {
-                        handleException(connState, ex ,context);
-                        return;
+                        response = handleException(connState, conn, ex ,context);
                     }
                 }
             
                 if (response.getStatusLine().getStatusCode() < 200) {
+                    
                     // Send 1xx response indicating the server expections
                     // have been met
-                    waitForOutput(connState, ServerConnState.READY);
-                    connState.setResponse(response);
                     synchronized (connState) {
-                        waitForOutput(connState, ServerConnState.RESPONSE_SENT);
+                        connState.setResponse(response);
+                        conn.requestOutput();
+                        waitForOutputState(connState, ServerConnState.RESPONSE_SENT);
                         connState.resetOutput();
                     }
+                    response = null;
                 } else {
-                    // The request does not meet the server expections
-                    context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
-                    this.httpProcessor.process(response, context);
-                    connState.setResponse(response);
-                    return;
+                    // Discard entity
+                    conn.resetInput();
+                    entityReq.setEntity(null);
                 }
+
             }
 
             // Create a wrapper entity instead of the original one
@@ -483,49 +498,41 @@
             }
         }
 
-        response = this.responseFactory.newHttpResponse(
-                ver, 
-                HttpStatus.SC_OK, 
-                context);
-        HttpParamsLinker.link(response, this.params);
+        if (response == null) {
+            response = this.responseFactory.newHttpResponse(
+                    ver, 
+                    HttpStatus.SC_OK, 
+                    context);
+            HttpParamsLinker.link(response, this.params);
 
-        context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
-        
-        try {
+            context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
+            
+            try {
 
-            this.httpProcessor.process(request, context);
+                this.httpProcessor.process(request, context);
 
-            HttpRequestHandler handler = null;
-            if (this.handlerResolver != null) {
-                String requestURI = request.getRequestLine().getUri();
-                handler = this.handlerResolver.lookup(requestURI);
-            }
-            if (handler != null) {
-                handler.handle(request, response, context);
-            } else {
-                response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
-            }
-
-        } catch (HttpException ex) {
-            handleException(connState, ex ,context);
-            return;
+                HttpRequestHandler handler = null;
+                if (this.handlerResolver != null) {
+                    String requestURI = request.getRequestLine().getUri();
+                    handler = this.handlerResolver.lookup(requestURI);
+                }
+                if (handler != null) {
+                    handler.handle(request, response, context);
+                } else {
+                    response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
+                }
+
+            } catch (HttpException ex) {
+                response = handleException(connState, conn, ex ,context);
+            }
         }
 
         this.httpProcessor.process(response, context);
-        
-        connState.setResponse(response);
-    }
-    
-    private void commitResponse(
-            final ServerConnState connState,
-            final IOControl ioControl) throws IOException, HttpException {
 
-        waitForOutput(connState, ServerConnState.READY);
-        
+        connState.setResponse(response);
         // Response is ready to be committed
-        HttpResponse response = connState.getResponse();
+        conn.requestOutput();
 
-        int terminalState;
         if (response.getEntity() != null) {
             ContentOutputBuffer buffer = connState.getOutbuffer();
             OutputStream outstream = new ContentOutputStream(buffer);
@@ -534,17 +541,9 @@
             entity.writeTo(outstream);
             outstream.flush();
             outstream.close();
-            terminalState = ServerConnState.RESPONSE_BODY_DONE;
-        } else {
-            ioControl.requestOutput();
-            terminalState = ServerConnState.RESPONSE_SENT;
-        }
-        synchronized (connState) {
-            waitForOutput(connState, terminalState);
-            connState.resetOutput();
         }
     }
-
+    
     static class ServerConnState {
         
         public static final int SHUTDOWN                   = -1;

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java Mon Jun  4 02:32:10 2007
@@ -37,16 +37,11 @@
 
 public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
 
-    private static final int READY      = 0;
-    private static final int STREAMING  = 1;
-    private static final int CLOSING    = 2;
-    private static final int CLOSED     = 4;
-    
     private final IOControl ioctrl;
     private final Object mutex;
     
     private volatile boolean shutdown = false;
-    private volatile int state;
+    private volatile boolean endOfStream;
     
     public SharedOutputBuffer(int buffersize, final IOControl ioctrl) {
         super(buffersize);
@@ -55,7 +50,7 @@
         }
         this.ioctrl = ioctrl;
         this.mutex = new Object();
-        this.state = READY;
+        this.endOfStream = false;
     }
 
     public void reset() {
@@ -64,7 +59,7 @@
         }
         synchronized (this.mutex) {
             clear();
-            this.state = READY;
+            this.endOfStream = false;
         }
     }
     
@@ -78,17 +73,16 @@
             if (hasData()) {
                 bytesWritten = encoder.write(this.buffer);
                 if (encoder.isCompleted()) {
-                    this.state = CLOSED;
+                    this.endOfStream = false;
                 }
             }
             if (!hasData()) {
                 // No more buffered content
                 // If at the end of the stream, terminate
-                if (this.state == CLOSING && !encoder.isCompleted()) {
+                if (this.endOfStream && !encoder.isCompleted()) {
                     encoder.complete();
-                    this.state = CLOSED;
                 } 
-                if (this.state == STREAMING) {
+                if (!this.endOfStream) {
                     // suspend output events
                     this.ioctrl.suspendOutput();
                 }
@@ -113,15 +107,14 @@
             return;
         }
         synchronized (this.mutex) {
-            if (this.shutdown || this.state == CLOSING || this.state == CLOSED) {
+            if (this.shutdown || this.endOfStream) {
                 throw new IllegalStateException("Buffer already closed for writing");
             }
-            this.state = STREAMING;
             setInputMode();
             int remaining = len;
             while (remaining > 0) {
                 if (!this.buffer.hasRemaining()) {
-                    flush();
+                    flushContent();
                     setInputMode();
                 }
                 int chunk = Math.min(remaining, this.buffer.remaining());
@@ -141,13 +134,12 @@
 
     public void write(int b) throws IOException {
         synchronized (this.mutex) {
-            if (this.shutdown || this.state == CLOSING || this.state == CLOSED) {
+            if (this.shutdown || this.endOfStream) {
                 throw new IllegalStateException("Buffer already closed for writing");
             }
-            this.state = STREAMING;
             setInputMode();
             if (!this.buffer.hasRemaining()) {
-                flush();
+                flushContent();
                 setInputMode();
             }
             this.buffer.put((byte)b);
@@ -155,6 +147,9 @@
     }
 
     public void flush() throws IOException {
+    }
+
+    private void flushContent() throws IOException {
         synchronized (this.mutex) {
             try {
                 while (hasData() && !this.shutdown) {
@@ -168,20 +163,11 @@
     }
     
     public void writeCompleted() throws IOException {
-        if (this.state == CLOSING || this.state == CLOSED) {
+        if (this.endOfStream) {
             return;
         }
-        synchronized (this.mutex) {
-            this.state = CLOSING;
-            try {
-                while (this.state != CLOSED && !this.shutdown) {
-                    this.ioctrl.requestOutput();
-                    this.mutex.wait();
-                }
-            } catch (InterruptedException ex) {
-                throw new IOException("Interrupted while closing the content buffer");
-            }
-        }
+        this.endOfStream = true;
+        this.ioctrl.requestOutput();
     }
     
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java Mon Jun  4 02:32:10 2007
@@ -117,7 +117,7 @@
     protected void setUp() throws Exception {
         HttpParams serverParams = new BasicHttpParams();
         serverParams
-            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 2000)
+            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
             .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
             .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, false)
             .setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
@@ -127,7 +127,7 @@
         
         HttpParams clientParams = new BasicHttpParams();
         clientParams
-            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 2000)
+            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
             .setIntParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 2000)
             .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
             .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, false)