You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2008/06/09 12:43:22 UTC

svn commit: r664672 [3/7] - in /synapse/trunk/java: ./ modules/core/ modules/core/src/main/java/org/apache/synapse/ modules/core/src/main/java/org/apache/synapse/config/ modules/core/src/main/java/org/apache/synapse/config/xml/ modules/core/src/main/ja...

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java Mon Jun  9 03:43:19 2008
@@ -64,97 +64,93 @@
             }
         }
 
-        // if no endpoints are defined, send where implicitly stated
-        if (endpoint == null) {
+        if (synCtx.isResponse()) {
 
-            if (traceOrDebugOn) {
-                StringBuffer sb = new StringBuffer();
-                sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
-                        + " message using implicit message properties..");
-                sb.append("\nSending To: " + (synCtx.getTo() != null ?
-                        synCtx.getTo().getAddress() : "null"));
-                sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
-                        synCtx.getWSAAction() : "null"));
-                traceOrDebug(traceOn, sb.toString());
-            }
+            Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
+            OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
 
-            if (traceOn && trace.isTraceEnabled()) {
-                trace.trace("Envelope : " + synCtx.getEnvelope());
-            }
+            boolean isClusteringEnable = false;
 
-            if (synCtx.isResponse()) {
+            // get Axis2 MessageContext and ConfigurationContext
+            org.apache.axis2.context.MessageContext axisMC =
+                    axis2MsgCtx.getAxis2MessageContext();
+            ConfigurationContext cc = axisMC.getConfigurationContext();
 
-                Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
-                OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
-                Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+            // Check for a clustering environment
 
-                if (o != null && o instanceof List) {
+            ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
+            if (clusterManager != null &&
+                    clusterManager.getContextManager() != null) {
+                isClusteringEnable = true;
+            }
 
-                    boolean isClusteringEnable = false;
+            if (isClusteringEnable) {
+                // If this is a clustering environment, keep only the endpoint
+                // names, because replicating the endpoint objects themselves
+                // is a heavy task.
+                Object epNames = opCtx.getPropertyNonReplicable(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
+                if (epNames != null && epNames instanceof List) {
+
+                    List epNameList = (List) epNames;
+                    Object obj = epNameList.remove(0);
+                    if (obj != null && obj instanceof String) {
+                        Object rootEPObj = opCtx.getPropertyNonReplicable(
+                                SALoadbalanceEndpoint.ROOT_ENDPOINT);
+
+                        if (rootEPObj != null && rootEPObj instanceof Endpoint) {
+                            String name = ((Endpoint) rootEPObj).getName();
+
+                            if (name != null && name.equals(obj)) {
+                                Endpoint rootEP = ((Endpoint) rootEPObj);
+
+                                if (rootEP instanceof SALoadbalanceEndpoint) {
+                                    SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) rootEP;
+                                    salEP.updateSession(synCtx, epNameList,
+                                            isClusteringEnable);
+                                }
+                            }
+                        }
 
-                    // get Axis2 MessageContext and ConfigurationContext
-                    org.apache.axis2.context.MessageContext axisMC =
-                            axis2MsgCtx.getAxis2MessageContext();
-                    ConfigurationContext cc = axisMC.getConfigurationContext();
-
-                    //The heck for clustering environment
-
-                    ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
-                    if (clusterManager != null &&
-                            clusterManager.getContextManager() != null) {
-                        isClusteringEnable = true;
                     }
+                    opCtx.setProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST, epNames);
+                }
+
+            } else {
+                Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+                if (o != null && o instanceof List) {
                     // we are in the response of the first message of a server initiated session
                     // so update all session maps
                     List epList = (List) o;
+                    Object e = epList.remove(0);
 
-
-                    if (isClusteringEnable) {
-                        // if this is a clustering env.
-                        // Only keeps endpoint names , because , it is heavy task to
-                        // replicate endpoint itself
-                        Object epNames = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
-                        if (epNames != null && epNames instanceof List) {
-
-                            List epNameList = (List) epNames;
-                            Object obj = epNameList.remove(0);
-                            if (obj != null && obj instanceof String) {
-
-                                for (Iterator it = epList.iterator(); it.hasNext();) {
-                                    Object epObj = it.next();
-
-                                    if (epObj != null && epObj instanceof Endpoint) {
-                                        String name = ((Endpoint) epObj).getName();
-
-                                        if (name != null && name.equals(obj)) {
-                                            Endpoint ep = ((Endpoint) epObj);
-
-                                            if (ep instanceof SALoadbalanceEndpoint) {
-                                                SALoadbalanceEndpoint salEP
-                                                        = (SALoadbalanceEndpoint) ep;
-                                                salEP.updateSession(synCtx, epNameList,
-                                                        isClusteringEnable);
-                                            }
-                                        }
-                                    }
-                                }
-
-                            }
-                        }
-
-                    } else {
-                        Object e = epList.remove(0);
-
-                        if (e != null) {
-                            if (e instanceof SALoadbalanceEndpoint) {
-                                SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
-                                salEP.updateSession(synCtx, epList, isClusteringEnable);
-                            }
+                    if (e != null) {
+                        if (e instanceof SALoadbalanceEndpoint) {
+                            SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
+                            salEP.updateSession(synCtx, epList, isClusteringEnable);
                         }
                     }
                 }
             }
 
+        }
+
+        // if no endpoints are defined, send where implicitly stated
+        if (endpoint == null) {
+
+            if (traceOrDebugOn) {
+                StringBuffer sb = new StringBuffer();
+                sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
+                        + " message using implicit message properties..");
+                sb.append("\nSending To: " + (synCtx.getTo() != null ?
+                        synCtx.getTo().getAddress() : "null"));
+                sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
+                        synCtx.getWSAAction() : "null"));
+                traceOrDebug(traceOn, sb.toString());
+            }
+
+            if (traceOn && trace.isTraceEnabled()) {
+                trace.trace("Envelope : " + synCtx.getEnvelope());
+            }
             synCtx.getEnvironment().send(null, synCtx);
 
         } else {

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Mon Jun  9 03:43:19 2008
@@ -106,64 +106,72 @@
     public synchronized boolean isComplete(boolean traceOn, boolean traceOrDebugOn,
         Log trace, Log log) {
 
-        // if any messages have been collected, check if the completion criteria is met
-        if (!messages.isEmpty()) {
+        if (!completed) {
 
-            // get total messages for this group, from the first message we have collected
-            MessageContext mc = messages.get(0);
-            Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
-            
-            if (prop != null && prop instanceof String) {
-                String[] msgSequence = prop.toString().split(
-                        EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
-                int total = Integer.parseInt(msgSequence[1]);
+            // if any messages have been collected, check if the completion criteria is met
+            if (!messages.isEmpty()) {
 
-                if (traceOrDebugOn) {
-                    traceOrDebug(traceOn, trace, log, messages.size() +
-                        " messages of " + total + " collected in current aggregation");
-                }
+                // get total messages for this group, from the first message we have collected
+                MessageContext mc = messages.get(0);
+                Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+            
+                if (prop != null && prop instanceof String) {
+                    String[] msgSequence = prop.toString().split(
+                            EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+                    int total = Integer.parseInt(msgSequence[1]);
 
-                if (messages.size() >= total) {
                     if (traceOrDebugOn) {
-                        traceOrDebug(traceOn, trace, log, "Aggregation complete");
+                        traceOrDebug(traceOn, trace, log, messages.size() +
+                                " messages of " + total + " collected in current aggregation");
+                    }
+
+                    if (messages.size() >= total) {
+                        if (traceOrDebugOn) {
+                            traceOrDebug(traceOn, trace, log, "Aggregation complete");
+                        }
+                        return true;
                     }
-                    return true;
+                }
+            } else {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
                 }
             }
-        } else {
-            if (traceOrDebugOn) {
-                traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
-            }
-        }
 
-        // if the minimum number of messages has been reached, its complete
-        if (minCount > 0 && messages.size() >= minCount) {
-            if (traceOrDebugOn) {
-                traceOrDebug(traceOn, trace, log,
-                    "Aggregation complete - the minimum : " + minCount
-                            + " messages has been reached");
+            // if the minimum number of messages has been reached, it's complete
+            if (minCount > 0 && messages.size() >= minCount) {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, trace, log,
+                            "Aggregation complete - the minimum : " + minCount
+                                    + " messages has been reached");
+                }
+                return true;
             }
-            return true;
-        }
 
-        if (maxCount > 0 && messages.size() >= maxCount) {
-            if (traceOrDebugOn) {
-                traceOrDebug(traceOn, trace, log,
-                    "Aggregation complete - the maximum : " + maxCount
-                            + " messages has been reached");
+            if (maxCount > 0 && messages.size() >= maxCount) {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, trace, log,
+                            "Aggregation complete - the maximum : " + maxCount
+                                    + " messages has been reached");
+                }
+
+                return true;
             }
 
-            return true;
-        }
+            // else, has this aggregation reached its timeout?
+            if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
+                if (traceOrDebugOn) {
+                    traceOrDebug(traceOn, trace, log,
+                            "Aggregation complete - the aggregation has timed out");
+                }
 
-        // else, has this aggregation reached its timeout?
-        if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
+                return true;
+            }
+        } else {
             if (traceOrDebugOn) {
                 traceOrDebug(traceOn, trace, log,
-                    "Aggregation complete - the aggregation has timed out");
+                        "Aggregation already completed - this message will not be processed in aggregation");
             }
-
-            return true;
         }
         
         return false;

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java Mon Jun  9 03:43:19 2008
@@ -34,6 +34,7 @@
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.ServerManager;
 import org.apache.synapse.startup.AbstractStartup;
 import org.quartz.CronTrigger;
 import org.quartz.JobDataMap;
@@ -92,7 +93,7 @@
         // this server name given by system property SynapseServerName
         // otherwise take host-name
         // else assume localhost
-        String thisServerName = System.getProperty(SynapseConstants.SYNAPSE_SERVER_NAME);
+        String thisServerName = ServerManager.getInstance().getServerName();
         if(thisServerName == null || thisServerName.equals("")) {
           try {
             InetAddress addr = InetAddress.getLocalHost();

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java Mon Jun  9 03:43:19 2008
@@ -514,7 +514,7 @@
         if ((result == null || result.length() == 0 || "".equals(result)) && def != null) {
             if (log.isDebugEnabled()) {
                 log.debug("The name with ' " + name + " ' cannot be found. " +
-                        "Using default vale " + def);
+                        "Using default value " + def);
             }
             result = def;
         }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java Mon Jun  9 03:43:19 2008
@@ -48,6 +48,7 @@
     public void removeLocalRegistry() {
         if (localRegistry != null) {
             try {
+                log.info("Removing the RMI registy instance from the RMI runtime ");
                 UnicastRemoteObject.unexportObject(localRegistry, true);
             } catch (NoSuchObjectException e) {
                 String msg = "Error when stoping localregistry(RMI)";

Modified: synapse/trunk/java/modules/core/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/resources/log4j.properties?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/resources/log4j.properties (original)
+++ synapse/trunk/java/modules/core/src/main/resources/log4j.properties Mon Jun  9 03:43:19 2008
@@ -35,6 +35,8 @@
 log4j.category.org.apache.synapse.transport=INFO
 log4j.category.org.apache.axis2.transport=INFO
 log4j.category.samples.util=INFO
+#log4j.category.org.apache.synapse.transport.nhttp.util=DEBUG
+#log4j.category.org.apache.http.impl.nio.reactor=DEBUG
 
 # The console appender is used to display general information at console
 log4j.appender.CONSOLE_APPENDER=org.apache.log4j.ConsoleAppender

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java Mon Jun  9 03:43:19 2008
@@ -28,6 +28,7 @@
 import org.apache.synapse.mediators.TestUtils;
 
 import java.sql.SQLException;
+import java.io.File;
 
 public class DBLookupMediatorTest extends AbstractMediatorTestCase {
 
@@ -52,7 +53,9 @@
 
             protected void setUp() throws Exception {
 
-                String tempPath = System.getProperty("java.io.tmpdir");
+                File temp = File.createTempFile("temp", "delete");
+                temp.deleteOnExit();
+                String tempPath = temp.getParent();
 
                 lookup = (DBLookupMediator)
                     new DBLookupMediatorFactory().createMediator(createOMElement(

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java Mon Jun  9 03:43:19 2008
@@ -30,6 +30,7 @@
 import java.sql.SQLException;
 import java.sql.Connection;
 import java.sql.ResultSet;
+import java.io.File;
 
 public class DBReportMediatorTest extends AbstractMediatorTestCase {    
 
@@ -57,7 +58,9 @@
 
             protected void setUp() throws Exception {
 
-                String tempPath = System.getProperty("java.io.tmpdir");
+                File temp = File.createTempFile("temp", "delete");
+                temp.deleteOnExit();
+                String tempPath = temp.getParent();
 
                 report = (DBReportMediator)
                     new DBReportMediatorFactory().createMediator(createOMElement(

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java Mon Jun  9 03:43:19 2008
@@ -37,6 +37,7 @@
 import org.apache.axis2.engine.MessageReceiver;
 import org.apache.axis2.transport.TransportListener;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.ServerManager;
 import org.apache.synapse.util.xpath.SynapseXPath;
 import org.apache.synapse.utils.Services;
 
@@ -48,10 +49,11 @@
 public class SynapseCommodityServiceTest extends TestCase {
 
     protected void setUp() throws java.lang.Exception {
+        ServerManager.getInstance().setSynapseHome(".");
         // Initializing Synapse repository
-        System.setProperty(SynapseConstants.SYNAPSE_XML,
+        ServerManager.getInstance().setSynapseXMLPath(
                            "./../../repository/conf/sample/resources/misc/synapse.xml");
-        System.setProperty(org.apache.axis2.Constants.AXIS2_CONF,
+        ServerManager.getInstance().setAxis2Xml(
                            "./../../repository/conf/axis2.xml");
 
         findAndReplace(

Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java Mon Jun  9 03:43:19 2008
@@ -30,9 +30,9 @@
     public void testClonePartially() throws Exception {
         String key = "propKey";
         MessageContext origMc = new MessageContext();
-        origMc.setProperty( key, "propValue" );
-        MessageContext newMc = MessageHelper.clonePartially( origMc );
-        Object result = newMc.getProperty( key );
+        origMc.setProperty(key, "propValue");
+        MessageContext newMc = MessageHelper.clonePartially(origMc);
+        Object result = newMc.getProperty(key);
         assertEquals(result, "propValue");
     }
 }

Modified: synapse/trunk/java/modules/extensions/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/pom.xml (original)
+++ synapse/trunk/java/modules/extensions/pom.xml Mon Jun  9 03:43:19 2008
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.synapse</groupId>
         <artifactId>Apache-Synapse</artifactId>
-        <version>1.2-SNAPSHOT</version>
+        <version>SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

Modified: synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java (original)
+++ synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java Mon Jun  9 03:43:19 2008
@@ -336,10 +336,11 @@
             String includeKey = (String) iter.next();
             String includeSourceCode = (String) includes.get(includeKey);
             Entry includeEntry = synCtx.getConfiguration().getEntryDefinition(includeKey);
-            boolean includeEntryNeedsReload = (entry != null) && entry.isDynamic()
-                    && (!entry.isCached() || entry.isExpired());
+            boolean includeEntryNeedsReload = (includeEntry != null) && includeEntry.isDynamic()
+                    && (!includeEntry.isCached() || includeEntry.isExpired());
             synchronized (resourceLock) {
-                if (includeSourceCode == null || needsReload) {
+                if (includeSourceCode == null || includeEntryNeedsReload) {
+                    log.debug("Re-/Loading the include script with key " + includeKey);
                     Object o = synCtx.getEntry(includeKey);
                     if (o instanceof OMElement) {
                         includeSourceCode = ((OMElement) (o)).getText();

Modified: synapse/trunk/java/modules/handler/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/handler/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/handler/pom.xml (original)
+++ synapse/trunk/java/modules/handler/pom.xml Mon Jun  9 03:43:19 2008
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.synapse</groupId>
         <artifactId>Apache-Synapse</artifactId>
-        <version>1.2-SNAPSHOT</version>
+        <version>SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

Modified: synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java (original)
+++ synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java Mon Jun  9 03:43:19 2008
@@ -29,6 +29,7 @@
 import org.apache.neethi.Policy;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
+import org.apache.synapse.ServerManager;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.axis2.SynapseInitializationModule;
 import org.apache.synapse.core.SynapseEnvironment;
@@ -67,10 +68,10 @@
     public void init(ConfigurationContext configurationContext,
                      AxisModule axisModule) throws AxisFault {
         if (System.getProperty(SynapseConstants.SYNAPSE_XML) == null) {
-            System.setProperty(SynapseConstants.SYNAPSE_XML, configurationContext.
+            ServerManager.getInstance().setSynapseXMLPath(configurationContext.
                     getAxisConfiguration().getRepository().getPath() + "/conf/synapse.xml");
         }
-        if (new File(System.getProperty(SynapseConstants.SYNAPSE_XML)).exists()) {
+        if (new File(ServerManager.getInstance().getSynapseXMLPath()).exists()) {
             initializationModule = new org.apache.synapse.core.axis2.SynapseInitializationModule();
             initializationModule.init(configurationContext, axisModule);
 
@@ -96,7 +97,7 @@
         } else {
             handleException("Unable to initialize the Synapse initializationModule. Couldn't " +
                     "find the configuration file in the location "
-                    + System.getProperty(SynapseConstants.SYNAPSE_XML));
+                    + ServerManager.getInstance().getSynapseXMLPath());
         }
     }
 

Modified: synapse/trunk/java/modules/mar/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/mar/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/mar/pom.xml (original)
+++ synapse/trunk/java/modules/mar/pom.xml Mon Jun  9 03:43:19 2008
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.synapse</groupId>
         <artifactId>Apache-Synapse</artifactId>
-        <version>1.2-SNAPSHOT</version>
+        <version>SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

Modified: synapse/trunk/java/modules/samples/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/pom.xml (original)
+++ synapse/trunk/java/modules/samples/pom.xml Mon Jun  9 03:43:19 2008
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.synapse</groupId>
         <artifactId>Apache-Synapse</artifactId>
-        <version>1.2-SNAPSHOT</version>
+        <version>SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

Modified: synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl (original)
+++ synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl Mon Jun  9 03:43:19 2008
@@ -114,7 +114,7 @@
 	</wsdl:binding>
 	<wsdl:service name="FastStockQuoteService">
 		<wsdl:port name="FastStockQuoteServiceSOAP11port" binding="axis2:FastStockQuoteServiceSOAP11Binding">
-			<soap:address location="http://localhost:8080/soap/FastStockQuoteService"/>
+			<soap:address location="http://localhost:8280/soap/FastStockQuoteService"/>
 		</wsdl:port>
 	</wsdl:service>
 </wsdl:definitions>

Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java Mon Jun  9 03:43:19 2008
@@ -112,8 +112,9 @@
 
         options.setAction("urn:mediate");
         serviceClient.setOptions(options);
-        serviceClient.sendReceive(message);
+        OMElement response = serviceClient.sendReceive(message);
         Thread.sleep(5000);
+        System.out.println("Response Received: " + response.toString());
 
         try {
             if (configContext != null) {

Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java Mon Jun  9 03:43:19 2008
@@ -74,7 +74,7 @@
 
     public String sessionlessClient() throws AxisFault {
 
-        String synapsePort = "8080";
+        String synapsePort = "8280";
         int iterations = 100;
         boolean infinite = true;
 
@@ -176,7 +176,7 @@
      */
     private void sessionfullClient() {
 
-        String synapsePort = "8080";
+        String synapsePort = "8280";
         int iterations = 100;
         boolean infinite = true;
 
@@ -302,7 +302,7 @@
         envelope.addChild(header);
 
         OMNamespace synNamespace = soapFactory.
-            createOMNamespace("http://ws.apache.org/namespaces/synapse", "syn");
+            createOMNamespace("http://ws.apache.org/ns/synapse", "syn");
         OMElement clientIDElement = soapFactory.createOMElement("ClientID", synNamespace);
         clientIDElement.setText(clientID);
         header.addChild(clientIDElement);

Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java Mon Jun  9 03:43:19 2008
@@ -51,7 +51,7 @@
 
     public static void main(String[] args) throws Exception {
 
-        String targetEPR = getProperty("opt_url", "http://localhost:8080/soap/MTOMSwASampleService");
+        String targetEPR = getProperty("opt_url", "http://localhost:8280/soap/MTOMSwASampleService");
         String fileName = getProperty("opt_file", "./../../repository/conf/sample/resources/mtom/asf-logo.gif");
         String mode = getProperty("opt_mode", "mtom");
 

Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java Mon Jun  9 03:43:19 2008
@@ -44,7 +44,7 @@
 
         String epr1 = System.getProperty("epr");
         if (epr1 == null) {
-            epr1 = "http://localhost:8080";
+            epr1 = "http://localhost:8280";
         }
         System.out.println("EPR: " + epr1);
 

Modified: synapse/trunk/java/modules/samples/src/main/scripts/build.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/scripts/build.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/scripts/build.xml (original)
+++ synapse/trunk/java/modules/samples/src/main/scripts/build.xml Mon Jun  9 03:43:19 2008
@@ -32,8 +32,8 @@
         ant stockquote [-Dsymbol=IBM|MSFT|SUN|..]
             [-Dmode=quote | customquote | fullquote | placeorder | marketactivity]
             [-Daddurl=http://localhost:9000/soap/SimpleStockQuoteService]
-            [-Dtrpurl=http://localhost:8080]
-            [-Dprxurl=http://localhost:8080]
+            [-Dtrpurl=http://localhost:8280]
+            [-Dprxurl=http://localhost:8280]
             [-Drest=true]
             [-Dwsrm=true]
             [-Dpolicy=../../repository/conf/sample/resources/policy/policy_1.xml]
@@ -59,14 +59,14 @@
 
         examples:
         ant optimizeclient [-Dopt_mode=mtom | swa]
-            [-Dopt_url=http://localhost:8080/soap/MTOMSwASampleService]
+            [-Dopt_url=http://localhost:8280/soap/MTOMSwASampleService]
             [-Dopt_file=./../../repository/conf/sample/resources/mtom/asf-logo.gif]
 
     ant fixclient
         A client which could post a FIX message (of type Order-Single) embedded into a SOAP message.
 
 	example:
-        ant fixclient -Dsymbol=IBM -Dqty=5 -Dmode=buy -Daddurl=http://localhost:8080/soap/FIXProxy
+        ant fixclient -Dsymbol=IBM -Dqty=5 -Dmode=buy -Daddurl=http://localhost:8280/soap/FIXProxy
     </echo>
     </target>
 
@@ -124,11 +124,11 @@
     <!-- a simple smoke test -->
     <target name="smoke" depends="compile">
         <property name="addurl" value="http://localhost:9000/soap/SimpleStockQuoteService"/>
-        <property name="trpurl" value="http://localhost:8080/"/>
+        <property name="trpurl" value="http://localhost:8280/"/>
         <java classname="samples.userguide.StockQuoteClient"
               classpathref="javac.classpath" fork="true">
             <sysproperty key="addurl" value="http://localhost:9000/soap/SimpleStockQuoteService"/>
-            <sysproperty key="trpurl" value="http://localhost:8080/"/>
+            <sysproperty key="trpurl" value="http://localhost:8280/"/>
             <sysproperty key="java.io.tmpdir" value="./../../work/temp/sampleClient"/>
         </java>
     </target>

Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java Mon Jun  9 03:43:19 2008
@@ -49,7 +49,7 @@
     }
 
     protected void setUpSynapseEnv() {
-        System.setProperty("port", "8080");
+        System.setProperty("port", "8280");
         System.setProperty("org.apache.xerces.xni.parser.XMLParserConfiguration",
                 "org.apache.xerces.parsers.XMLGrammarCachingConfiguration");
         System.setProperty("axis2.xml", "modules/samples/target/test_repos/synapse/conf/axis2.xml");
@@ -88,5 +88,5 @@
 
     protected final String SYNAPSE_REPO = "modules/samples/target/test_repos/synapse";
     protected final String SAMPLE_CONFIG_ROOT_PATH = "repository/conf/sample/";
-    protected final String SYNAPSE_BASE_URL = "http://localhost:8080/";
+    protected final String SYNAPSE_BASE_URL = "http://localhost:8280/";
 }

Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java Mon Jun  9 03:43:19 2008
@@ -15,7 +15,7 @@
     }
 
     public void testSample() throws Exception {
-        System.setProperty("trpurl", "http://localhost:8080/soap/StockQuoteProxy");
+        System.setProperty("trpurl", "http://localhost:8280/soap/StockQuoteProxy");
         try {
             getStringResultOfTest(StockQuoteClient.executeTestClient());
         } catch (AxisFault f) {
@@ -23,7 +23,7 @@
                     "/soap/StockQuoteProxy", f.getReason());
         }
 
-        System.setProperty("trpurl", "https://localhost:8443/soap/StockQuoteProxy");
+        System.setProperty("trpurl", "https://localhost:8243/soap/StockQuoteProxy");
         String resultString = getStringResultOfTest(StockQuoteClient.executeTestClient());
         assertXpathExists("ns:getQuoteResponse", resultString);
         assertXpathExists("ns:getQuoteResponse/ns:return", resultString);

Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java Mon Jun  9 03:43:19 2008
@@ -17,7 +17,7 @@
     public void testSample() throws Exception {
         System.setProperty("opt_mode", "mtom");
         OMElement response = MTOMSwAClient.sendUsingMTOM(
-                "./../../repository/conf/sample/resources/mtom/asf-logo.gif", "http://localhost:8080/soap/MTOMSwASampleService");
+                "./../../repository/conf/sample/resources/mtom/asf-logo.gif", "http://localhost:8280/soap/MTOMSwASampleService");
 //        assertXpathExists("ns:getQuoteResponse", resultString);
 //        assertXpathExists("ns:getQuoteResponse/ns:return", resultString);
     }

Modified: synapse/trunk/java/modules/transports/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/pom.xml (original)
+++ synapse/trunk/java/modules/transports/pom.xml Mon Jun  9 03:43:19 2008
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.synapse</groupId>
         <artifactId>Apache-Synapse</artifactId>
-        <version>1.2-SNAPSHOT</version>
+        <version>SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java Mon Jun  9 03:43:19 2008
@@ -78,6 +78,14 @@
                     return;
                 }
 
+                if (state == BaseConstants.STOPPED) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Transport " + transportName +
+                                " onPoll() trigger : Transport is currently stopped..");
+                    }
+                    return;
+                }
+
                 workerPool.execute(new Runnable() {
                     public void run() {
                         synchronized (pollLock) {

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java Mon Jun  9 03:43:19 2008
@@ -142,7 +142,7 @@
         if (state != BaseConstants.STARTED) {
             state = BaseConstants.STARTED;
             // register to receive updates on services for lifetime management
-            cfgCtx.getAxisConfiguration().addObservers(axisObserver);
+            // cfgCtx.getAxisConfiguration().addObservers(axisObserver);
         }
         log.info(transportName.toUpperCase() + " Listener started");
 
@@ -428,7 +428,6 @@
     
     /**
      * Utility method to allow transports to register MBeans
-     * @param mbs the MBeanServer
      * @param mbeanInstance bean instance
      * @param objectName name
      */
@@ -452,9 +451,12 @@
     private void unregisterMBean(String objectName) {
         try {
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-            mbs.unregisterMBean(new ObjectName(objectName));
+            ObjectName objName = new ObjectName(objectName);
+            if (mbs.isRegistered(objName)) {
+                mbs.unregisterMBean(objName);
+            }
         } catch (Exception e) {
-            log.warn("Error registering a MBean with objectname ' " + objectName +
+            log.warn("Error un-registering a MBean with objectname ' " + objectName +
                 " ' for JMX management", e);
         }
     }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java Mon Jun  9 03:43:19 2008
@@ -194,16 +194,19 @@
      * @param contentType
      * @throws AxisFault on errors encountered while setting the envelope to the message context
      */
-    public void setSOAPEnvelope(Object message, MessageContext msgContext, String contentType) throws AxisFault {
+    public void setSOAPEnvelope(Object message, MessageContext msgContext, String contentType)
+            throws AxisFault {
 
         SOAPEnvelope envelope = null;
         StAXBuilder builder = null;
 
-        String charSetEnc;
+        String charSetEnc = null;
         try {
-            charSetEnc = new ContentType(contentType).getParameter("charset");
+            if (contentType != null) {
+                charSetEnc = new ContentType(contentType).getParameter("charset");
+            }
         } catch (ParseException ex) {
-            charSetEnc = null;
+            // ignore
         }
         
         InputStream in = getInputStream(message);
@@ -223,7 +226,9 @@
         } catch (Exception ignore) {
             try {
                 in.close();
-            } catch (IOException e) {}
+            } catch (IOException e) {
+                // ignore
+            }
             in = getInputStream(message);
         }
 
@@ -282,7 +287,7 @@
     private SOAPEnvelope handleLegacyMessage(MessageContext msgContext, Object message) {
 
         SOAPFactory soapFactory = new SOAP11Factory();
-        SOAPEnvelope envelope = null;
+        SOAPEnvelope envelope;
 
         if (log.isDebugEnabled()) {
             log.debug("Non SOAP/XML message received");
@@ -392,8 +397,8 @@
 
         } else {
             List transports = service.getExposedTransports();
-            for (int i = 0; i < transports.size(); i++) {
-                if (transportName.equals(transports.get(i))) {
+            for (Object transport : transports) {
+                if (transportName.equals(transport)) {
                     return true;
                 }
             }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Mon Jun  9 03:43:19 2008
@@ -19,16 +19,19 @@
 import org.apache.axis2.Constants;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.*;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
 import org.apache.synapse.transport.base.AbstractTransportListener;
+import org.apache.synapse.transport.base.BaseConstants;
 import org.apache.synapse.transport.base.BaseUtils;
 import org.apache.synapse.transport.base.ManagementSupport;
-import org.apache.synapse.transport.base.BaseConstants;
-import org.apache.commons.logging.LogFactory;
 
 import javax.jms.JMSException;
 import javax.naming.NamingException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * The JMS Transport listener implementation. A JMS Listner will hold one or
@@ -133,6 +136,10 @@
         if (serviceName.indexOf('/') != -1) {
             serviceName = serviceName.substring(0, serviceName.indexOf('/'));
         }
+        // strip out the endpoint name if present
+        if (serviceName.indexOf('.') != -1) {
+            serviceName = serviceName.substring(0, serviceName.indexOf('.'));
+        }
         return new EndpointReference[]{
             new EndpointReference((String) serviceNameToEPRMap.get(serviceName))};
     }
@@ -165,8 +172,8 @@
 
         String destinationType = JMSUtils.getDestinationTypeForService(service);
         
-        log.info("Starting to listen on destination : " + destinationName + " of type " + destinationType
-            + " for service " + service.getName());
+        log.info("Starting to listen on destination : " + destinationName + " of type "
+                + destinationType + " for service " + service.getName());
         cf.addDestination(destinationName, destinationType, service.getName());
         cf.startListeningOnDestination(destinationName, destinationType);
     }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java Mon Jun  9 03:43:19 2008
@@ -227,6 +227,7 @@
      * @param property property name
      * @return property value
      */
+    @Override
     public String getProperty(Object message, String property) {
         try {
             return ((Message)message).getStringProperty(property);
@@ -310,6 +311,7 @@
      * @param message the JMS message
      * @return an InputStream to the payload
      */
+    @Override
     public InputStream getInputStream(Object message) {
 
         try {
@@ -682,6 +684,7 @@
     }
 
 
+    @Override
     public String getMessageTextPayload(Object message) {
         if (message instanceof TextMessage) {
             try {
@@ -693,6 +696,7 @@
         return null;
     }
 
+    @Override
     public byte[] getMessageBinaryPayload(Object message) {
 
         if (message instanceof BytesMessage) {

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java Mon Jun  9 03:43:19 2008
@@ -58,8 +58,10 @@
     public static final String DELETE = "DELETE";
     public static final String MOVE = "MOVE";
 
-    /** Keep the list of directories/files and poll durations */
-    private final List pollTable = new ArrayList();
+    /** Keep the list of email accounts and poll durations */
+    private final List<PollTableEntry> pollTable = new ArrayList<PollTableEntry>();
+    /** Keep the list of removed pollTable entries */
+    private final List<PollTableEntry> removeTable = new ArrayList<PollTableEntry>();
 
     /**
      * Initializes the Mail transport
@@ -79,11 +81,12 @@
      * it is time to scan the contents for new files
      */
     public void onPoll() {
-        Iterator iter = pollTable.iterator();
-        while (iter.hasNext()) {
-            PollTableEntry entry = (PollTableEntry) iter.next();
-            long startTime = System.currentTimeMillis();
+        if (!removeTable.isEmpty()) {
+            pollTable.removeAll(removeTable);
+        }
 
+        for (PollTableEntry entry : pollTable) {
+            long startTime = System.currentTimeMillis();
             if (startTime > entry.getNextPollTime()) {
                 checkMail(entry, entry.getEmailAddress());
             }
@@ -473,12 +476,10 @@
      * @throws AxisFault not used
      */
     public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
-        Iterator iter = pollTable.iterator();
-        while (iter.hasNext()) {
-            PollTableEntry entry = (PollTableEntry) iter.next();
-            if (entry.getServiceName().equals(serviceName)) {
-                return new EndpointReference[]{
-                    new EndpointReference(
+        for (PollTableEntry entry : pollTable) {
+            if (entry.getServiceName().equals(serviceName) ||
+                    serviceName.startsWith(entry.getServiceName() + ".")) {
+                return new EndpointReference[]{new EndpointReference(
                         MailConstants.TRANSPORT_PREFIX + entry.getEmailAddress())};
             }
         }
@@ -581,12 +582,10 @@
     }
 
     protected void stopListeningForService(AxisService service) {
-        Iterator iter = pollTable.iterator();
-        while (iter.hasNext()) {
-            PollTableEntry entry = (PollTableEntry) iter.next();
+        for (PollTableEntry entry : pollTable) {
             if (service.getName().equals(entry.getServiceName())) {
                 cancelPoll(service);
-                pollTable.remove(entry);
+                removeTable.add(entry);
             }
         }
     }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java Mon Jun  9 03:43:19 2008
@@ -50,6 +50,7 @@
      * @param property property name
      * @return property value
      */
+    @Override
     public String getProperty(Object message, String property) {
         try {
             Object o = ((Message) message).getHeader(property);
@@ -62,6 +63,7 @@
         return null;
     }
 
+    @Override
     public InputStream getInputStream(Object message) {
         try {
             if (message instanceof MimeMessage) {
@@ -100,6 +102,7 @@
         return null;
     }
 
+    @Override
     public String getMessageTextPayload(Object message) {
         try {
             return new String(
@@ -113,6 +116,7 @@
         return null;
     }
 
+    @Override
     public byte[] getMessageBinaryPayload(Object message) {
         try {
             return getBytesFromInputStream(getInputStream(message), ((Message) message).getSize());

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Mon Jun  9 03:43:19 2008
@@ -31,20 +31,19 @@
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.HttpVersion;
+import org.apache.http.nio.util.ContentOutputBuffer;
+import org.apache.http.nio.entity.ContentOutputStream;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.message.BasicHttpEntityEnclosingRequest;
 import org.apache.http.message.BasicHttpRequest;
 import org.apache.http.protocol.HTTP;
 import org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
 import org.apache.synapse.transport.nhttp.util.RESTUtil;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
+import java.nio.channels.ClosedChannelException;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -63,13 +62,15 @@
     private HttpHost httpHost = null;
     /** the message context being sent */
     private MessageContext msgContext = null;
-    /** the Pipe which facilitates the serialization output to be written to the channel */
-    private PipeImpl pipe = null;
     /** The Axis2 MessageFormatter that will ensure proper serialization as per Axis2 semantics */
     MessageFormatter messageFormatter = null;
     /** The OM Output format holder */
     OMOutputFormat format = null;
-    protected boolean completed = false; //added for request complete checking
+    private ContentOutputBuffer outputBuffer = null;
+    /** ready to begin streaming? */
+    private boolean readyToStream = false;
+    /** for request complete checking */
+    private boolean completed = false;
 
     public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost, MessageContext msgContext) {
         this.epr = epr;
@@ -78,11 +79,23 @@
         this.format = NhttpUtils.getOMOutputFormat(msgContext);
         this.messageFormatter =
                 MessageFormatterDecoratorFactory.createMessageFormatterDecorator(msgContext);
-        try {
-            this.pipe = new PipeImpl();
-        } catch (IOException e) {
-            log.error("Error creating pipe to write message body", e);
-        }
+    }
+
+    public void setReadyToStream(boolean readyToStream) {
+        this.readyToStream = readyToStream;
+    }
+    
+    public void setOutputBuffer(ContentOutputBuffer outputBuffer) {
+        this.outputBuffer = outputBuffer;
+    }
+
+    public void clear() {
+        this.epr = null;
+        this.httpHost = null;
+        this.msgContext = null;
+        this.format = null;
+        this.messageFormatter = null;
+        this.outputBuffer = null;
     }
 
     public EndpointReference getEpr() {
@@ -120,7 +133,7 @@
                     "POST", epr.getAddress(), HttpVersion.HTTP_1_0);
                 
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                messageFormatter.writeTo(msgContext, format, baos, true);
+                messageFormatter.writeTo(msgContext, format, baos, false);
                 BasicHttpEntity entity = new BasicHttpEntity();
                 entity.setContentLength(baos.toByteArray().length);
                 ((BasicHttpEntityEnclosingRequest) httpRequest).setEntity(entity);
@@ -184,28 +197,6 @@
     }
 
     /**
-     * Return the source channel of the pipe that bridges the serialized output to the socket
-     * @return source channel to read serialized message contents
-     */
-    public ReadableByteChannel getSourceChannel() {
-        if (log.isDebugEnabled()) {
-            log.debug("get source channel of the pipe on which the outgoing response is written");
-        }
-        return pipe.source();
-    }
-
-    /**
-     * Return the sink channel of the pipe that bridges the serialized output to the socket
-     * @return sink channel to read serialized message contents
-     */
-    public WritableByteChannel getSinkChannel() {
-        if (log.isDebugEnabled()) {
-            log.debug("get sink channel of the pipe on which the outgoing response is written");
-        }
-        return pipe.sink();
-    }
-
-    /**
      * Start streaming the message into the Pipe, so that the contents could be read off the source
      * channel returned by getSourceChannel()
      * @throws AxisFault on error
@@ -215,24 +206,41 @@
         if (log.isDebugEnabled()) {
             log.debug("start streaming outgoing http request");
         }
-        OutputStream out = Channels.newOutputStream(pipe.sink());
-        try {
-            messageFormatter.writeTo(msgContext, format, out, true);
-        } catch (Exception e) {
-            /* close PipeImpl will manually raise exception
-               while streaming, so blocking status will be released */
-            if (e instanceof AxisFault) {
-                throw (AxisFault) e;
-            } else {
-                handleException("Error streaming message context", e);
+
+        synchronized(this) {
+            while (!readyToStream && !completed) {
+                try {
+                    this.wait();
+                } catch (InterruptedException ignore) {}
             }
         }
-        finally {
+
+        if (!completed) {
+            OutputStream out = new ContentOutputStream(outputBuffer);
             try {
-                out.flush();
-                out.close();
-            } catch (IOException e) {
-                handleException("Error closing outgoing message stream", e);
+                messageFormatter.writeTo(msgContext, format, out, false);
+            } catch (Exception e) {
+                Throwable t = e.getCause();
+                if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Ignore closed channel exception, as the " +
+                            "SessionRequestCallback handles this exception");
+                    }
+                } else {
+                    if (e instanceof AxisFault) {
+                        throw (AxisFault) e;
+                    } else {
+                        handleException("Error streaming message context", e);
+                    }
+                }
+            }
+            finally {
+                try {
+                    out.flush();
+                    out.close();
+                } catch (IOException e) {
+                    handleException("Error closing outgoing message stream", e);
+                }
             }
         }
     }
@@ -248,9 +256,9 @@
     }
 
     public void setCompleted(boolean completed) {
-        if (completed && !isCompleted()) {
-            this.pipe.close();
-        }
         this.completed = completed;
+        synchronized(this) {
+            this.notifyAll();
+        }
     }
 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Mon Jun  9 03:43:19 2008
@@ -39,20 +39,17 @@
 import org.apache.http.nio.ContentEncoder;
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.util.*;
 import org.apache.http.params.DefaultedHttpParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.*;
 import org.apache.synapse.transport.base.MetricsCollector;
 import org.apache.synapse.transport.base.threads.WorkerPool;
 import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * The client connection handler. An instance of this class is used by each IOReactor, to
@@ -69,6 +66,8 @@
     private final HttpProcessor httpProcessor;
     /** the connection re-use strategy */
     private final ConnectionReuseStrategy connStrategy;
+    /** the buffer allocator */
+    private final ByteBufferAllocator allocator;
 
     /** the Axis2 configuration context */
     ConfigurationContext cfgCtx = null;
@@ -79,15 +78,12 @@
     /** the metrics collector */
     private MetricsCollector metrics = null;
 
-    private static final String REQUEST_BUFFER = "request-buffer";
-    private static final String RESPONSE_BUFFER = "response-buffer";
-    private static final String OUTGOING_MESSAGE_CONTEXT = "axis2_message_context";
-    private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
-    private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
-    private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
-    private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
+    public static final String OUTGOING_MESSAGE_CONTEXT = "synapse.axis2_message_context";
+    public static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
+
+    public static final String REQUEST_SOURCE_BUFFER = "synapse.request-source-buffer";
+    public static final String RESPONSE_SINK_BUFFER = "synapse.response-sink-buffer";
 
-    private static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
     private static final String CONTENT_TYPE = "Content-Type";
 
     /**
@@ -105,6 +101,7 @@
         this.httpProcessor = getHttpProcessor();
         this.connStrategy = new DefaultConnectionReuseStrategy();
         this.metrics = metrics;
+        this.allocator = new HeapByteBufferAllocator();
 
         this.cfg = NHttpConfiguration.getInstance();
         workerPool = WorkerPoolFactory.getWorkerPool(
@@ -129,13 +126,14 @@
 
         try {
             HttpContext context = conn.getContext();
+            ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+            axis2Req.setOutputBuffer(outputBuffer);
+            context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);            
 
+            context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
             context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
             context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
             context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
-            context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
-            context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
 
             HttpRequest request = axis2Req.getRequest();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -144,6 +142,11 @@
             conn.submitRequest(request);
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
 
+            synchronized(axis2Req) {
+                axis2Req.setReadyToStream(true);
+                axis2Req.notifyAll();
+            }
+
         } catch (IOException e) {
             handleException("I/O Error : " + e.getMessage(), e, conn);
         } catch (HttpException e) {
@@ -165,18 +168,14 @@
         try {
             HttpContext context = conn.getContext();
             Axis2HttpRequest axis2Req = (Axis2HttpRequest) attachment;
+            ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+            axis2Req.setOutputBuffer(outputBuffer);
+            context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
 
             context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
             context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
             context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
-            // allocate temporary buffers to process this request
-            context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-            context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
             context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
-            context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
-            context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
 
             HttpRequest request = axis2Req.getRequest();
             request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -185,6 +184,11 @@
             conn.submitRequest(request);
             context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
 
+            synchronized(axis2Req) {
+                axis2Req.setReadyToStream(true);
+                axis2Req.notifyAll();
+            }
+
         } catch (IOException e) {
             handleException("I/O Error : " + e.getMessage(), e, conn);
         } catch (HttpException e) {
@@ -193,33 +197,18 @@
     }
 
     public void closed(final NHttpClientConnection conn) {
-    	checkAxisRequestComplete(conn, "Abnormal connection close", null);
-
-        // Check sink and source channels and close them if they aren't closed already.
-        // Normally these should be closed by inputReady() and outputReady(). A null request
-        // or response will not hit inputReady and outputReady however.
+        ConnectionPool.forget(conn);
+        checkAxisRequestComplete(conn, "Abnormal connection close", null);
 
         HttpContext context = conn.getContext();
-        closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
-        // Note: We do not close the RESPONSE_SOURCE_CHANNEL at this point in time as its closed
-        // by the ClientWorker:run() in the finally block, after the response was processed
-        // fix for https://issues.apache.org/jira/browse/SYNAPSE-289
-        closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
-        closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
-        
+        context.removeAttribute(RESPONSE_SINK_BUFFER);
+        context.removeAttribute(REQUEST_SOURCE_BUFFER);        
+
         if (log.isTraceEnabled()) {
             log.trace("Connection closed");
         }
     }
 
-    private void closeChannel(Channel chn) {
-        try {
-            if (chn != null && chn.isOpen()) {
-                chn.close();
-            }
-        } catch (IOException ignore) {}
-    }
-
     /**
      * Handle connection timeouts by shutting down the connections
      * @param conn the connection being processed
@@ -272,47 +261,48 @@
      * @param exceptionToRaise an Exception to be returned to the MR on failure
      */
     private void checkAxisRequestComplete(NHttpClientConnection conn,
-        String errorMessage, Exception exceptionToRaise) {
+        final String errorMessage, final Exception exceptionToRaise) {
 
         Axis2HttpRequest axis2Request = (Axis2HttpRequest)
                 conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
 
-        if (axis2Request == null) {
-            log.error("httpContext's AXIS2_HTTP_REQUEST attribute was null");
-
-        } else if (!axis2Request.isCompleted()) {
+        if (axis2Request != null && !axis2Request.isCompleted()) {
 
             axis2Request.setCompleted(true);
             if (errorMessage == null && exceptionToRaise == null) {
                 return; // no need to continue
             }
 
-            MessageContext mc = axis2Request.getMsgContext();
+            final MessageContext mc = axis2Request.getMsgContext();
 
             if (mc.getAxisOperation() != null &&
                     mc.getAxisOperation().getMessageReceiver() != null) {
 
-                MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
-                try {
-                    MessageContext nioFaultMessageContext = null;
-                    if (errorMessage != null) {
-                        nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
-                            mc, new AxisFault(errorMessage));
-                    } else if (exceptionToRaise != null) {
-                        nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
-                            /** this is not a mistake I do NOT want getMessage()*/
-                            mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
-                    }
+                workerPool.execute( new Runnable() {
+                    public void run() {
+                        MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+                        try {
+                            MessageContext nioFaultMessageContext = null;
+                            if (errorMessage != null) {
+                                nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+                                    mc, new AxisFault(errorMessage));
+                            } else if (exceptionToRaise != null) {
+                                nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+                                    /** this is not a mistake I do NOT want getMessage()*/
+                                    mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
+                            }
+
+                            if (nioFaultMessageContext != null) {
+                                nioFaultMessageContext.setProperty(
+                                        NhttpConstants.SENDING_FAULT, Boolean.TRUE);
+                                mr.receive(nioFaultMessageContext);
+                            }
 
-                    if (nioFaultMessageContext != null) {
-                        nioFaultMessageContext.setProperty(
-                                NhttpConstants.SENDING_FAULT, Boolean.TRUE);
-                        mr.receive(nioFaultMessageContext);
+                        } catch (AxisFault af) {
+                            log.error("Unable to report back failure to the message receiver", af);
+                        }
                     }
-
-                } catch (AxisFault af) {
-                    log.error("Unable to report back failure to the message receiver", af);
-                }
+                });
             }
         }
     }
@@ -325,25 +315,18 @@
     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
-        WritableByteChannel sink
-                = (WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
-        ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+        ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(RESPONSE_SINK_BUFFER);
 
         try {
-            while (decoder.read(inbuf) > 0) {
-                inbuf.flip();
-                sink.write(inbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesReceived(inbuf.position());
-                }
-                inbuf.compact();
+            int bytesRead = inBuf.consumeContent(decoder);
+            if (metrics != null && bytesRead > 0) {
+                metrics.incrementBytesReceived(bytesRead);
             }
 
             if (decoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesReceived();
                 }
-                if (sink != null) sink.close();
                 if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
                 } else {
@@ -364,28 +347,18 @@
     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
         HttpContext context = conn.getContext();
 
-        ReadableByteChannel source
-                = (ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
-        ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+        ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(REQUEST_SOURCE_BUFFER);
 
         try {
-            int bytesRead = source.read(outbuf);
-            if (bytesRead == -1) {
-                encoder.complete();
-            } else {
-                outbuf.flip();
-                encoder.write(outbuf);
-                if (metrics != null) {
-                    metrics.incrementBytesSent(outbuf.position());
-                }
-                outbuf.compact();
+            int bytesWritten = outBuf.produceContent(encoder);
+            if (metrics != null && bytesWritten > 0) {
+                metrics.incrementBytesSent(bytesWritten);
             }
 
             if (encoder.isCompleted()) {
                 if (metrics != null) {
                     metrics.incrementMessagesSent();
                 }
-                source.close();
             }
 
         } catch (IOException e) {
@@ -413,6 +386,11 @@
                     log.debug("Received a 202 Accepted response");
                 }
 
+                // sometimes an "\r\n" is sent as the content body with an HTTP 202 Accepted
+                // response.. we will just read it into this temporary buffer and ignore it..
+                ContentInputBuffer inputBuffer = new SharedInputBuffer(8, conn, allocator);
+                context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
+
                 // create a dummy message with an empty SOAP envelope and a property
                 // NhttpConstants.SC_ACCEPTED set to Boolean.TRUE to indicate this is a
                 // placeholder message for the transport to send a HTTP 202 to the
@@ -556,25 +534,24 @@
     private void processResponse(final NHttpClientConnection conn, HttpContext context,
         HttpResponse response) {
 
-        try {
-            PipeImpl responsePipe = new PipeImpl();
-            context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
-            context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
-
-            BasicHttpEntity entity = new BasicHttpEntity();
-            if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
-                entity.setChunked(true);
-            }
-            response.setEntity(entity);
-            context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
-
-            workerPool.execute(
-                new ClientWorker(cfgCtx, Channels.newInputStream(responsePipe.source()), response,
-                    (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+        ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+        context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
 
-        } catch (IOException e) {
-            handleException("I/O Error : " + e.getMessage(), e, conn);
+        BasicHttpEntity entity = new BasicHttpEntity();
+        if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
+            entity.setChunked(true);
         }
+        response.setEntity(entity);
+        context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
+
+        workerPool.execute(
+            new ClientWorker(cfgCtx, new ContentInputStream(inputBuffer), response,
+                (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+
+    }
+
+    public void execute(Runnable task) {
+        workerPool.execute(task);        
     }
 
     // ----------- utility methods -----------

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java Mon Jun  9 03:43:19 2008
@@ -192,6 +192,7 @@
             // used at the sender to set the propper status code when passing the message
             responseMsgCtx.setProperty(NhttpConstants.HTTP_SC,
                     this.response.getStatusLine().getStatusCode());
+            responseMsgCtx.setProperty(NhttpConstants.NON_BLOCKING_TRANSPORT, true);
 
             // process response received
             AxisEngine engine = new AxisEngine(cfgCtx);

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java Mon Jun  9 03:43:19 2008
@@ -20,6 +20,7 @@
 
 import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
 import org.apache.http.HttpHost;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,15 +55,16 @@
                 while (!connections.isEmpty()) {
                     conn = (NHttpClientConnection) connections.remove(0);
 
-                    if (conn.isOpen()) {
+                    if (conn.isOpen() && !conn.isStale()) {
                         if (log.isDebugEnabled()) {
                             log.debug("A connection to host : " + host + " on port : " +
                                 port + " is available in the pool, and will be reused");
                         }
+                        conn.requestInput(); // asankha - make sure keep alives work properly when reused with throttling
                         return conn;
                     } else {
                         if (log.isDebugEnabled()) {
-                            log.debug("closing stale connection");
+                            log.debug("closing stale connection to : " + host + ":" + port);
                         }
                         try {
                             conn.close();
@@ -93,6 +95,7 @@
             }
         }
 
+        cleanConnectionReferences(conn);
         connections.add(conn);
 
         if (log.isDebugEnabled()) {
@@ -100,4 +103,55 @@
                     host.getPort() + " to the connection pool of current size : " + connections.size());
         }
     }
+
+    /**
+     * Clear the per-request references held by a connection before it is added to the
+     * pool, so that a pooled keep-alive connection holds on to as little memory as possible.
+     *
+     * @param conn the connection whose context attributes are removed
+     */
+    private static void cleanConnectionReferences(NHttpClientConnection conn) {
+
+        HttpContext ctx = conn.getContext();
+        Axis2HttpRequest axis2Req =
+            (Axis2HttpRequest) ctx.getAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+        if (axis2Req != null) {
+            // this is linked via the selection key attachment and will free itself
+            // on timeout of the keep alive connection. Till then minimize the
+            // memory usage to a few bytes. Skipped when the context holds no request
+            axis2Req.clear();
+        }
+
+        ctx.removeAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+        ctx.removeAttribute(ClientHandler.OUTGOING_MESSAGE_CONTEXT);
+        ctx.removeAttribute(ClientHandler.REQUEST_SOURCE_BUFFER);
+        ctx.removeAttribute(ClientHandler.RESPONSE_SINK_BUFFER);
+
+        ctx.removeAttribute(ExecutionContext.HTTP_REQUEST);
+        ctx.removeAttribute(ExecutionContext.HTTP_RESPONSE);
+        ctx.removeAttribute(ExecutionContext.HTTP_CONNECTION);
+    }
+
+    /**
+     * Remove the given connection from the pooled connections of its target host, if any.
+     *
+     * @param conn the connection to be dropped from the pool
+     */
+    public static void forget(NHttpClientConnection conn) {
+
+        HttpHost host = (HttpHost) conn.getContext().getAttribute(
+            ExecutionContext.HTTP_TARGET_HOST);
+        if (host == null) {
+            // no target host is recorded on this connection, so there is no pool key to look up
+            return;
+        }
+        String key = host.getHostName() + ":" + Integer.toString(host.getPort());
+
+        List connections = (List) connMap.get(key);
+        if (connections != null) {
+            synchronized(connections) {
+                connections.remove(conn);
+            }
+        }
+    }
 }

Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java Mon Jun  9 03:43:19 2008
@@ -67,8 +67,8 @@
 
     /** The EPR prefix for services available over this transport */
     private String serviceEPRPrefix;
-    /** The port to listen on, defaults to 8080 */
-    private int port = 8080;
+    /** The port to listen on, defaults to 8280 */
+    private int port = 8280;
     /** The hostname to use, defaults to localhost */
     private String host = "localhost";
     /** The bind addresses as (address, port) pairs */