You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/06/02 06:56:14 UTC

svn commit: r662321 - in /synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp: Axis2HttpRequest.java ClientHandler.java HttpCoreNIOSender.java

Author: asankha
Date: Sun Jun  1 21:56:13 2008
New Revision: 662321

URL: http://svn.apache.org/viewvc?rev=662321&view=rev
Log:
fix SYNAPSE-338

Modified:
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Sun Jun  1 21:56:13 2008
@@ -208,39 +208,39 @@
         }
 
         synchronized(this) {
-            while (!readyToStream) {
+            while (!readyToStream && !completed) {
                 try {
                     this.wait();
                 } catch (InterruptedException ignore) {}
             }
         }
 
-        OutputStream out = new ContentOutputStream(outputBuffer);
-        try {
-            messageFormatter.writeTo(msgContext, format, out, true);
-        } catch (Exception e) {
-            Throwable t = e.getCause();
-            if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Ignore closed channel exception, as the " +
-                        "SessionRequestCallback handles this exception");
-                }                
-            } else {
-                /* close PipeImpl will manually raise exception
-                   while streaming, so blocking status will be released */
-                if (e instanceof AxisFault) {
-                    throw (AxisFault) e;
+        if (!completed) {
+            OutputStream out = new ContentOutputStream(outputBuffer);
+            try {
+                messageFormatter.writeTo(msgContext, format, out, true);
+            } catch (Exception e) {
+                Throwable t = e.getCause();
+                if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Ignore closed channel exception, as the " +
+                            "SessionRequestCallback handles this exception");
+                    }
                 } else {
-                    handleException("Error streaming message context", e);
+                    if (e instanceof AxisFault) {
+                        throw (AxisFault) e;
+                    } else {
+                        handleException("Error streaming message context", e);
+                    }
                 }
             }
-        }
-        finally {
-            try {
-                out.flush();
-                out.close();
-            } catch (IOException e) {
-                handleException("Error closing outgoing message stream", e);
+            finally {
+                try {
+                    out.flush();
+                    out.close();
+                } catch (IOException e) {
+                    handleException("Error closing outgoing message stream", e);
+                }
             }
         }
     }
@@ -257,5 +257,8 @@
 
     public void setCompleted(boolean completed) {
         this.completed = completed;
+        synchronized(this) {
+            this.notifyAll();
+        }
     }
 }

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Sun Jun  1 21:56:13 2008
@@ -261,7 +261,7 @@
      * @param exceptionToRaise an Exception to be returned to the MR on failure
      */
     private void checkAxisRequestComplete(NHttpClientConnection conn,
-        String errorMessage, Exception exceptionToRaise) {
+        final String errorMessage, final Exception exceptionToRaise) {
 
         Axis2HttpRequest axis2Request = (Axis2HttpRequest)
                 conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
@@ -273,32 +273,36 @@
                 return; // no need to continue
             }
 
-            MessageContext mc = axis2Request.getMsgContext();
+            final MessageContext mc = axis2Request.getMsgContext();
 
             if (mc.getAxisOperation() != null &&
                     mc.getAxisOperation().getMessageReceiver() != null) {
 
-                MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
-                try {
-                    MessageContext nioFaultMessageContext = null;
-                    if (errorMessage != null) {
-                        nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
-                            mc, new AxisFault(errorMessage));
-                    } else if (exceptionToRaise != null) {
-                        nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
-                            /** this is not a mistake I do NOT want getMessage()*/
-                            mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
-                    }
+                workerPool.execute( new Runnable() {
+                    public void run() {
+                        MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+                        try {
+                            MessageContext nioFaultMessageContext = null;
+                            if (errorMessage != null) {
+                                nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+                                    mc, new AxisFault(errorMessage));
+                            } else if (exceptionToRaise != null) {
+                                nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+                                    /** this is not a mistake I do NOT want getMessage()*/
+                                    mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
+                            }
+
+                            if (nioFaultMessageContext != null) {
+                                nioFaultMessageContext.setProperty(
+                                        NhttpConstants.SENDING_FAULT, Boolean.TRUE);
+                                mr.receive(nioFaultMessageContext);
+                            }
 
-                    if (nioFaultMessageContext != null) {
-                        nioFaultMessageContext.setProperty(
-                                NhttpConstants.SENDING_FAULT, Boolean.TRUE);
-                        mr.receive(nioFaultMessageContext);
+                        } catch (AxisFault af) {
+                            log.error("Unable to report back failure to the message receiver", af);
+                        }
                     }
-
-                } catch (AxisFault af) {
-                    log.error("Unable to report back failure to the message receiver", af);
-                }
+                });
             }
         }
     }
@@ -546,6 +550,10 @@
 
     }
 
+    public void execute(Runnable task) {
+        workerPool.execute(task);        
+    }
+
     // ----------- utility methods -----------
 
     private void handleException(String msg, Exception e, NHttpClientConnection conn) {

Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java Sun Jun  1 21:56:13 2008
@@ -476,7 +476,7 @@
      * related to the outgoing request
      * @return a Session request callback
      */
-    private static SessionRequestCallback getSessionRequestCallback() {
+    private SessionRequestCallback getSessionRequestCallback() {
         return new SessionRequestCallback() {
             public void completed(SessionRequest request) {
             }
@@ -503,7 +503,7 @@
 
                         axis2Request.setCompleted(true);
                         MessageContext mc = axis2Request.getMsgContext();
-                        MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+                        final MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
 
                         if (mr != null) {
                             try {
@@ -520,10 +520,19 @@
                                     /** this is not a mistake I do NOT want getMessage()*/
                                     axisFault = new AxisFault(exception.toString(), exception);
                                 }
-                                MessageContext nioFaultMessageContext =
+                                final MessageContext nioFaultMessageContext =
                                     MessageContextBuilder.createFaultMessageContext(mc, axisFault);
                                 nioFaultMessageContext.setProperty(NhttpConstants.SENDING_FAULT, Boolean.TRUE);
-                                mr.receive(nioFaultMessageContext);
+
+                                handler.execute(new Runnable() {
+                                    public void run() {
+                                        try {
+                                            mr.receive(nioFaultMessageContext);
+                                        } catch (AxisFault af) {
+                                            log.error("Error processing fault message context", af);
+                                        }
+                                    }
+                                });
 
                             } catch (AxisFault af) {
                                 log.error("Unable to report back failure to the message receiver", af);