You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/08 21:54:30 UTC
svn commit: r384328 - in /incubator/servicemix/trunk/servicemix-core/src:
main/java/org/apache/servicemix/jbi/nmr/
main/java/org/apache/servicemix/jbi/nmr/flow/
main/java/org/apache/servicemix/jbi/nmr/flow/seda/
main/java/org/apache/servicemix/jbi/nmr/...
Author: gnodet
Date: Wed Mar 8 12:54:23 2006
New Revision: 384328
URL: http://svn.apache.org/viewcvs?rev=384328&view=rev
Log:
SM-319: multiple flows. last step. create a test case, and ensure everything works
Added:
incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java
incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java Wed Mar 8 12:54:23 2006
@@ -383,7 +383,6 @@
}
if (theEndpoint != null) {
exchange.setEndpoint(theEndpoint);
- exchange.setDestinationId(((AbstractServiceEndpoint) theEndpoint).getComponentNameSpace());
}
if (log.isTraceEnabled()) {
log.trace("Routing exchange " + exchange + " to: " + theEndpoint);
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Wed Mar 8 12:54:23 2006
@@ -211,45 +211,19 @@
}
protected boolean isClustered(MessageExchange me) {
- ServiceEndpoint se = me.getEndpoint();
- if (se == null) {
- // Routing by service name
- QName serviceName = me.getService();
- QName interfaceName = me.getInterfaceName();
- if (serviceName != null) {
- ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForService(serviceName);
- for (int i = 0; i < eps.length; i++) {
- if (eps[i] instanceof InternalEndpoint) {
- String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
- if (!name.equals(broker.getContainerName())) {
- return true;
- }
- }
- }
- return false;
- // Routing by interface name
- } else if (interfaceName != null) {
- ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForInterface(interfaceName);
- for (int i = 0; i < eps.length; i++) {
- if (eps[i] instanceof InternalEndpoint) {
- String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
- if (!name.equals(broker.getContainerName())) {
- return true;
- }
- }
- }
- return false;
+ MessageExchangeImpl mei = (MessageExchangeImpl) me;
+ if (mei.getDestinationId() == null) {
+ ServiceEndpoint se = me.getEndpoint();
+ if (se instanceof InternalEndpoint) {
+ return ((InternalEndpoint) se).isClustered();
+ // Unknown: assume this is not clustered
} else {
- // Should not happen
return false;
}
- // Routing by endpoint
- } else if (se instanceof InternalEndpoint) {
- String name = ((InternalEndpoint) se).getComponentNameSpace().getContainerName();
- return !name.equals(broker.getContainerName());
- // Unknown: assume this is not clustered
} else {
- return false;
+ String destination = mei.getDestinationId().getContainerName();
+ String source = mei.getSourceId().getContainerName();
+ return !source.equals(destination);
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Wed Mar 8 12:54:23 2006
@@ -36,6 +36,7 @@
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -144,6 +145,9 @@
// If the message has been sent synchronously, do not use seda
// as it would consume threads from the work manager in a useless
// way. This could lead to deadlocks.
+ if (me.getDestinationId() == null) {
+ me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
+ }
if (me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) == null &&
me.getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC &&
me.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC) {
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java Wed Mar 8 12:54:23 2006
@@ -17,6 +17,7 @@
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
@@ -40,6 +41,9 @@
* @throws MessagingException
*/
protected void doSend(MessageExchangeImpl me) throws MessagingException {
+ if (me.getDestinationId() == null) {
+ me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
+ }
doRouting(me);
}
Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java?rev=384328&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java Wed Mar 8 12:54:23 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.Sender;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+
+public class MultipleFlowsTest extends TestCase {
+
+ private SpringJBIContainer localContainer;
+ private SpringJBIContainer remoteContainer;
+
+ private Sender localSender;
+ private Sender remoteSender;
+ private Sender clusteredSender;
+
+ private Receiver localReceiver;
+ private Receiver remoteReceiver;
+ private Receiver clusteredReceiver1;
+ private Receiver clusteredReceiver2;
+
+ private AbstractXmlApplicationContext context;
+
+ private int messageCount = 100;
+
+ protected void setUp() throws Exception {
+ context = new ClassPathXmlApplicationContext("org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml");
+ localContainer = (SpringJBIContainer) context.getBean("local");
+ remoteContainer = (SpringJBIContainer) context.getBean("remote");
+ localSender = (Sender) localContainer.getBean("localSender");
+ remoteSender = (Sender) localContainer.getBean("remoteSender");
+ clusteredSender = (Sender) localContainer.getBean("clusteredSender");
+ localReceiver = (Receiver) localContainer.getBean("localReceiver");
+ remoteReceiver = (Receiver) remoteContainer.getBean("remoteReceiver");
+ clusteredReceiver1 = (Receiver) localContainer.getBean("clusteredReceiver");
+ clusteredReceiver2 = (Receiver) remoteContainer.getBean("clusteredReceiver");
+ Thread.sleep(1000);
+ }
+
+ protected void tearDown() throws Exception {
+ context.close();
+ }
+
+ public void test() throws Exception {
+ // Local
+ localSender.sendMessages(messageCount);
+ localReceiver.getMessageList().assertMessagesReceived(messageCount);
+
+ // Remote
+ remoteSender.sendMessages(messageCount);
+ remoteReceiver.getMessageList().assertMessagesReceived(messageCount);
+
+ // Clustered
+ clusteredSender.sendMessages(messageCount);
+ long t0 = System.currentTimeMillis();
+ int n1 = 0;
+ int n2 = 0;
+ while (System.currentTimeMillis() - t0 < 10000) {
+ n1 = clusteredReceiver1.getMessageList().getMessageCount();
+ n2 = clusteredReceiver2.getMessageList().getMessageCount();
+ if (n1 + n2 == messageCount) {
+ break;
+ }
+ }
+ assertEquals(messageCount, n1 + n2);
+ assertTrue(n1 > 0);
+ assertTrue(n2 > 0);
+ }
+
+}
Added: incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml?rev=384328&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml Wed Mar 8 12:54:23 2006
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns:sm="http://servicemix.apache.org/config/1.0"
+ xmlns:foo="urn:foo">
+
+ <sm:container name="local" flowNames="seda,jms" embedded="true">
+ <sm:activationSpecs>
+ <sm:activationSpec id="localSender" destinationService="localReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.SenderComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ <sm:activationSpec id="remoteSender" destinationService="remoteReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.SenderComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ <sm:activationSpec id="clusteredSender" destinationService="clusteredReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.SenderComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ <sm:activationSpec id="localReceiver" service="localReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ <sm:activationSpec id="clusteredReceiver" service="clusteredReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ </sm:activationSpecs>
+ </sm:container>
+
+ <sm:container name="remote" flowNames="seda,jms" embedded="true">
+ <sm:activationSpecs>
+ <sm:activationSpec id="remoteReceiver" service="remoteReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ <sm:activationSpec id="clusteredReceiver" service="clusteredReceiver">
+ <sm:component>
+ <bean class="org.apache.servicemix.tck.ReceiverComponent" />
+ </sm:component>
+ </sm:activationSpec>
+ </sm:activationSpecs>
+ </sm:container>
+
+</beans>
\ No newline at end of file