You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2008/06/09 12:43:22 UTC
svn commit: r664672 [3/7] - in /synapse/trunk/java: ./ modules/core/
modules/core/src/main/java/org/apache/synapse/
modules/core/src/main/java/org/apache/synapse/config/
modules/core/src/main/java/org/apache/synapse/config/xml/
modules/core/src/main/ja...
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java Mon Jun 9 03:43:19 2008
@@ -64,97 +64,93 @@
}
}
- // if no endpoints are defined, send where implicitly stated
- if (endpoint == null) {
+ if (synCtx.isResponse()) {
- if (traceOrDebugOn) {
- StringBuffer sb = new StringBuffer();
- sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
- + " message using implicit message properties..");
- sb.append("\nSending To: " + (synCtx.getTo() != null ?
- synCtx.getTo().getAddress() : "null"));
- sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
- synCtx.getWSAAction() : "null"));
- traceOrDebug(traceOn, sb.toString());
- }
+ Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
+ OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
- if (traceOn && trace.isTraceEnabled()) {
- trace.trace("Envelope : " + synCtx.getEnvelope());
- }
+ boolean isClusteringEnable = false;
- if (synCtx.isResponse()) {
+ // get Axis2 MessageContext and ConfigurationContext
+ org.apache.axis2.context.MessageContext axisMC =
+ axis2MsgCtx.getAxis2MessageContext();
+ ConfigurationContext cc = axisMC.getConfigurationContext();
- Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
- OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
- Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+ // Check for a clustering environment
- if (o != null && o instanceof List) {
+ ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
+ if (clusterManager != null &&
+ clusterManager.getContextManager() != null) {
+ isClusteringEnable = true;
+ }
- boolean isClusteringEnable = false;
+ if (isClusteringEnable) {
+ // If this is a clustering environment,
+ // keep only the endpoint names, because replicating
+ // the endpoints themselves is a heavy task
+ Object epNames = opCtx.getPropertyNonReplicable(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
+ if (epNames != null && epNames instanceof List) {
+
+ List epNameList = (List) epNames;
+ Object obj = epNameList.remove(0);
+ if (obj != null && obj instanceof String) {
+ Object rootEPObj = opCtx.getPropertyNonReplicable(
+ SALoadbalanceEndpoint.ROOT_ENDPOINT);
+
+ if (rootEPObj != null && rootEPObj instanceof Endpoint) {
+ String name = ((Endpoint) rootEPObj).getName();
+
+ if (name != null && name.equals(obj)) {
+ Endpoint rootEP = ((Endpoint) rootEPObj);
+
+ if (rootEP instanceof SALoadbalanceEndpoint) {
+ SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) rootEP;
+ salEP.updateSession(synCtx, epNameList,
+ isClusteringEnable);
+ }
+ }
+ }
- // get Axis2 MessageContext and ConfigurationContext
- org.apache.axis2.context.MessageContext axisMC =
- axis2MsgCtx.getAxis2MessageContext();
- ConfigurationContext cc = axisMC.getConfigurationContext();
-
- //The heck for clustering environment
-
- ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
- if (clusterManager != null &&
- clusterManager.getContextManager() != null) {
- isClusteringEnable = true;
}
+ opCtx.setProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST, epNames);
+ }
+
+ } else {
+ Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+ if (o != null && o instanceof List) {
// we are in the response of the first message of a server initiated session
// so update all session maps
List epList = (List) o;
+ Object e = epList.remove(0);
-
- if (isClusteringEnable) {
- // if this is a clustering env.
- // Only keeps endpoint names , because , it is heavy task to
- // replicate endpoint itself
- Object epNames = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
- if (epNames != null && epNames instanceof List) {
-
- List epNameList = (List) epNames;
- Object obj = epNameList.remove(0);
- if (obj != null && obj instanceof String) {
-
- for (Iterator it = epList.iterator(); it.hasNext();) {
- Object epObj = it.next();
-
- if (epObj != null && epObj instanceof Endpoint) {
- String name = ((Endpoint) epObj).getName();
-
- if (name != null && name.equals(obj)) {
- Endpoint ep = ((Endpoint) epObj);
-
- if (ep instanceof SALoadbalanceEndpoint) {
- SALoadbalanceEndpoint salEP
- = (SALoadbalanceEndpoint) ep;
- salEP.updateSession(synCtx, epNameList,
- isClusteringEnable);
- }
- }
- }
- }
-
- }
- }
-
- } else {
- Object e = epList.remove(0);
-
- if (e != null) {
- if (e instanceof SALoadbalanceEndpoint) {
- SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
- salEP.updateSession(synCtx, epList, isClusteringEnable);
- }
+ if (e != null) {
+ if (e instanceof SALoadbalanceEndpoint) {
+ SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
+ salEP.updateSession(synCtx, epList, isClusteringEnable);
}
}
}
}
+ }
+
+ // if no endpoints are defined, send where implicitly stated
+ if (endpoint == null) {
+
+ if (traceOrDebugOn) {
+ StringBuffer sb = new StringBuffer();
+ sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
+ + " message using implicit message properties..");
+ sb.append("\nSending To: " + (synCtx.getTo() != null ?
+ synCtx.getTo().getAddress() : "null"));
+ sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
+ synCtx.getWSAAction() : "null"));
+ traceOrDebug(traceOn, sb.toString());
+ }
+
+ if (traceOn && trace.isTraceEnabled()) {
+ trace.trace("Envelope : " + synCtx.getEnvelope());
+ }
synCtx.getEnvironment().send(null, synCtx);
} else {
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Mon Jun 9 03:43:19 2008
@@ -106,64 +106,72 @@
public synchronized boolean isComplete(boolean traceOn, boolean traceOrDebugOn,
Log trace, Log log) {
- // if any messages have been collected, check if the completion criteria is met
- if (!messages.isEmpty()) {
+ if (!completed) {
- // get total messages for this group, from the first message we have collected
- MessageContext mc = messages.get(0);
- Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
-
- if (prop != null && prop instanceof String) {
- String[] msgSequence = prop.toString().split(
- EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
- int total = Integer.parseInt(msgSequence[1]);
+ // if any messages have been collected, check if the completion criteria is met
+ if (!messages.isEmpty()) {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, trace, log, messages.size() +
- " messages of " + total + " collected in current aggregation");
- }
+ // get total messages for this group, from the first message we have collected
+ MessageContext mc = messages.get(0);
+ Object prop = mc.getProperty(EIPConstants.MESSAGE_SEQUENCE);
+
+ if (prop != null && prop instanceof String) {
+ String[] msgSequence = prop.toString().split(
+ EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+ int total = Integer.parseInt(msgSequence[1]);
- if (messages.size() >= total) {
if (traceOrDebugOn) {
- traceOrDebug(traceOn, trace, log, "Aggregation complete");
+ traceOrDebug(traceOn, trace, log, messages.size() +
+ " messages of " + total + " collected in current aggregation");
+ }
+
+ if (messages.size() >= total) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log, "Aggregation complete");
+ }
+ return true;
}
- return true;
+ }
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
}
}
- } else {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, trace, log, "No messages collected in current aggregation");
- }
- }
- // if the minimum number of messages has been reached, its complete
- if (minCount > 0 && messages.size() >= minCount) {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, trace, log,
- "Aggregation complete - the minimum : " + minCount
- + " messages has been reached");
+ // if the minimum number of messages has been reached, its complete
+ if (minCount > 0 && messages.size() >= minCount) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the minimum : " + minCount
+ + " messages has been reached");
+ }
+ return true;
}
- return true;
- }
- if (maxCount > 0 && messages.size() >= maxCount) {
- if (traceOrDebugOn) {
- traceOrDebug(traceOn, trace, log,
- "Aggregation complete - the maximum : " + maxCount
- + " messages has been reached");
+ if (maxCount > 0 && messages.size() >= maxCount) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the maximum : " + maxCount
+ + " messages has been reached");
+ }
+
+ return true;
}
- return true;
- }
+ // else, has this aggregation reached its timeout?
+ if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, trace, log,
+ "Aggregation complete - the aggregation has timed out");
+ }
- // else, has this aggregation reached its timeout?
- if (expiryTimeMillis > 0 && System.currentTimeMillis() >= expiryTimeMillis) {
+ return true;
+ }
+ } else {
if (traceOrDebugOn) {
traceOrDebug(traceOn, trace, log,
- "Aggregation complete - the aggregation has timed out");
+ "Aggregation already completed - this message will not be processed in aggregation");
}
-
- return true;
}
return false;
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/startup/quartz/SimpleQuartz.java Mon Jun 9 03:43:19 2008
@@ -34,6 +34,7 @@
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.ServerManager;
import org.apache.synapse.startup.AbstractStartup;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
@@ -92,7 +93,7 @@
// this server name given by system property SynapseServerName
// otherwise take host-name
// else assume localhost
- String thisServerName = System.getProperty(SynapseConstants.SYNAPSE_SERVER_NAME);
+ String thisServerName = ServerManager.getInstance().getServerName();
if(thisServerName == null || thisServerName.equals("")) {
try {
InetAddress addr = InetAddress.getLocalHost();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/DataSourceRegistrar.java Mon Jun 9 03:43:19 2008
@@ -514,7 +514,7 @@
if ((result == null || result.length() == 0 || "".equals(result)) && def != null) {
if (log.isDebugEnabled()) {
log.debug("The name with ' " + name + " ' cannot be found. " +
- "Using default vale " + def);
+ "Using default value " + def);
}
result = def;
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/util/RMIRegistryController.java Mon Jun 9 03:43:19 2008
@@ -48,6 +48,7 @@
public void removeLocalRegistry() {
if (localRegistry != null) {
try {
+ log.info("Removing the RMI registy instance from the RMI runtime ");
UnicastRemoteObject.unexportObject(localRegistry, true);
} catch (NoSuchObjectException e) {
String msg = "Error when stoping localregistry(RMI)";
Modified: synapse/trunk/java/modules/core/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/resources/log4j.properties?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/resources/log4j.properties (original)
+++ synapse/trunk/java/modules/core/src/main/resources/log4j.properties Mon Jun 9 03:43:19 2008
@@ -35,6 +35,8 @@
log4j.category.org.apache.synapse.transport=INFO
log4j.category.org.apache.axis2.transport=INFO
log4j.category.samples.util=INFO
+#log4j.category.org.apache.synapse.transport.nhttp.util=DEBUG
+#log4j.category.org.apache.http.impl.nio.reactor=DEBUG
# The console appender is used to display general information at console
log4j.appender.CONSOLE_APPENDER=org.apache.log4j.ConsoleAppender
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBLookupMediatorTest.java Mon Jun 9 03:43:19 2008
@@ -28,6 +28,7 @@
import org.apache.synapse.mediators.TestUtils;
import java.sql.SQLException;
+import java.io.File;
public class DBLookupMediatorTest extends AbstractMediatorTestCase {
@@ -52,7 +53,9 @@
protected void setUp() throws Exception {
- String tempPath = System.getProperty("java.io.tmpdir");
+ File temp = File.createTempFile("temp", "delete");
+ temp.deleteOnExit();
+ String tempPath = temp.getParent();
lookup = (DBLookupMediator)
new DBLookupMediatorFactory().createMediator(createOMElement(
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/mediators/db/DBReportMediatorTest.java Mon Jun 9 03:43:19 2008
@@ -30,6 +30,7 @@
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.io.File;
public class DBReportMediatorTest extends AbstractMediatorTestCase {
@@ -57,7 +58,9 @@
protected void setUp() throws Exception {
- String tempPath = System.getProperty("java.io.tmpdir");
+ File temp = File.createTempFile("temp", "delete");
+ temp.deleteOnExit();
+ String tempPath = temp.getParent();
report = (DBReportMediator)
new DBReportMediatorFactory().createMediator(createOMElement(
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/n2n/SynapseCommodityServiceTest.java Mon Jun 9 03:43:19 2008
@@ -37,6 +37,7 @@
import org.apache.axis2.engine.MessageReceiver;
import org.apache.axis2.transport.TransportListener;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.ServerManager;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.apache.synapse.utils.Services;
@@ -48,10 +49,11 @@
public class SynapseCommodityServiceTest extends TestCase {
protected void setUp() throws java.lang.Exception {
+ ServerManager.getInstance().setSynapseHome(".");
// Initializing Synapse repository
- System.setProperty(SynapseConstants.SYNAPSE_XML,
+ ServerManager.getInstance().setSynapseXMLPath(
"./../../repository/conf/sample/resources/misc/synapse.xml");
- System.setProperty(org.apache.axis2.Constants.AXIS2_CONF,
+ ServerManager.getInstance().setAxis2Xml(
"./../../repository/conf/axis2.xml");
findAndReplace(
Modified: synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java (original)
+++ synapse/trunk/java/modules/core/src/test/java/org/apache/synapse/util/MessageHelperTest.java Mon Jun 9 03:43:19 2008
@@ -30,9 +30,9 @@
public void testClonePartially() throws Exception {
String key = "propKey";
MessageContext origMc = new MessageContext();
- origMc.setProperty( key, "propValue" );
- MessageContext newMc = MessageHelper.clonePartially( origMc );
- Object result = newMc.getProperty( key );
+ origMc.setProperty(key, "propValue");
+ MessageContext newMc = MessageHelper.clonePartially(origMc);
+ Object result = newMc.getProperty(key);
assertEquals(result, "propValue");
}
}
Modified: synapse/trunk/java/modules/extensions/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/pom.xml (original)
+++ synapse/trunk/java/modules/extensions/pom.xml Mon Jun 9 03:43:19 2008
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java (original)
+++ synapse/trunk/java/modules/extensions/src/main/java/org/apache/synapse/mediators/bsf/ScriptMediator.java Mon Jun 9 03:43:19 2008
@@ -336,10 +336,11 @@
String includeKey = (String) iter.next();
String includeSourceCode = (String) includes.get(includeKey);
Entry includeEntry = synCtx.getConfiguration().getEntryDefinition(includeKey);
- boolean includeEntryNeedsReload = (entry != null) && entry.isDynamic()
- && (!entry.isCached() || entry.isExpired());
+ boolean includeEntryNeedsReload = (includeEntry != null) && includeEntry.isDynamic()
+ && (!includeEntry.isCached() || includeEntry.isExpired());
synchronized (resourceLock) {
- if (includeSourceCode == null || needsReload) {
+ if (includeSourceCode == null || includeEntryNeedsReload) {
+ log.debug("Re-/Loading the include script with key " + includeKey);
Object o = synCtx.getEntry(includeKey);
if (o instanceof OMElement) {
includeSourceCode = ((OMElement) (o)).getText();
Modified: synapse/trunk/java/modules/handler/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/handler/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/handler/pom.xml (original)
+++ synapse/trunk/java/modules/handler/pom.xml Mon Jun 9 03:43:19 2008
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java (original)
+++ synapse/trunk/java/modules/handler/src/main/java/org/apache/synapse/handler/SynapseModule.java Mon Jun 9 03:43:19 2008
@@ -29,6 +29,7 @@
import org.apache.neethi.Policy;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
+import org.apache.synapse.ServerManager;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.axis2.SynapseInitializationModule;
import org.apache.synapse.core.SynapseEnvironment;
@@ -67,10 +68,10 @@
public void init(ConfigurationContext configurationContext,
AxisModule axisModule) throws AxisFault {
if (System.getProperty(SynapseConstants.SYNAPSE_XML) == null) {
- System.setProperty(SynapseConstants.SYNAPSE_XML, configurationContext.
+ ServerManager.getInstance().setSynapseXMLPath(configurationContext.
getAxisConfiguration().getRepository().getPath() + "/conf/synapse.xml");
}
- if (new File(System.getProperty(SynapseConstants.SYNAPSE_XML)).exists()) {
+ if (new File(ServerManager.getInstance().getSynapseXMLPath()).exists()) {
initializationModule = new org.apache.synapse.core.axis2.SynapseInitializationModule();
initializationModule.init(configurationContext, axisModule);
@@ -96,7 +97,7 @@
} else {
handleException("Unable to initialize the Synapse initializationModule. Couldn't " +
"find the configuration file in the location "
- + System.getProperty(SynapseConstants.SYNAPSE_XML));
+ + ServerManager.getInstance().getSynapseXMLPath());
}
}
Modified: synapse/trunk/java/modules/mar/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/mar/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/mar/pom.xml (original)
+++ synapse/trunk/java/modules/mar/pom.xml Mon Jun 9 03:43:19 2008
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: synapse/trunk/java/modules/samples/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/pom.xml (original)
+++ synapse/trunk/java/modules/samples/pom.xml Mon Jun 9 03:43:19 2008
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl (original)
+++ synapse/trunk/java/modules/samples/services/FastStockQuoteService/wsdl/FastStockQuoteService.wsdl Mon Jun 9 03:43:19 2008
@@ -114,7 +114,7 @@
</wsdl:binding>
<wsdl:service name="FastStockQuoteService">
<wsdl:port name="FastStockQuoteServiceSOAP11port" binding="axis2:FastStockQuoteServiceSOAP11Binding">
- <soap:address location="http://localhost:8080/soap/FastStockQuoteService"/>
+ <soap:address location="http://localhost:8280/soap/FastStockQuoteService"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/FIXClient.java Mon Jun 9 03:43:19 2008
@@ -112,8 +112,9 @@
options.setAction("urn:mediate");
serviceClient.setOptions(options);
- serviceClient.sendReceive(message);
+ OMElement response = serviceClient.sendReceive(message);
Thread.sleep(5000);
+ System.out.println("Response Received: " + response.toString());
try {
if (configContext != null) {
Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/LoadbalanceFailoverClient.java Mon Jun 9 03:43:19 2008
@@ -74,7 +74,7 @@
public String sessionlessClient() throws AxisFault {
- String synapsePort = "8080";
+ String synapsePort = "8280";
int iterations = 100;
boolean infinite = true;
@@ -176,7 +176,7 @@
*/
private void sessionfullClient() {
- String synapsePort = "8080";
+ String synapsePort = "8280";
int iterations = 100;
boolean infinite = true;
@@ -302,7 +302,7 @@
envelope.addChild(header);
OMNamespace synNamespace = soapFactory.
- createOMNamespace("http://ws.apache.org/namespaces/synapse", "syn");
+ createOMNamespace("http://ws.apache.org/ns/synapse", "syn");
OMElement clientIDElement = soapFactory.createOMElement("ClientID", synNamespace);
clientIDElement.setText(clientID);
header.addChild(clientIDElement);
Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MTOMSwAClient.java Mon Jun 9 03:43:19 2008
@@ -51,7 +51,7 @@
public static void main(String[] args) throws Exception {
- String targetEPR = getProperty("opt_url", "http://localhost:8080/soap/MTOMSwASampleService");
+ String targetEPR = getProperty("opt_url", "http://localhost:8280/soap/MTOMSwASampleService");
String fileName = getProperty("opt_file", "./../../repository/conf/sample/resources/mtom/asf-logo.gif");
String mode = getProperty("opt_mode", "mtom");
Modified: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java (original)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/ThreadedClient.java Mon Jun 9 03:43:19 2008
@@ -44,7 +44,7 @@
String epr1 = System.getProperty("epr");
if (epr1 == null) {
- epr1 = "http://localhost:8080";
+ epr1 = "http://localhost:8280";
}
System.out.println("EPR: " + epr1);
Modified: synapse/trunk/java/modules/samples/src/main/scripts/build.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/scripts/build.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/scripts/build.xml (original)
+++ synapse/trunk/java/modules/samples/src/main/scripts/build.xml Mon Jun 9 03:43:19 2008
@@ -32,8 +32,8 @@
ant stockquote [-Dsymbol=IBM|MSFT|SUN|..]
[-Dmode=quote | customquote | fullquote | placeorder | marketactivity]
[-Daddurl=http://localhost:9000/soap/SimpleStockQuoteService]
- [-Dtrpurl=http://localhost:8080]
- [-Dprxurl=http://localhost:8080]
+ [-Dtrpurl=http://localhost:8280]
+ [-Dprxurl=http://localhost:8280]
[-Drest=true]
[-Dwsrm=true]
[-Dpolicy=../../repository/conf/sample/resources/policy/policy_1.xml]
@@ -59,14 +59,14 @@
examples:
ant optimizeclient [-Dopt_mode=mtom | swa]
- [-Dopt_url=http://localhost:8080/soap/MTOMSwASampleService]
+ [-Dopt_url=http://localhost:8280/soap/MTOMSwASampleService]
[-Dopt_file=./../../repository/conf/sample/resources/mtom/asf-logo.gif]
ant fixclient
A client which could post a FIX message (of type Order-Single) embedded into a SOAP message.
example:
- ant fixclient -Dsymbol=IBM -Dqty=5 -Dmode=buy -Daddurl=http://localhost:8080/soap/FIXProxy
+ ant fixclient -Dsymbol=IBM -Dqty=5 -Dmode=buy -Daddurl=http://localhost:8280/soap/FIXProxy
</echo>
</target>
@@ -124,11 +124,11 @@
<!-- a simple smoke test -->
<target name="smoke" depends="compile">
<property name="addurl" value="http://localhost:9000/soap/SimpleStockQuoteService"/>
- <property name="trpurl" value="http://localhost:8080/"/>
+ <property name="trpurl" value="http://localhost:8280/"/>
<java classname="samples.userguide.StockQuoteClient"
classpathref="javac.classpath" fork="true">
<sysproperty key="addurl" value="http://localhost:9000/soap/SimpleStockQuoteService"/>
- <sysproperty key="trpurl" value="http://localhost:8080/"/>
+ <sysproperty key="trpurl" value="http://localhost:8280/"/>
<sysproperty key="java.io.tmpdir" value="./../../work/temp/sampleClient"/>
</java>
</target>
Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/AbstractAutomationTestCase.java Mon Jun 9 03:43:19 2008
@@ -49,7 +49,7 @@
}
protected void setUpSynapseEnv() {
- System.setProperty("port", "8080");
+ System.setProperty("port", "8280");
System.setProperty("org.apache.xerces.xni.parser.XMLParserConfiguration",
"org.apache.xerces.parsers.XMLGrammarCachingConfiguration");
System.setProperty("axis2.xml", "modules/samples/target/test_repos/synapse/conf/axis2.xml");
@@ -88,5 +88,5 @@
protected final String SYNAPSE_REPO = "modules/samples/target/test_repos/synapse";
protected final String SAMPLE_CONFIG_ROOT_PATH = "repository/conf/sample/";
- protected final String SYNAPSE_BASE_URL = "http://localhost:8080/";
+ protected final String SYNAPSE_BASE_URL = "http://localhost:8280/";
}
Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_102_Integration.java Mon Jun 9 03:43:19 2008
@@ -15,7 +15,7 @@
}
public void testSample() throws Exception {
- System.setProperty("trpurl", "http://localhost:8080/soap/StockQuoteProxy");
+ System.setProperty("trpurl", "http://localhost:8280/soap/StockQuoteProxy");
try {
getStringResultOfTest(StockQuoteClient.executeTestClient());
} catch (AxisFault f) {
@@ -23,7 +23,7 @@
"/soap/StockQuoteProxy", f.getReason());
}
- System.setProperty("trpurl", "https://localhost:8443/soap/StockQuoteProxy");
+ System.setProperty("trpurl", "https://localhost:8243/soap/StockQuoteProxy");
String resultString = getStringResultOfTest(StockQuoteClient.executeTestClient());
assertXpathExists("ns:getQuoteResponse", resultString);
assertXpathExists("ns:getQuoteResponse/ns:return", resultString);
Modified: synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java (original)
+++ synapse/trunk/java/modules/samples/src/test/java/org/apache/synapse/samples/n2n/SynapseSample_51_Integration.java Mon Jun 9 03:43:19 2008
@@ -17,7 +17,7 @@
public void testSample() throws Exception {
System.setProperty("opt_mode", "mtom");
OMElement response = MTOMSwAClient.sendUsingMTOM(
- "./../../repository/conf/sample/resources/mtom/asf-logo.gif", "http://localhost:8080/soap/MTOMSwASampleService");
+ "./../../repository/conf/sample/resources/mtom/asf-logo.gif", "http://localhost:8280/soap/MTOMSwASampleService");
// assertXpathExists("ns:getQuoteResponse", resultString);
// assertXpathExists("ns:getQuoteResponse/ns:return", resultString);
}
Modified: synapse/trunk/java/modules/transports/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/pom.xml?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/pom.xml (original)
+++ synapse/trunk/java/modules/transports/pom.xml Mon Jun 9 03:43:19 2008
@@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.synapse</groupId>
<artifactId>Apache-Synapse</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractPollingTransportListener.java Mon Jun 9 03:43:19 2008
@@ -78,6 +78,14 @@
return;
}
+ if (state == BaseConstants.STOPPED) {
+ if (log.isDebugEnabled()) {
+ log.debug("Transport " + transportName +
+ " onPoll() trigger : Transport is currently stopped..");
+ }
+ return;
+ }
+
workerPool.execute(new Runnable() {
public void run() {
synchronized (pollLock) {
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/AbstractTransportListener.java Mon Jun 9 03:43:19 2008
@@ -142,7 +142,7 @@
if (state != BaseConstants.STARTED) {
state = BaseConstants.STARTED;
// register to receive updates on services for lifetime management
- cfgCtx.getAxisConfiguration().addObservers(axisObserver);
+ // cfgCtx.getAxisConfiguration().addObservers(axisObserver);
}
log.info(transportName.toUpperCase() + " Listener started");
@@ -428,7 +428,6 @@
/**
* Utility method to allow transports to register MBeans
- * @param mbs the MBeanServer
* @param mbeanInstance bean instance
* @param objectName name
*/
@@ -452,9 +451,12 @@
private void unregisterMBean(String objectName) {
try {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.unregisterMBean(new ObjectName(objectName));
+ ObjectName objName = new ObjectName(objectName);
+ if (mbs.isRegistered(objName)) {
+ mbs.unregisterMBean(objName);
+ }
} catch (Exception e) {
- log.warn("Error registering a MBean with objectname ' " + objectName +
+ log.warn("Error un-registering a MBean with objectname ' " + objectName +
" ' for JMX management", e);
}
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/base/BaseUtils.java Mon Jun 9 03:43:19 2008
@@ -194,16 +194,19 @@
* @param contentType
* @throws AxisFault on errors encountered while setting the envelope to the message context
*/
- public void setSOAPEnvelope(Object message, MessageContext msgContext, String contentType) throws AxisFault {
+ public void setSOAPEnvelope(Object message, MessageContext msgContext, String contentType)
+ throws AxisFault {
SOAPEnvelope envelope = null;
StAXBuilder builder = null;
- String charSetEnc;
+ String charSetEnc = null;
try {
- charSetEnc = new ContentType(contentType).getParameter("charset");
+ if (contentType != null) {
+ charSetEnc = new ContentType(contentType).getParameter("charset");
+ }
} catch (ParseException ex) {
- charSetEnc = null;
+ // ignore
}
InputStream in = getInputStream(message);
@@ -223,7 +226,9 @@
} catch (Exception ignore) {
try {
in.close();
- } catch (IOException e) {}
+ } catch (IOException e) {
+ // ignore
+ }
in = getInputStream(message);
}
@@ -282,7 +287,7 @@
private SOAPEnvelope handleLegacyMessage(MessageContext msgContext, Object message) {
SOAPFactory soapFactory = new SOAP11Factory();
- SOAPEnvelope envelope = null;
+ SOAPEnvelope envelope;
if (log.isDebugEnabled()) {
log.debug("Non SOAP/XML message received");
@@ -392,8 +397,8 @@
} else {
List transports = service.getExposedTransports();
- for (int i = 0; i < transports.size(); i++) {
- if (transportName.equals(transports.get(i))) {
+ for (Object transport : transports) {
+ if (transportName.equals(transport)) {
return true;
}
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSListener.java Mon Jun 9 03:43:19 2008
@@ -19,16 +19,19 @@
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.description.*;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.description.TransportInDescription;
import org.apache.synapse.transport.base.AbstractTransportListener;
+import org.apache.synapse.transport.base.BaseConstants;
import org.apache.synapse.transport.base.BaseUtils;
import org.apache.synapse.transport.base.ManagementSupport;
-import org.apache.synapse.transport.base.BaseConstants;
-import org.apache.commons.logging.LogFactory;
import javax.jms.JMSException;
import javax.naming.NamingException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
/**
* The JMS Transport listener implementation. A JMS Listener will hold one or
@@ -133,6 +136,10 @@
if (serviceName.indexOf('/') != -1) {
serviceName = serviceName.substring(0, serviceName.indexOf('/'));
}
+ // strip out the endpoint name if present
+ if (serviceName.indexOf('.') != -1) {
+ serviceName = serviceName.substring(0, serviceName.indexOf('.'));
+ }
return new EndpointReference[]{
new EndpointReference((String) serviceNameToEPRMap.get(serviceName))};
}
@@ -165,8 +172,8 @@
String destinationType = JMSUtils.getDestinationTypeForService(service);
- log.info("Starting to listen on destination : " + destinationName + " of type " + destinationType
- + " for service " + service.getName());
+ log.info("Starting to listen on destination : " + destinationName + " of type "
+ + destinationType + " for service " + service.getName());
cf.addDestination(destinationName, destinationType, service.getName());
cf.startListeningOnDestination(destinationName, destinationType);
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/jms/JMSUtils.java Mon Jun 9 03:43:19 2008
@@ -227,6 +227,7 @@
* @param property property name
* @return property value
*/
+ @Override
public String getProperty(Object message, String property) {
try {
return ((Message)message).getStringProperty(property);
@@ -310,6 +311,7 @@
* @param message the JMS message
* @return an InputStream to the payload
*/
+ @Override
public InputStream getInputStream(Object message) {
try {
@@ -682,6 +684,7 @@
}
+ @Override
public String getMessageTextPayload(Object message) {
if (message instanceof TextMessage) {
try {
@@ -693,6 +696,7 @@
return null;
}
+ @Override
public byte[] getMessageBinaryPayload(Object message) {
if (message instanceof BytesMessage) {
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailTransportListener.java Mon Jun 9 03:43:19 2008
@@ -58,8 +58,10 @@
public static final String DELETE = "DELETE";
public static final String MOVE = "MOVE";
- /** Keep the list of directories/files and poll durations */
- private final List pollTable = new ArrayList();
+ /** Keep the list of email accounts and poll durations */
+ private final List<PollTableEntry> pollTable = new ArrayList<PollTableEntry>();
+ /** Keep the list of removed pollTable entries */
+ private final List<PollTableEntry> removeTable = new ArrayList<PollTableEntry>();
/**
* Initializes the Mail transport
@@ -79,11 +81,12 @@
* it is time to scan the contents for new files
*/
public void onPoll() {
- Iterator iter = pollTable.iterator();
- while (iter.hasNext()) {
- PollTableEntry entry = (PollTableEntry) iter.next();
- long startTime = System.currentTimeMillis();
+ if (!removeTable.isEmpty()) {
+ pollTable.removeAll(removeTable);
+ }
+ for (PollTableEntry entry : pollTable) {
+ long startTime = System.currentTimeMillis();
if (startTime > entry.getNextPollTime()) {
checkMail(entry, entry.getEmailAddress());
}
@@ -473,12 +476,10 @@
* @throws AxisFault not used
*/
public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault {
- Iterator iter = pollTable.iterator();
- while (iter.hasNext()) {
- PollTableEntry entry = (PollTableEntry) iter.next();
- if (entry.getServiceName().equals(serviceName)) {
- return new EndpointReference[]{
- new EndpointReference(
+ for (PollTableEntry entry : pollTable) {
+ if (entry.getServiceName().equals(serviceName) ||
+ serviceName.startsWith(entry.getServiceName() + ".")) {
+ return new EndpointReference[]{new EndpointReference(
MailConstants.TRANSPORT_PREFIX + entry.getEmailAddress())};
}
}
@@ -581,12 +582,10 @@
}
protected void stopListeningForService(AxisService service) {
- Iterator iter = pollTable.iterator();
- while (iter.hasNext()) {
- PollTableEntry entry = (PollTableEntry) iter.next();
+ for (PollTableEntry entry : pollTable) {
if (service.getName().equals(entry.getServiceName())) {
cancelPoll(service);
- pollTable.remove(entry);
+ removeTable.add(entry);
}
}
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/mail/MailUtils.java Mon Jun 9 03:43:19 2008
@@ -50,6 +50,7 @@
* @param property property name
* @return property value
*/
+ @Override
public String getProperty(Object message, String property) {
try {
Object o = ((Message) message).getHeader(property);
@@ -62,6 +63,7 @@
return null;
}
+ @Override
public InputStream getInputStream(Object message) {
try {
if (message instanceof MimeMessage) {
@@ -100,6 +102,7 @@
return null;
}
+ @Override
public String getMessageTextPayload(Object message) {
try {
return new String(
@@ -113,6 +116,7 @@
return null;
}
+ @Override
public byte[] getMessageBinaryPayload(Object message) {
try {
return getBytesFromInputStream(getInputStream(message), ((Message) message).getSize());
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Mon Jun 9 03:43:19 2008
@@ -31,20 +31,19 @@
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpVersion;
+import org.apache.http.nio.util.ContentOutputBuffer;
+import org.apache.http.nio.entity.ContentOutputStream;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.protocol.HTTP;
import org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
import org.apache.synapse.transport.nhttp.util.RESTUtil;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
+import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.Map;
@@ -63,13 +62,15 @@
private HttpHost httpHost = null;
/** the message context being sent */
private MessageContext msgContext = null;
- /** the Pipe which facilitates the serialization output to be written to the channel */
- private PipeImpl pipe = null;
/** The Axis2 MessageFormatter that will ensure proper serialization as per Axis2 semantics */
MessageFormatter messageFormatter = null;
/** The OM Output format holder */
OMOutputFormat format = null;
- protected boolean completed = false; //added for request complete checking
+ private ContentOutputBuffer outputBuffer = null;
+ /** ready to begin streaming? */
+ private boolean readyToStream = false;
+ /** for request complete checking */
+ private boolean completed = false;
public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost, MessageContext msgContext) {
this.epr = epr;
@@ -78,11 +79,23 @@
this.format = NhttpUtils.getOMOutputFormat(msgContext);
this.messageFormatter =
MessageFormatterDecoratorFactory.createMessageFormatterDecorator(msgContext);
- try {
- this.pipe = new PipeImpl();
- } catch (IOException e) {
- log.error("Error creating pipe to write message body", e);
- }
+ }
+
+ public void setReadyToStream(boolean readyToStream) {
+ this.readyToStream = readyToStream;
+ }
+
+ public void setOutputBuffer(ContentOutputBuffer outputBuffer) {
+ this.outputBuffer = outputBuffer;
+ }
+
+ public void clear() {
+ this.epr = null;
+ this.httpHost = null;
+ this.msgContext = null;
+ this.format = null;
+ this.messageFormatter = null;
+ this.outputBuffer = null;
}
public EndpointReference getEpr() {
@@ -120,7 +133,7 @@
"POST", epr.getAddress(), HttpVersion.HTTP_1_0);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- messageFormatter.writeTo(msgContext, format, baos, true);
+ messageFormatter.writeTo(msgContext, format, baos, false);
BasicHttpEntity entity = new BasicHttpEntity();
entity.setContentLength(baos.toByteArray().length);
((BasicHttpEntityEnclosingRequest) httpRequest).setEntity(entity);
@@ -184,28 +197,6 @@
}
/**
- * Return the source channel of the pipe that bridges the serialized output to the socket
- * @return source channel to read serialized message contents
- */
- public ReadableByteChannel getSourceChannel() {
- if (log.isDebugEnabled()) {
- log.debug("get source channel of the pipe on which the outgoing response is written");
- }
- return pipe.source();
- }
-
- /**
- * Return the sink channel of the pipe that bridges the serialized output to the socket
- * @return sink channel to read serialized message contents
- */
- public WritableByteChannel getSinkChannel() {
- if (log.isDebugEnabled()) {
- log.debug("get sink channel of the pipe on which the outgoing response is written");
- }
- return pipe.sink();
- }
-
- /**
* Start streaming the message into the Pipe, so that the contents could be read off the source
* channel returned by getSourceChannel()
* @throws AxisFault on error
@@ -215,24 +206,41 @@
if (log.isDebugEnabled()) {
log.debug("start streaming outgoing http request");
}
- OutputStream out = Channels.newOutputStream(pipe.sink());
- try {
- messageFormatter.writeTo(msgContext, format, out, true);
- } catch (Exception e) {
- /* close PipeImpl will manually raise exception
- while streaming, so blocking status will be released */
- if (e instanceof AxisFault) {
- throw (AxisFault) e;
- } else {
- handleException("Error streaming message context", e);
+
+ synchronized(this) {
+ while (!readyToStream && !completed) {
+ try {
+ this.wait();
+ } catch (InterruptedException ignore) {}
}
}
- finally {
+
+ if (!completed) {
+ OutputStream out = new ContentOutputStream(outputBuffer);
try {
- out.flush();
- out.close();
- } catch (IOException e) {
- handleException("Error closing outgoing message stream", e);
+ messageFormatter.writeTo(msgContext, format, out, false);
+ } catch (Exception e) {
+ Throwable t = e.getCause();
+ if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore closed channel exception, as the " +
+ "SessionRequestCallback handles this exception");
+ }
+ } else {
+ if (e instanceof AxisFault) {
+ throw (AxisFault) e;
+ } else {
+ handleException("Error streaming message context", e);
+ }
+ }
+ }
+ finally {
+ try {
+ out.flush();
+ out.close();
+ } catch (IOException e) {
+ handleException("Error closing outgoing message stream", e);
+ }
}
}
}
@@ -248,9 +256,9 @@
}
public void setCompleted(boolean completed) {
- if (completed && !isCompleted()) {
- this.pipe.close();
- }
this.completed = completed;
+ synchronized(this) {
+ this.notifyAll();
+ }
}
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Mon Jun 9 03:43:19 2008
@@ -39,20 +39,17 @@
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.entity.ContentInputStream;
+import org.apache.http.nio.util.*;
import org.apache.http.params.DefaultedHttpParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.*;
import org.apache.synapse.transport.base.MetricsCollector;
import org.apache.synapse.transport.base.threads.WorkerPool;
import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
-import org.apache.synapse.transport.nhttp.util.PipeImpl;
+import org.apache.synapse.transport.nhttp.util.SharedInputBuffer;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
/**
* The client connection handler. An instance of this class is used by each IOReactor, to
@@ -69,6 +66,8 @@
private final HttpProcessor httpProcessor;
/** the connection re-use strategy */
private final ConnectionReuseStrategy connStrategy;
+ /** the buffer allocator */
+ private final ByteBufferAllocator allocator;
/** the Axis2 configuration context */
ConfigurationContext cfgCtx = null;
@@ -79,15 +78,12 @@
/** the metrics collector */
private MetricsCollector metrics = null;
- private static final String REQUEST_BUFFER = "request-buffer";
- private static final String RESPONSE_BUFFER = "response-buffer";
- private static final String OUTGOING_MESSAGE_CONTEXT = "axis2_message_context";
- private static final String REQUEST_SOURCE_CHANNEL = "request-source-channel";
- private static final String RESPONSE_SINK_CHANNEL = "response-sink-channel";
- private static final String REQUEST_SINK_CHANNEL = "request-sink-channel";
- private static final String RESPONSE_SOURCE_CHANNEL = "response-source-channel";
+ public static final String OUTGOING_MESSAGE_CONTEXT = "synapse.axis2_message_context";
+ public static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
+
+ public static final String REQUEST_SOURCE_BUFFER = "synapse.request-source-buffer";
+ public static final String RESPONSE_SINK_BUFFER = "synapse.response-sink-buffer";
- private static final String AXIS2_HTTP_REQUEST = "synapse.axis2-http-request";
private static final String CONTENT_TYPE = "Content-Type";
/**
@@ -105,6 +101,7 @@
this.httpProcessor = getHttpProcessor();
this.connStrategy = new DefaultConnectionReuseStrategy();
this.metrics = metrics;
+ this.allocator = new HeapByteBufferAllocator();
this.cfg = NHttpConfiguration.getInstance();
workerPool = WorkerPoolFactory.getWorkerPool(
@@ -129,13 +126,14 @@
try {
HttpContext context = conn.getContext();
+ ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+ axis2Req.setOutputBuffer(outputBuffer);
+ context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
+ context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
- context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
- context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
HttpRequest request = axis2Req.getRequest();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -144,6 +142,11 @@
conn.submitRequest(request);
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ synchronized(axis2Req) {
+ axis2Req.setReadyToStream(true);
+ axis2Req.notifyAll();
+ }
+
} catch (IOException e) {
handleException("I/O Error : " + e.getMessage(), e, conn);
} catch (HttpException e) {
@@ -165,18 +168,14 @@
try {
HttpContext context = conn.getContext();
Axis2HttpRequest axis2Req = (Axis2HttpRequest) attachment;
+ ContentOutputBuffer outputBuffer = new SharedOutputBuffer(cfg.getBufferSize(), conn, allocator);
+ axis2Req.setOutputBuffer(outputBuffer);
+ context.setAttribute(REQUEST_SOURCE_BUFFER, outputBuffer);
context.setAttribute(AXIS2_HTTP_REQUEST, axis2Req);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, axis2Req.getHttpHost());
-
- // allocate temporary buffers to process this request
- context.setAttribute(REQUEST_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
- context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(cfg.getBufferSize()));
-
context.setAttribute(OUTGOING_MESSAGE_CONTEXT, axis2Req.getMsgContext());
- context.setAttribute(REQUEST_SOURCE_CHANNEL, axis2Req.getSourceChannel());
- context.setAttribute(REQUEST_SINK_CHANNEL, axis2Req.getSinkChannel());
HttpRequest request = axis2Req.getRequest();
request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
@@ -185,6 +184,11 @@
conn.submitRequest(request);
context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
+ synchronized(axis2Req) {
+ axis2Req.setReadyToStream(true);
+ axis2Req.notifyAll();
+ }
+
} catch (IOException e) {
handleException("I/O Error : " + e.getMessage(), e, conn);
} catch (HttpException e) {
@@ -193,33 +197,18 @@
}
public void closed(final NHttpClientConnection conn) {
- checkAxisRequestComplete(conn, "Abnormal connection close", null);
-
- // Check sink and source channels and close them if they aren't closed already.
- // Normally these should be closed by inputReady() and outputReady(). A null request
- // or response will not hit inputReady and outputReady however.
+ ConnectionPool.forget(conn);
+ checkAxisRequestComplete(conn, "Abnormal connection close", null);
HttpContext context = conn.getContext();
- closeChannel((ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL));
- // Note: We do not close the RESPONSE_SOURCE_CHANNEL at this point in time as its closed
- // by the ClientWorker:run() in the finally block, after the response was processed
- // fix for https://issues.apache.org/jira/browse/SYNAPSE-289
- closeChannel((WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL));
- closeChannel((WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL));
-
+ context.removeAttribute(RESPONSE_SINK_BUFFER);
+ context.removeAttribute(REQUEST_SOURCE_BUFFER);
+
if (log.isTraceEnabled()) {
log.trace("Connection closed");
}
}
- private void closeChannel(Channel chn) {
- try {
- if (chn != null && chn.isOpen()) {
- chn.close();
- }
- } catch (IOException ignore) {}
- }
-
/**
* Handle connection timeouts by shutting down the connections
* @param conn the connection being processed
@@ -272,47 +261,48 @@
* @param exceptionToRaise an Exception to be returned to the MR on failure
*/
private void checkAxisRequestComplete(NHttpClientConnection conn,
- String errorMessage, Exception exceptionToRaise) {
+ final String errorMessage, final Exception exceptionToRaise) {
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
- if (axis2Request == null) {
- log.error("httpContext's AXIS2_HTTP_REQUEST attribute was null");
-
- } else if (!axis2Request.isCompleted()) {
+ if (axis2Request != null && !axis2Request.isCompleted()) {
axis2Request.setCompleted(true);
if (errorMessage == null && exceptionToRaise == null) {
return; // no need to continue
}
- MessageContext mc = axis2Request.getMsgContext();
+ final MessageContext mc = axis2Request.getMsgContext();
if (mc.getAxisOperation() != null &&
mc.getAxisOperation().getMessageReceiver() != null) {
- MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
- try {
- MessageContext nioFaultMessageContext = null;
- if (errorMessage != null) {
- nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
- mc, new AxisFault(errorMessage));
- } else if (exceptionToRaise != null) {
- nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
- /** this is not a mistake I do NOT want getMessage()*/
- mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
- }
+ workerPool.execute( new Runnable() {
+ public void run() {
+ MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+ try {
+ MessageContext nioFaultMessageContext = null;
+ if (errorMessage != null) {
+ nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+ mc, new AxisFault(errorMessage));
+ } else if (exceptionToRaise != null) {
+ nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+ /** this is not a mistake I do NOT want getMessage()*/
+ mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
+ }
+
+ if (nioFaultMessageContext != null) {
+ nioFaultMessageContext.setProperty(
+ NhttpConstants.SENDING_FAULT, Boolean.TRUE);
+ mr.receive(nioFaultMessageContext);
+ }
- if (nioFaultMessageContext != null) {
- nioFaultMessageContext.setProperty(
- NhttpConstants.SENDING_FAULT, Boolean.TRUE);
- mr.receive(nioFaultMessageContext);
+ } catch (AxisFault af) {
+ log.error("Unable to report back failure to the message receiver", af);
+ }
}
-
- } catch (AxisFault af) {
- log.error("Unable to report back failure to the message receiver", af);
- }
+ });
}
}
}
@@ -325,25 +315,18 @@
public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
HttpContext context = conn.getContext();
HttpResponse response = conn.getHttpResponse();
- WritableByteChannel sink
- = (WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
- ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
+ ContentInputBuffer inBuf = (ContentInputBuffer) context.getAttribute(RESPONSE_SINK_BUFFER);
try {
- while (decoder.read(inbuf) > 0) {
- inbuf.flip();
- sink.write(inbuf);
- if (metrics != null) {
- metrics.incrementBytesReceived(inbuf.position());
- }
- inbuf.compact();
+ int bytesRead = inBuf.consumeContent(decoder);
+ if (metrics != null && bytesRead > 0) {
+ metrics.incrementBytesReceived(bytesRead);
}
if (decoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesReceived();
}
- if (sink != null) sink.close();
if (!connStrategy.keepAlive(response, context)) {
conn.close();
} else {
@@ -364,28 +347,18 @@
public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
HttpContext context = conn.getContext();
- ReadableByteChannel source
- = (ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
- ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
+ ContentOutputBuffer outBuf = (ContentOutputBuffer) context.getAttribute(REQUEST_SOURCE_BUFFER);
try {
- int bytesRead = source.read(outbuf);
- if (bytesRead == -1) {
- encoder.complete();
- } else {
- outbuf.flip();
- encoder.write(outbuf);
- if (metrics != null) {
- metrics.incrementBytesSent(outbuf.position());
- }
- outbuf.compact();
+ int bytesWritten = outBuf.produceContent(encoder);
+ if (metrics != null && bytesWritten > 0) {
+ metrics.incrementBytesSent(bytesWritten);
}
if (encoder.isCompleted()) {
if (metrics != null) {
metrics.incrementMessagesSent();
}
- source.close();
}
} catch (IOException e) {
@@ -413,6 +386,11 @@
log.debug("Received a 202 Accepted response");
}
+ // sometimes, some HTTP endpoints send a "\r\n" as the content body with an
+ // HTTP 202 Accepted.. we will just read it into this temp buffer and ignore it..
+ ContentInputBuffer inputBuffer = new SharedInputBuffer(8, conn, allocator);
+ context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
+
// create a dummy message with an empty SOAP envelope and a property
// NhttpConstants.SC_ACCEPTED set to Boolean.TRUE to indicate this is a
// placeholder message for the transport to send a HTTP 202 to the
@@ -556,25 +534,24 @@
private void processResponse(final NHttpClientConnection conn, HttpContext context,
HttpResponse response) {
- try {
- PipeImpl responsePipe = new PipeImpl();
- context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
- context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
-
- BasicHttpEntity entity = new BasicHttpEntity();
- if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
- entity.setChunked(true);
- }
- response.setEntity(entity);
- context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
-
- workerPool.execute(
- new ClientWorker(cfgCtx, Channels.newInputStream(responsePipe.source()), response,
- (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+ ContentInputBuffer inputBuffer = new SharedInputBuffer(cfg.getBufferSize(), conn, allocator);
+ context.setAttribute(RESPONSE_SINK_BUFFER, inputBuffer);
- } catch (IOException e) {
- handleException("I/O Error : " + e.getMessage(), e, conn);
+ BasicHttpEntity entity = new BasicHttpEntity();
+ if (response.getStatusLine().getProtocolVersion().greaterEquals(HttpVersion.HTTP_1_1)) {
+ entity.setChunked(true);
}
+ response.setEntity(entity);
+ context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
+
+ workerPool.execute(
+ new ClientWorker(cfgCtx, new ContentInputStream(inputBuffer), response,
+ (MessageContext) context.getAttribute(OUTGOING_MESSAGE_CONTEXT)));
+
+ }
+
+ public void execute(Runnable task) {
+ workerPool.execute(task);
}
// ----------- utility methods -----------
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientWorker.java Mon Jun 9 03:43:19 2008
@@ -192,6 +192,7 @@
// used at the sender to set the propper status code when passing the message
responseMsgCtx.setProperty(NhttpConstants.HTTP_SC,
this.response.getStatusLine().getStatusCode());
+ responseMsgCtx.setProperty(NhttpConstants.NON_BLOCKING_TRANSPORT, true);
// process response received
AxisEngine engine = new AxisEngine(cfgCtx);
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ConnectionPool.java Mon Jun 9 03:43:19 2008
@@ -20,6 +20,7 @@
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
import org.apache.http.HttpHost;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,15 +55,16 @@
while (!connections.isEmpty()) {
conn = (NHttpClientConnection) connections.remove(0);
- if (conn.isOpen()) {
+ if (conn.isOpen() && !conn.isStale()) {
if (log.isDebugEnabled()) {
log.debug("A connection to host : " + host + " on port : " +
port + " is available in the pool, and will be reused");
}
+ conn.requestInput(); // asankha - make sure keep alives work properly when reused with throttling
return conn;
} else {
if (log.isDebugEnabled()) {
- log.debug("closing stale connection");
+ log.debug("closing stale connection to : " + host + ":" + port);
}
try {
conn.close();
@@ -93,6 +95,7 @@
}
}
+ cleanConnectionReferences(conn);
connections.add(conn);
if (log.isDebugEnabled()) {
@@ -100,4 +103,37 @@
host.getPort() + " to the connection pool of current size : " + connections.size());
}
}
+
+ private static void cleanConnectionReferences(NHttpClientConnection conn) {
+
+ HttpContext ctx = conn.getContext();
+ Axis2HttpRequest axis2Req =
+ (Axis2HttpRequest) ctx.getAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+ axis2Req.clear(); // this is linked via the selection key attachment and will free itself
+ // on timeout of the keep alive connection. Till then minimize the
+ // memory usage to a few bytes
+
+ ctx.removeAttribute(ClientHandler.AXIS2_HTTP_REQUEST);
+ ctx.removeAttribute(ClientHandler.OUTGOING_MESSAGE_CONTEXT);
+ ctx.removeAttribute(ClientHandler.REQUEST_SOURCE_BUFFER);
+ ctx.removeAttribute(ClientHandler.RESPONSE_SINK_BUFFER);
+
+ ctx.removeAttribute(ExecutionContext.HTTP_REQUEST);
+ ctx.removeAttribute(ExecutionContext.HTTP_RESPONSE);
+ ctx.removeAttribute(ExecutionContext.HTTP_CONNECTION);
+ }
+
+ public static void forget(NHttpClientConnection conn) {
+
+ HttpHost host = (HttpHost) conn.getContext().getAttribute(
+ ExecutionContext.HTTP_TARGET_HOST);
+ String key = host.getHostName() + ":" + Integer.toString(host.getPort());
+
+ List connections = (List) connMap.get(key);
+ if (connections != null) {
+ synchronized(connections) {
+ connections.remove(conn);
+ }
+ }
+ }
}
Modified: synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=664672&r1=664671&r2=664672&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java (original)
+++ synapse/trunk/java/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java Mon Jun 9 03:43:19 2008
@@ -67,8 +67,8 @@
/** The EPR prefix for services available over this transport */
private String serviceEPRPrefix;
- /** The port to listen on, defaults to 8080 */
- private int port = 8080;
+ /** The port to listen on, defaults to 8280 */
+ private int port = 8280;
/** The hostname to use, defaults to localhost */
private String host = "localhost";
/** The bind addresses as (address, port) pairs */