You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/08 21:54:30 UTC

svn commit: r384328 - in /incubator/servicemix/trunk/servicemix-core/src: main/java/org/apache/servicemix/jbi/nmr/ main/java/org/apache/servicemix/jbi/nmr/flow/ main/java/org/apache/servicemix/jbi/nmr/flow/seda/ main/java/org/apache/servicemix/jbi/nmr/...

Author: gnodet
Date: Wed Mar  8 12:54:23 2006
New Revision: 384328

URL: http://svn.apache.org/viewcvs?rev=384328&view=rev
Log:
SM-319: multiple flows. last step. create a test case, and ensure everything works

Added:
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java
    incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml
Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/Broker.java Wed Mar  8 12:54:23 2006
@@ -383,7 +383,6 @@
         }
         if (theEndpoint != null) {
             exchange.setEndpoint(theEndpoint);
-            exchange.setDestinationId(((AbstractServiceEndpoint) theEndpoint).getComponentNameSpace());
         }
         if (log.isTraceEnabled()) {
             log.trace("Routing exchange " + exchange + " to: " + theEndpoint);

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Wed Mar  8 12:54:23 2006
@@ -211,45 +211,19 @@
     }
 
     protected boolean isClustered(MessageExchange me) {
-        ServiceEndpoint se = me.getEndpoint();
-        if (se == null) {
-            // Routing by service name
-            QName serviceName = me.getService();
-            QName interfaceName = me.getInterfaceName();
-            if (serviceName != null) {
-                ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForService(serviceName);
-                for (int i = 0; i < eps.length; i++) {
-                    if (eps[i] instanceof InternalEndpoint) {
-                        String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
-                        if (!name.equals(broker.getContainerName())) {
-                            return true;
-                        }
-                    }
-                }
-                return false;
-            // Routing by interface name
-            } else if (interfaceName != null) {
-                ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForInterface(interfaceName);
-                for (int i = 0; i < eps.length; i++) {
-                    if (eps[i] instanceof InternalEndpoint) {
-                        String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
-                        if (!name.equals(broker.getContainerName())) {
-                            return true;
-                        }
-                    }
-                }
-                return false;
+        MessageExchangeImpl mei = (MessageExchangeImpl) me; // NOTE(review): unchecked cast - assumes every exchange passed in is a MessageExchangeImpl
+        if (mei.getDestinationId() == null) { // no destination id set on the exchange: decide from the endpoint instead
+            ServiceEndpoint se = me.getEndpoint();
+            if (se instanceof InternalEndpoint) {
+                return ((InternalEndpoint) se).isClustered(); // delegate the decision to the endpoint itself
+            // Unknown: assume this is not clustered
             } else {
-                // Should not happen
                 return false;
             }
-        // Routing by endpoint
-        } else if (se instanceof InternalEndpoint) {
-            String name = ((InternalEndpoint) se).getComponentNameSpace().getContainerName();
-            return !name.equals(broker.getContainerName());
-        // Unknown: assume this is not clustered
         } else {
-            return false;
+            String destination = mei.getDestinationId().getContainerName();
+            String source = mei.getSourceId().getContainerName(); // NOTE(review): no null check on getSourceId() or on the container name - confirm both are always set here
+            return !source.equals(destination); // clustered when source and destination container names differ
         }
     }
     

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Wed Mar  8 12:54:23 2006
@@ -36,6 +36,7 @@
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.Broker;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -144,6 +145,9 @@
         // If the message has been sent synchronously, do not use seda
         // as it would consume threads from the work manager in a useless
         // way.  This could lead to deadlocks.
+        if (me.getDestinationId() == null) {
+            me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
+        }
         if (me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) == null &&
             me.getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC &&
             me.getMirror().getSyncState() == MessageExchangeImpl.SYNC_STATE_ASYNC) {

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java?rev=384328&r1=384327&r2=384328&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java Wed Mar  8 12:54:23 2006
@@ -17,6 +17,7 @@
 
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.AbstractServiceEndpoint;
 
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
@@ -40,6 +41,9 @@
      * @throws MessagingException
      */
     protected void doSend(MessageExchangeImpl me) throws MessagingException {
+        if (me.getDestinationId() == null) { // destination not set yet: default it to the component name space of the exchange's endpoint
+            me.setDestinationId(((AbstractServiceEndpoint) me.getEndpoint()).getComponentNameSpace());
+        } // NOTE(review): the cast assumes me.getEndpoint() is a non-null AbstractServiceEndpoint - confirm against callers
         doRouting(me);
     }
     

Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java?rev=384328&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/MultipleFlowsTest.java Wed Mar  8 12:54:23 2006
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr.flow;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.container.SpringJBIContainer;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.Sender;
+import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
+
+public class MultipleFlowsTest extends TestCase {
+    // Sends messages between two containers ("local" and "remote") loaded from multiple-flows.xml and checks local, remote and clustered delivery.
+    private SpringJBIContainer localContainer;
+    private SpringJBIContainer remoteContainer;
+    // Senders; all three are looked up in the local container (see setUp).
+    private Sender localSender;
+    private Sender remoteSender;
+    private Sender clusteredSender;
+    // Receivers; clusteredReceiver1 comes from the local container, clusteredReceiver2 from the remote one.
+    private Receiver localReceiver;
+    private Receiver remoteReceiver;
+    private Receiver clusteredReceiver1;
+    private Receiver clusteredReceiver2;
+    // Application context from which both containers are obtained; closed in tearDown.
+    private AbstractXmlApplicationContext context;
+    // Number of messages each sender is asked to send in test().
+    private int messageCount = 100;
+    // Loads the context and resolves the containers, senders and receivers by bean name.
+    protected void setUp() throws Exception {
+        context = new ClassPathXmlApplicationContext("org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml");
+        localContainer = (SpringJBIContainer) context.getBean("local");
+        remoteContainer = (SpringJBIContainer) context.getBean("remote");
+        localSender = (Sender) localContainer.getBean("localSender");
+        remoteSender = (Sender) localContainer.getBean("remoteSender");
+        clusteredSender = (Sender) localContainer.getBean("clusteredSender");
+        localReceiver = (Receiver) localContainer.getBean("localReceiver");
+        remoteReceiver = (Receiver) remoteContainer.getBean("remoteReceiver");
+        clusteredReceiver1 = (Receiver) localContainer.getBean("clusteredReceiver");
+        clusteredReceiver2 = (Receiver) remoteContainer.getBean("clusteredReceiver");
+        Thread.sleep(1000); // NOTE(review): fixed 1 s pause, presumably to let the two containers finish starting - confirm
+    }
+    // Closes the application context created in setUp.
+    protected void tearDown() throws Exception {
+        context.close();
+    }
+    // Runs the three scenarios in turn: local, remote, then clustered delivery.
+    public void test() throws Exception {
+        // Local: sender and receiver both come from the local container
+        localSender.sendMessages(messageCount);
+        localReceiver.getMessageList().assertMessagesReceived(messageCount);
+
+        // Remote: sender comes from the local container, receiver from the remote container
+        remoteSender.sendMessages(messageCount);
+        remoteReceiver.getMessageList().assertMessagesReceived(messageCount);
+
+        // Clustered: a "clusteredReceiver" bean exists in both containers
+        clusteredSender.sendMessages(messageCount);
+        long t0 = System.currentTimeMillis();
+        int n1 = 0;
+        int n2 = 0;
+        while (System.currentTimeMillis() - t0 < 10000) { // poll for up to 10 seconds; NOTE(review): the loop spins without sleeping
+            n1 = clusteredReceiver1.getMessageList().getMessageCount();
+            n2 = clusteredReceiver2.getMessageList().getMessageCount();
+            if (n1 + n2 == messageCount) {
+                break;
+            }
+        }
+        assertEquals(messageCount, n1 + n2); // the two receivers together must have counted exactly messageCount messages
+        assertTrue(n1 > 0); // each of the two receivers must have received at least one message
+        assertTrue(n2 > 0);
+    }
+
+}

Added: incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml?rev=384328&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/resources/org/apache/servicemix/jbi/nmr/flow/multiple-flows.xml Wed Mar  8 12:54:23 2006
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?> 
+<beans xmlns:sm="http://servicemix.apache.org/config/1.0" 
+       xmlns:foo="urn:foo"> 
+
+  <sm:container name="local" flowNames="seda,jms" embedded="true"> 
+    <sm:activationSpecs> 
+      <sm:activationSpec id="localSender" destinationService="localReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.SenderComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+      <sm:activationSpec id="remoteSender" destinationService="remoteReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.SenderComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+      <sm:activationSpec id="clusteredSender" destinationService="clusteredReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.SenderComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+      <sm:activationSpec id="localReceiver" service="localReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+      <sm:activationSpec id="clusteredReceiver" service="clusteredReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+    </sm:activationSpecs> 
+  </sm:container> 
+  
+  <sm:container name="remote" flowNames="seda,jms" embedded="true"> 
+    <sm:activationSpecs> 
+      <sm:activationSpec id="remoteReceiver" service="remoteReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+      <sm:activationSpec id="clusteredReceiver" service="clusteredReceiver"> 
+        <sm:component> 
+          <bean class="org.apache.servicemix.tck.ReceiverComponent" /> 
+        </sm:component> 
+      </sm:activationSpec> 
+    </sm:activationSpecs> 
+  </sm:container> 
+  
+</beans> 
\ No newline at end of file