You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/10/29 07:09:27 UTC

svn commit: r708800 - in /synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp: ClientHandler.java HttpCoreNIOSender.java NhttpConstants.java ServerHandler.java util/SharedOutputBuffer.java

Author: asankha
Date: Tue Oct 28 23:09:27 2008
New Revision: 708800

URL: http://svn.apache.org/viewvc?rev=708800&view=rev
Log:
fix SYNAPSE-341 and SYNAPSE-344

Modified:
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
    synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Tue Oct 28 23:09:27 2008
@@ -128,7 +128,8 @@
      * @param conn the connection to use to send the request, which has been kept open
      * @param axis2Req the new request
      */
-    public void submitRequest(final NHttpClientConnection conn, Axis2HttpRequest axis2Req) {
+    public void submitRequest(final NHttpClientConnection conn, Axis2HttpRequest axis2Req)
+        throws ConnectionClosedException {
         processConnection(conn, axis2Req);
     }
 
@@ -142,7 +143,14 @@
         if (log.isDebugEnabled() ) {
             log.debug("ClientHandler connected : " + conn);
         }
-        processConnection(conn, (Axis2HttpRequest) attachment);
+        try {
+            processConnection(conn, (Axis2HttpRequest) attachment);
+        } catch (ConnectionClosedException e) {
+            if (metrics != null) {
+                metrics.incrementFaultsSending();
+            }
+            handleException("I/O Error submitting request : " + e.getMessage(), e, conn);
+        }
     }
 
     /**
@@ -150,7 +158,8 @@
      * @param conn
      * @param axis2Req
      */
-    private void processConnection(final NHttpClientConnection conn, final Axis2HttpRequest axis2Req) {
+    private void processConnection(final NHttpClientConnection conn,
+        final Axis2HttpRequest axis2Req) throws ConnectionClosedException {
 
         try {
             // Reset connection metrics
@@ -175,7 +184,8 @@
             
             conn.submitRequest(request);
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
-
+        } catch (ConnectionClosedException e) {
+            throw e;
         } catch (IOException e) {
             if (metrics != null) {
                 metrics.incrementFaultsSending();
@@ -268,8 +278,12 @@
      */
     public void exception(final NHttpClientConnection conn, final IOException e) {
         String message = getErrorMessage("I/O error : " + e.getMessage(), conn);
-        log.error(message, e);
-    	checkAxisRequestComplete(conn, NhttpConstants.SND_IO_ERROR_SENDING, message, e);
+        if (message.toLowerCase().indexOf("reset") != -1) {
+            log.warn(message);
+        } else {
+            log.error(message, e);
+        }
+        checkAxisRequestComplete(conn, NhttpConstants.SND_IO_ERROR_SENDING, message, e);
         shutdownConnection(conn);
     }
 
@@ -434,7 +448,12 @@
                 }
                 // reset metrics on connection
                 conn.getMetrics().reset();
-                if (!connStrategy.keepAlive(response, context)) {
+                if (context.getAttribute(NhttpConstants.DISCARD_ON_COMPLETE) != null) {
+                    try {
+                        // this is a connection we should not re-use
+                        conn.shutdown();
+                    } catch (Exception ignore) {}
+                } else if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
                 } else {
                     ConnectionPool.release(conn);
@@ -507,13 +526,13 @@
             }
             if (!req.isSendingCompleted()) {
                 req.getMsgContext().setProperty(NhttpConstants.ERROR_CODE, NhttpConstants.SEND_ABORT);
-                req.setCompleted(true);
                 SharedOutputBuffer outputBuffer = (SharedOutputBuffer)
                         conn.getContext().getAttribute(REQUEST_SOURCE_BUFFER);
                 if (outputBuffer != null) {
                     outputBuffer.shutdown();
                 }
                 log.warn("Remote server aborted request being sent and replied : " + conn);
+                context.setAttribute(NhttpConstants.DISCARD_ON_COMPLETE, Boolean.TRUE);
                 if (metrics != null) {
                     metrics.incrementFaultsSending(NhttpConstants.SEND_ABORT, req.getMsgContext());
                 }
@@ -698,7 +717,11 @@
     // ----------- utility methods -----------
 
     private void handleException(String msg, Exception e, NHttpClientConnection conn) {
-        log.error(msg, e);
+        if (msg.toLowerCase().indexOf("reset") != -1) {
+            log.warn(msg);
+        } else {
+            log.error(msg, e);
+        }
         if (conn != null) {
             shutdownConnection(conn);
         }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java Tue Oct 28 23:09:27 2008
@@ -319,7 +319,15 @@
                     log.debug("A new connection established to : " + url.getHost() + ":" + port);
                 }
             } else {
-                handler.submitRequest(conn, axis2Req);
+                try {
+                    handler.submitRequest(conn, axis2Req);
+                } catch (ConnectionClosedException e) {
+                    ioReactor.connect(new InetSocketAddress(url.getHost(), port),
+                        null, axis2Req, sessionRequestCallback);
+                    if (log.isDebugEnabled()) {
+                        log.debug("A new connection established to : " + url.getHost() + ":" + port);
+                    }
+                }
                 if (log.isDebugEnabled()) {
                     log.debug("An existing connection reused to : " + url.getHost() + ":" + port);
                 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/NhttpConstants.java Tue Oct 28 23:09:27 2008
@@ -28,6 +28,7 @@
     public static final String DISABLE_KEEPALIVE = "http.connection.disable.keepalive";
     public static final String IGNORE_SC_ACCEPTED = "IGNORE_SC_ACCEPTED";
     public static final String FORCE_SC_ACCEPTED = "FORCE_SC_ACCEPTED";
+    public static final String DISCARD_ON_COMPLETE = "DISCARD_ON_COMPLETE";
 
     public static final String WSDL_EPR_PREFIX = "WSDLEPRPrefix";
     public static final String REMOTE_HOST ="REMOTE_HOST";

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Tue Oct 28 23:09:27 2008
@@ -201,7 +201,13 @@
             }
 
             if (encoder.isCompleted()) {
-                if (!connStrategy.keepAlive(response, context)) {
+                Boolean reqRead = (Boolean) conn.getContext().getAttribute(NhttpConstants.REQUEST_READ);
+                if (reqRead != null && !reqRead) {
+                    try {
+                        // this is a connection we should not re-use
+                        conn.close();
+                    } catch (Exception ignore) {}
+                } else if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
                 } else {
                     conn.requestInput();
@@ -348,7 +354,13 @@
                         "was closed):" + e.getMessage());
             }
         } else {
-            log.error("I/O error: " + e.getMessage(), e);
+            String msg = e.getMessage().toLowerCase();
+            if (msg.indexOf("broken") != -1) {
+                log.warn("I/O error (Probably the connection " +
+                        "was closed by the remote party):" + e.getMessage());
+            } else {
+                log.error("I/O error: " + e.getMessage(), e);
+            }
             if (metrics != null) {
                 metrics.incrementFaultsReceiving();
             }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java?rev=708800&r1=708799&r2=708800&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/util/SharedOutputBuffer.java Tue Oct 28 23:09:27 2008
@@ -44,7 +44,7 @@
     private final Object mutex;
 
     private volatile boolean shutdown = false;
-    private volatile boolean endOfStream;
+    private volatile boolean endOfStream = false;
 
     public SharedOutputBuffer(int buffersize, final IOControl ioctrl, final ByteBufferAllocator allocator) {
         super(buffersize, allocator);
@@ -53,7 +53,6 @@
         }
         this.ioctrl = ioctrl;
         this.mutex = new Object();
-        this.endOfStream = false;
     }
 
     public void reset() {
@@ -96,10 +95,7 @@
     }
 
     public void close() {
-        if (this.shutdown) {
-            return;
-        }
-        writeCompleted();
+        shutdown();
     }
 
     public void shutdown() {
@@ -152,7 +148,7 @@
                 flushContent();
                 setInputMode();
             }
-            this.buffer.put((byte) b);
+            this.buffer.put((byte)b);
         }
     }
 
@@ -175,7 +171,7 @@
         }
     }
 
-    public void writeCompleted() {
+    public void writeCompleted() throws IOException {
         synchronized (this.mutex) {
             if (this.endOfStream) {
                 return;
@@ -185,4 +181,4 @@
         }
     }
 
-}
+}
\ No newline at end of file