You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/12/15 12:10:07 UTC

svn commit: r1214700 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/NetworkBridgeFilter.java test/java/org/apache/activemq/network/SimpleNetworkTest.java

Author: dejanb
Date: Thu Dec 15 11:10:07 2011
New Revision: 1214700

URL: http://svn.apache.org/viewvc?rev=1214700&view=rev
Log:
fix for DuplexNetworkTest failure

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?rev=1214700&r1=1214699&r2=1214700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java Thu Dec 15 11:10:07 2011
@@ -16,17 +16,16 @@
  */
 package org.apache.activemq.command;
 
-import java.io.IOException;
-import java.util.Arrays;
-
-import javax.jms.JMSException;
-
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.Arrays;
+
 /**
  * @openwire:marshaller code="91"
  * 
@@ -106,6 +105,7 @@ public class NetworkBridgeFilter impleme
                 LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
                         + networkBrokerId + "), path: "
                         + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
+                return false;
             }
         }
         return true;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=1214700&r1=1214699&r2=1214700&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Thu Dec 15 11:10:07 2011
@@ -16,22 +16,9 @@
  */
 package org.apache.activemq.network;
 
-import java.net.URI;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicRequestor;
-import javax.jms.TopicSession;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.util.Wait;
@@ -42,6 +29,11 @@ import org.springframework.context.suppo
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
 
+import javax.jms.*;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class SimpleNetworkTest extends org.apache.activemq.TestSupport {
 
     protected static final int MESSAGE_COUNT = 10;
@@ -89,7 +81,6 @@ public class SimpleNetworkTest extends o
     }
 
     public void testFiltering() throws Exception {
-
         MessageConsumer includedConsumer = remoteSession.createConsumer(included);
         MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
         MessageProducer includedProducer = localSession.createProducer(included);
@@ -109,7 +100,7 @@ public class SimpleNetworkTest extends o
         MessageProducer producer = localSession.createProducer(included);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-        waitForConsumerRegistration(localBroker, 2);
+        waitForConsumerRegistration(localBroker, 2, included);
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message test = localSession.createTextMessage("test-" + i);
@@ -122,20 +113,23 @@ public class SimpleNetworkTest extends o
         assertNull(consumer2.receive(1000));
     }
 
-    private void waitForConsumerRegistration(final BrokerService brokerService, final int min) throws Exception {
+    private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
         assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
                 if (bridges.length > 0) {
-                    LOG.info(brokerService + " bridges "  + bridges);
+                    LOG.info(brokerService + " bridges "  + Arrays.toString(bridges));
                     DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
                     ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
                     LOG.info(brokerService + " bridge "  + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
                     if (!forwardingBridges.isEmpty()) {
-                        DemandSubscription demandSubscription = (DemandSubscription) forwardingBridges.values().toArray()[0];
-                        LOG.info(brokerService + " DemandSubscription "  + demandSubscription + ", size: " + demandSubscription.size());
-                        return demandSubscription.size() >= min;
+                        for (DemandSubscription demandSubscription : forwardingBridges.values()) {
+                            if (demandSubscription.getLocalInfo().getDestination().equals(destination)) {
+                                LOG.info(brokerService + " DemandSubscription "  + demandSubscription + ", size: " + demandSubscription.size());
+                                return demandSubscription.size() >= min;
+                            }
+                        }
                     }
                 }
                 return false;
@@ -220,8 +214,10 @@ public class SimpleNetworkTest extends o
     protected void doSetUp() throws Exception {
         remoteBroker = createRemoteBroker();
         remoteBroker.start();
+        remoteBroker.waitUntilStarted();
         localBroker = createLocalBroker();
         localBroker.start();
+        localBroker.waitUntilStarted();
         URI localURI = localBroker.getVmConnectorURI();
         ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
         fac.setAlwaysSyncSend(true);