You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/10/29 07:09:27 UTC
svn commit: r708800 - in
/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp:
ClientHandler.java HttpCoreNIOSender.java NhttpConstants.java
ServerHandler.java util/SharedOutputBuffer.java
Author: asankha
Date: Tue Oct 28 23:09:27 2008
New Revision: 708800
URL: http://svn.apache.org/viewvc?rev=708800&view=rev
Log:
fix SYNAPSE-341 and SYNAPSE-344
Modified:
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Tue Oct 28 23:09:27 2008
@@ -128,7 +128,8 @@
* @param conn the connection to use to send the request, which has been kept open
* @param axis2Req the new request
*/
- public void submitRequest(final NHttpClientConnection conn, Axis2HttpRequest axis2Req) {
+ public void submitRequest(final NHttpClientConnection conn, Axis2HttpRequest axis2Req)
+ throws ConnectionClosedException {
processConnection(conn, axis2Req);
}
@@ -142,7 +143,14 @@
if (log.isDebugEnabled() ) {
log.debug("ClientHandler connected : " + conn);
}
- processConnection(conn, (Axis2HttpRequest) attachment);
+ try {
+ processConnection(conn, (Axis2HttpRequest) attachment);
+ } catch (ConnectionClosedException e) {
+ if (metrics != null) {
+ metrics.incrementFaultsSending();
+ }
+ handleException("I/O Error submitting request : " + e.getMessage(), e, conn);
+ }
}
/**
@@ -150,7 +158,8 @@
* @param conn
* @param axis2Req
*/
- private void processConnection(final NHttpClientConnection conn, final Axis2HttpRequest axis2Req) {
+ private void processConnection(final NHttpClientConnection conn,
+ final Axis2HttpRequest axis2Req) throws ConnectionClosedException {
try {
// Reset connection metrics
@@ -175,7 +184,8 @@
conn.submitRequest(request);
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
-
+ } catch (ConnectionClosedException e) {
+ throw e;
} catch (IOException e) {
if (metrics != null) {
metrics.incrementFaultsSending();
@@ -268,8 +278,12 @@
*/
public void exception(final NHttpClientConnection conn, final IOException e) {
String message = getErrorMessage("I/O error : " + e.getMessage(), conn);
- log.error(message, e);
- checkAxisRequestComplete(conn, NhttpConstants.SND_IO_ERROR_SENDING, message, e);
+ if (message.toLowerCase().indexOf("reset") != -1) {
+ log.warn(message);
+ } else {
+ log.error(message, e);
+ }
+ checkAxisRequestComplete(conn, NhttpConstants.SND_IO_ERROR_SENDING, message, e);
shutdownConnection(conn);
}
@@ -434,7 +448,12 @@
}
// reset metrics on connection
conn.getMetrics().reset();
- if (!connStrategy.keepAlive(response, context)) {
+ if (context.getAttribute(NhttpConstants.DISCARD_ON_COMPLETE) != null) {
+ try {
+ // this is a connection we should not re-use
+ conn.shutdown();
+ } catch (Exception ignore) {}
+ } else if (!connStrategy.keepAlive(response, context)) {
conn.close();
} else {
ConnectionPool.release(conn);
@@ -507,13 +526,13 @@
}
if (!req.isSendingCompleted()) {
req.getMsgContext().setProperty(NhttpConstants.ERROR_CODE, NhttpConstants.SEND_ABORT);
- req.setCompleted(true);
SharedOutputBuffer outputBuffer = (SharedOutputBuffer)
conn.getContext().getAttribute(REQUEST_SOURCE_BUFFER);
if (outputBuffer != null) {
outputBuffer.shutdown();
}
log.warn("Remote server aborted request being sent and replied : " + conn);
+ context.setAttribute(NhttpConstants.DISCARD_ON_COMPLETE, Boolean.TRUE);
if (metrics != null) {
metrics.incrementFaultsSending(NhttpConstants.SEND_ABORT, req.getMsgContext());
}
@@ -698,7 +717,11 @@
// ----------- utility methods -----------
private void handleException(String msg, Exception e, NHttpClientConnection conn) {
- log.error(msg, e);
+ if (msg.toLowerCase().indexOf("reset") != -1) {
+ log.warn(msg);
+ } else {
+ log.error(msg, e);
+ }
if (conn != null) {
shutdownConnection(conn);
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java Tue Oct 28 23:09:27 2008
@@ -319,7 +319,15 @@
log.debug("A new connection established to : " + url.getHost() + ":" + port);
}
} else {
- handler.submitRequest(conn, axis2Req);
+ try {
+ handler.submitRequest(conn, axis2Req);
+ } catch (ConnectionClosedException e) {
+ ioReactor.connect(new InetSocketAddress(url.getHost(), port),
+ null, axis2Req, sessionRequestCallback);
+ if (log.isDebugEnabled()) {
+ log.debug("A new connection established to : " + url.getHost() + ":" + port);
+ }
+ }
if (log.isDebugEnabled()) {
log.debug("An existing connection reused to : " + url.getHost() + ":" + port);
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java Tue Oct 28 23:09:27 2008
@@ -28,6 +28,7 @@
public static final String DISABLE_KEEPALIVE = "http.connection.disable.keepalive";
public static final String IGNORE_SC_ACCEPTED = "IGNORE_SC_ACCEPTED";
public static final String FORCE_SC_ACCEPTED = "FORCE_SC_ACCEPTED";
+ public static final String DISCARD_ON_COMPLETE = "DISCARD_ON_COMPLETE";
public static final String WSDL_EPR_PREFIX = "WSDLEPRPrefix";
public static final String REMOTE_HOST ="REMOTE_HOST";
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Tue Oct 28 23:09:27 2008
@@ -201,7 +201,13 @@
}
if (encoder.isCompleted()) {
- if (!connStrategy.keepAlive(response, context)) {
+ Boolean reqRead = (Boolean) conn.getContext().getAttribute(NhttpConstants.REQUEST_READ);
+ if (reqRead != null && !reqRead) {
+ try {
+ // this is a connection we should not re-use
+ conn.close();
+ } catch (Exception ignore) {}
+ } else if (!connStrategy.keepAlive(response, context)) {
conn.close();
} else {
conn.requestInput();
@@ -348,7 +354,13 @@
"was closed):" + e.getMessage());
}
} else {
- log.error("I/O error: " + e.getMessage(), e);
+ String msg = e.getMessage().toLowerCase();
+ if (msg.indexOf("broken") != -1) {
+ log.warn("I/O error (Probably the connection " +
+ "was closed by the remote party):" + e.getMessage());
+ } else {
+ log.error("I/O error: " + e.getMessage(), e);
+ }
if (metrics != null) {
metrics.incrementFaultsReceiving();
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java Tue Oct 28 23:09:27 2008
@@ -44,7 +44,7 @@
private final Object mutex;
private volatile boolean shutdown = false;
- private volatile boolean endOfStream;
+ private volatile boolean endOfStream = false;
public SharedOutputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
super(buffersize, allocator);
@@ -53,7 +53,6 @@
}
this.ioctrl = ioctrl;
this.mutex = new Object();
- this.endOfStream = false;
}
public void reset() {
@@ -96,10 +95,7 @@
}
public void close() {
- if (this.shutdown) {
- return;
- }
- writeCompleted();
+ shutdown();
}
public void shutdown() {
@@ -152,7 +148,7 @@
flushContent();
setInputMode();
}
- this.buffer.put((byte) b);
+ this.buffer.put((byte)b);
}
}
@@ -175,7 +171,7 @@
}
}
- public void writeCompleted() {
+ public void writeCompleted() throws IOException {
synchronized (this.mutex) {
if (this.endOfStream) {
return;
@@ -185,4 +181,4 @@
}
}
-}
+}
\ No newline at end of file