You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hc.apache.org by ol...@apache.org on 2007/06/04 11:32:23 UTC
svn commit: r544101 - in /jakarta/httpcomponents/httpcore/trunk:
module-nio/src/main/java/org/apache/http/nio/protocol/
module-nio/src/main/java/org/apache/http/nio/util/
module-niossl/src/test/java/org/apache/http/impl/nio/reactor/
Author: olegk
Date: Mon Jun 4 02:32:10 2007
New Revision: 544101
URL: http://svn.apache.org/viewvc?view=rev&rev=544101
Log:
A complete re-write of the ThrottlingHttpServiceHandler
Modified:
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java
jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/protocol/ThrottlingHttpServiceHandler.java Mon Jun 4 02:32:10 2007
@@ -172,35 +172,12 @@
}
}
- public void exception(final NHttpServerConnection conn, final HttpException httpex) {
-
- final HttpContext context = conn.getContext();
- final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
+ public void exception(final NHttpServerConnection conn, final HttpException ex) {
+ shutdownConnection(conn);
- this.executor.execute(new Runnable() {
-
- public void run() {
- try {
-
- HttpContext context = new HttpExecutionContext(conn.getContext());
- context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
- handleException(connState, httpex, context);
- commitResponse(connState, conn);
-
- } catch (IOException ex) {
- shutdownConnection(conn);
- if (eventListener != null) {
- eventListener.fatalIOException(ex, conn);
- }
- } catch (HttpException ex) {
- shutdownConnection(conn);
- if (eventListener != null) {
- eventListener.fatalProtocolException(ex, conn);
- }
- }
- }
-
- });
+ if (this.eventListener != null) {
+ this.eventListener.fatalProtocolException(ex, conn);
+ }
}
public void exception(final NHttpServerConnection conn, final IOException ex) {
@@ -222,39 +199,51 @@
public void requestReceived(final NHttpServerConnection conn) {
HttpContext context = conn.getContext();
- HttpRequest request = conn.getHttpRequest();
- HttpParamsLinker.link(request, this.params);
-
+ final HttpRequest request = conn.getHttpRequest();
final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
- connState.resetInput();
- connState.setRequest(request);
- connState.setInputState(ServerConnState.REQUEST_RECEIVED);
+ synchronized (connState) {
+ connState.setRequest(request);
+ connState.setInputState(ServerConnState.REQUEST_RECEIVED);
- this.executor.execute(new Runnable() {
-
- public void run() {
- try {
- HttpContext context = new HttpExecutionContext(conn.getContext());
- context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
- handleRequest(connState, context);
- commitResponse(connState, conn);
-
- } catch (IOException ex) {
- shutdownConnection(conn);
- if (eventListener != null) {
- eventListener.fatalIOException(ex, conn);
- }
- } catch (HttpException ex) {
- shutdownConnection(conn);
- if (eventListener != null) {
- eventListener.fatalProtocolException(ex, conn);
- }
+ boolean contentExpected = false;
+ if (request instanceof HttpEntityEnclosingRequest) {
+ HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
+ if (entity != null) {
+ contentExpected = true;
}
}
- });
+ if (!contentExpected) {
+ conn.suspendInput();
+ }
+
+ this.executor.execute(new Runnable() {
+
+ public void run() {
+ try {
+ HttpContext context = new HttpExecutionContext(conn.getContext());
+ handleRequest(connState, conn, request, context);
+
+ } catch (IOException ex) {
+ shutdownConnection(conn);
+ if (eventListener != null) {
+ eventListener.fatalIOException(ex, conn);
+ }
+ } catch (HttpException ex) {
+ shutdownConnection(conn);
+ if (eventListener != null) {
+ eventListener.fatalProtocolException(ex, conn);
+ }
+ }
+ }
+
+ });
+
+ connState.notifyAll();
+ }
+
}
public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
@@ -263,14 +252,17 @@
ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
ContentInputBuffer buffer = connState.getInbuffer();
- // Update connection state
- connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
-
try {
-
- buffer.consumeContent(decoder);
- if (decoder.isCompleted()) {
- connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
+
+ synchronized (connState) {
+ buffer.consumeContent(decoder);
+ if (decoder.isCompleted()) {
+ connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
+ } else {
+ connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
+ }
+
+ connState.notifyAll();
}
} catch (IOException ex) {
@@ -279,6 +271,7 @@
this.eventListener.fatalIOException(ex, conn);
}
}
+
}
public void responseReady(final NHttpServerConnection conn) {
@@ -286,16 +279,30 @@
ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
HttpResponse response = connState.getResponse();
- if (connState.getOutputState() != ServerConnState.RESPONSE_SENT
+ if (connState.getOutputState() == ServerConnState.READY
&& response != null
&& !conn.isResponseSubmitted()) {
try {
conn.submitResponse(response);
- // Notify the worker thread of the connection state
- // change
synchronized (connState) {
- connState.setOutputState(ServerConnState.RESPONSE_SENT);
+ int statusCode = response.getStatusLine().getStatusCode();
+ HttpEntity entity = response.getEntity();
+
+ if (statusCode >= 200 && entity == null) {
+ connState.resetOutput();
+ connState.resetInput();
+
+ if (!this.connStrategy.keepAlive(response, context)) {
+ conn.close();
+ } else {
+ // Ready for new request
+ conn.requestInput();
+ }
+ } else {
+ connState.setOutputState(ServerConnState.RESPONSE_SENT);
+ }
+
connState.notifyAll();
}
@@ -315,29 +322,31 @@
public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
HttpContext context = conn.getContext();
- HttpResponse response = conn.getHttpResponse();
ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
ContentOutputBuffer buffer = connState.getOutbuffer();
- // Update connection state
- connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
-
try {
- buffer.produceContent(encoder);
- if (encoder.isCompleted()) {
-
- // Notify the worker thread of the connection state
- // change
- synchronized (connState) {
- connState.setOutputState(ServerConnState.RESPONSE_BODY_DONE);
- connState.notifyAll();
- }
+ synchronized (connState) {
+ HttpResponse response = connState.getResponse();
- if (!this.connStrategy.keepAlive(response, context)) {
- conn.close();
+ buffer.produceContent(encoder);
+ if (encoder.isCompleted()) {
+ connState.resetOutput();
+ connState.resetInput();
+
+ if (!this.connStrategy.keepAlive(response, context)) {
+ conn.close();
+ } else {
+ // Ready for new request
+ conn.requestInput();
+ }
+ } else {
+ connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
}
+
+ connState.notifyAll();
}
} catch (IOException ex) {
@@ -362,7 +371,7 @@
}
}
- private void waitForOutput(
+ private void waitForOutputState(
final ServerConnState connState,
int expectedState) throws InterruptedIOException {
synchronized (connState) {
@@ -383,12 +392,15 @@
}
}
- private void handleException(
+ private HttpResponse handleException(
final ServerConnState connState,
+ final NHttpServerConnection conn,
final HttpException ex,
final HttpContext context) throws HttpException, IOException {
HttpRequest request = connState.getRequest();
+
+ context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
HttpVersion ver;
@@ -411,23 +423,26 @@
code,
context);
+ HttpParamsLinker.link(response, this.params);
+
byte[] msg = EncodingUtils.getAsciiBytes(ex.getMessage());
ByteArrayEntity entity = new ByteArrayEntity(msg);
entity.setContentType("text/plain; charset=US-ASCII");
response.setEntity(entity);
-
- context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
-
- this.httpProcessor.process(response, context);
-
- connState.setResponse(response);
+ return response;
}
private void handleRequest(
final ServerConnState connState,
+ final NHttpServerConnection conn,
+ final HttpRequest request,
final HttpContext context) throws HttpException, IOException {
- HttpRequest request = connState.getRequest();
+ waitForOutputState(connState, ServerConnState.READY);
+
+ HttpParamsLinker.link(request, this.params);
+
+ context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
HttpVersion ver = request.getRequestLine().getHttpVersion();
@@ -437,7 +452,7 @@
ver = HttpVersion.HTTP_1_1;
}
- HttpResponse response;
+ HttpResponse response = null;
if (request instanceof HttpEntityEnclosingRequest) {
HttpEntityEnclosingRequest entityReq = (HttpEntityEnclosingRequest) request;
@@ -452,27 +467,27 @@
try {
this.expectationVerifier.verify(request, response, context);
} catch (HttpException ex) {
- handleException(connState, ex ,context);
- return;
+ response = handleException(connState, conn, ex ,context);
}
}
if (response.getStatusLine().getStatusCode() < 200) {
+
// Send 1xx response indicating the server expections
// have been met
- waitForOutput(connState, ServerConnState.READY);
- connState.setResponse(response);
synchronized (connState) {
- waitForOutput(connState, ServerConnState.RESPONSE_SENT);
+ connState.setResponse(response);
+ conn.requestOutput();
+ waitForOutputState(connState, ServerConnState.RESPONSE_SENT);
connState.resetOutput();
}
+ response = null;
} else {
- // The request does not meet the server expections
- context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
- this.httpProcessor.process(response, context);
- connState.setResponse(response);
- return;
+ // Discard entity
+ conn.resetInput();
+ entityReq.setEntity(null);
}
+
}
// Create a wrapper entity instead of the original one
@@ -483,49 +498,41 @@
}
}
- response = this.responseFactory.newHttpResponse(
- ver,
- HttpStatus.SC_OK,
- context);
- HttpParamsLinker.link(response, this.params);
+ if (response == null) {
+ response = this.responseFactory.newHttpResponse(
+ ver,
+ HttpStatus.SC_OK,
+ context);
+ HttpParamsLinker.link(response, this.params);
- context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
-
- try {
+ context.setAttribute(HttpExecutionContext.HTTP_RESPONSE, response);
+
+ try {
- this.httpProcessor.process(request, context);
+ this.httpProcessor.process(request, context);
- HttpRequestHandler handler = null;
- if (this.handlerResolver != null) {
- String requestURI = request.getRequestLine().getUri();
- handler = this.handlerResolver.lookup(requestURI);
- }
- if (handler != null) {
- handler.handle(request, response, context);
- } else {
- response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
- }
-
- } catch (HttpException ex) {
- handleException(connState, ex ,context);
- return;
+ HttpRequestHandler handler = null;
+ if (this.handlerResolver != null) {
+ String requestURI = request.getRequestLine().getUri();
+ handler = this.handlerResolver.lookup(requestURI);
+ }
+ if (handler != null) {
+ handler.handle(request, response, context);
+ } else {
+ response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
+ }
+
+ } catch (HttpException ex) {
+ response = handleException(connState, conn, ex ,context);
+ }
}
this.httpProcessor.process(response, context);
-
- connState.setResponse(response);
- }
-
- private void commitResponse(
- final ServerConnState connState,
- final IOControl ioControl) throws IOException, HttpException {
- waitForOutput(connState, ServerConnState.READY);
-
+ connState.setResponse(response);
// Response is ready to be committed
- HttpResponse response = connState.getResponse();
+ conn.requestOutput();
- int terminalState;
if (response.getEntity() != null) {
ContentOutputBuffer buffer = connState.getOutbuffer();
OutputStream outstream = new ContentOutputStream(buffer);
@@ -534,17 +541,9 @@
entity.writeTo(outstream);
outstream.flush();
outstream.close();
- terminalState = ServerConnState.RESPONSE_BODY_DONE;
- } else {
- ioControl.requestOutput();
- terminalState = ServerConnState.RESPONSE_SENT;
- }
- synchronized (connState) {
- waitForOutput(connState, terminalState);
- connState.resetOutput();
}
}
-
+
static class ServerConnState {
public static final int SHUTDOWN = -1;
Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/util/SharedOutputBuffer.java Mon Jun 4 02:32:10 2007
@@ -37,16 +37,11 @@
public class SharedOutputBuffer extends ExpandableBuffer implements ContentOutputBuffer {
- private static final int READY = 0;
- private static final int STREAMING = 1;
- private static final int CLOSING = 2;
- private static final int CLOSED = 4;
-
private final IOControl ioctrl;
private final Object mutex;
private volatile boolean shutdown = false;
- private volatile int state;
+ private volatile boolean endOfStream;
public SharedOutputBuffer(int buffersize, final IOControl ioctrl) {
super(buffersize);
@@ -55,7 +50,7 @@
}
this.ioctrl = ioctrl;
this.mutex = new Object();
- this.state = READY;
+ this.endOfStream = false;
}
public void reset() {
@@ -64,7 +59,7 @@
}
synchronized (this.mutex) {
clear();
- this.state = READY;
+ this.endOfStream = false;
}
}
@@ -78,17 +73,16 @@
if (hasData()) {
bytesWritten = encoder.write(this.buffer);
if (encoder.isCompleted()) {
- this.state = CLOSED;
+ this.endOfStream = false;
}
}
if (!hasData()) {
// No more buffered content
// If at the end of the stream, terminate
- if (this.state == CLOSING && !encoder.isCompleted()) {
+ if (this.endOfStream && !encoder.isCompleted()) {
encoder.complete();
- this.state = CLOSED;
}
- if (this.state == STREAMING) {
+ if (!this.endOfStream) {
// suspend output events
this.ioctrl.suspendOutput();
}
@@ -113,15 +107,14 @@
return;
}
synchronized (this.mutex) {
- if (this.shutdown || this.state == CLOSING || this.state == CLOSED) {
+ if (this.shutdown || this.endOfStream) {
throw new IllegalStateException("Buffer already closed for writing");
}
- this.state = STREAMING;
setInputMode();
int remaining = len;
while (remaining > 0) {
if (!this.buffer.hasRemaining()) {
- flush();
+ flushContent();
setInputMode();
}
int chunk = Math.min(remaining, this.buffer.remaining());
@@ -141,13 +134,12 @@
public void write(int b) throws IOException {
synchronized (this.mutex) {
- if (this.shutdown || this.state == CLOSING || this.state == CLOSED) {
+ if (this.shutdown || this.endOfStream) {
throw new IllegalStateException("Buffer already closed for writing");
}
- this.state = STREAMING;
setInputMode();
if (!this.buffer.hasRemaining()) {
- flush();
+ flushContent();
setInputMode();
}
this.buffer.put((byte)b);
@@ -155,6 +147,9 @@
}
public void flush() throws IOException {
+ }
+
+ private void flushContent() throws IOException {
synchronized (this.mutex) {
try {
while (hasData() && !this.shutdown) {
@@ -168,20 +163,11 @@
}
public void writeCompleted() throws IOException {
- if (this.state == CLOSING || this.state == CLOSED) {
+ if (this.endOfStream) {
return;
}
- synchronized (this.mutex) {
- this.state = CLOSING;
- try {
- while (this.state != CLOSED && !this.shutdown) {
- this.ioctrl.requestOutput();
- this.mutex.wait();
- }
- } catch (InterruptedException ex) {
- throw new IOException("Interrupted while closing the content buffer");
- }
- }
+ this.endOfStream = true;
+ this.ioctrl.requestOutput();
}
}
Modified: jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java?view=diff&rev=544101&r1=544100&r2=544101
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java (original)
+++ jakarta/httpcomponents/httpcore/trunk/module-niossl/src/test/java/org/apache/http/impl/nio/reactor/TestNIOSSLHttp.java Mon Jun 4 02:32:10 2007
@@ -117,7 +117,7 @@
protected void setUp() throws Exception {
HttpParams serverParams = new BasicHttpParams();
serverParams
- .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 2000)
+ .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
.setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, false)
.setBooleanParameter(HttpConnectionParams.TCP_NODELAY, true)
@@ -127,7 +127,7 @@
HttpParams clientParams = new BasicHttpParams();
clientParams
- .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 2000)
+ .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
.setIntParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 2000)
.setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, false)