You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/05 15:37:13 UTC

svn commit: r941281 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Wed May  5 13:37:13 2010
New Revision: 941281

URL: http://svn.apache.org/viewvc?rev=941281&view=rev
Log:
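resolve https://issues.apache.org/activemq/browse/AMQ-2720 - ensure kahaDB getDestinations returns a topic destination only when it is durable (its stored ackPositions are not empty); non-topic destinations are still returned. Also add a test case for duplicate messages in a network after a broker restart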

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed May  5 13:37:13 2010
@@ -696,7 +696,7 @@ public abstract class DemandForwardingBr
                         
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
+                            LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
                         
                         if (!message.isResponseRequired()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed May  5 13:37:13 2010
@@ -46,6 +46,7 @@ import org.apache.activemq.store.Persist
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -496,8 +497,22 @@ public class KahaDBStore extends Message
                     public void execute(Transaction tx) throws IOException {
                         for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                             Entry<String, StoredDestination> entry = iterator.next();
-                            rc.add(convert(entry.getKey()));
+                            if (!isEmptyTopic(entry, tx)) {
+                                rc.add(convert(entry.getKey()));
+                            }
+                        }
+                    }
+
+                    private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) throws IOException {
+                        boolean isEmptyTopic = false;
+                        ActiveMQDestination dest = convert(entry.getKey());
+                        if (dest.isTopic()) {
+                            StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
+                            if (loadedStore.ackPositions.isEmpty()) {
+                                isEmptyTopic = true;
+                            }
                         }
+                        return isEmptyTopic;
                     }
                 });
             }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java?rev=941281&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java Wed May  5 13:37:13 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+
+
+public class ThreeBrokerVirtualTopicNetworkAMQPATest extends ThreeBrokerVirtualTopicNetworkTest {
+    
+     protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        File dataFileDir = new File("target/test-amq-data/amq/" + broker.getBrokerName());
+        AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
+        adapter.setDirectory(dataFileDir);
+        broker.setPersistenceAdapter(adapter);
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java Wed May  5 13:37:13 2010
@@ -17,6 +17,7 @@
 package org.apache.activemq.usecases;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 
 import javax.jms.Destination;
@@ -98,23 +99,51 @@ public class ThreeBrokerVirtualTopicNetw
         waitForBridgeFormation();
         
         clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
+        LOG.info("recreated clientA");
         
         Thread.sleep(2000);
 
-        sendMessages("BrokerA", dest, 2);
+        sendMessages("BrokerA", dest, 10);
 
         msgsA = getConsumerMessages("BrokerA", clientA);
 
-        msgsA.waitForMessagesToArrive(2);
-        msgsB.waitForMessagesToArrive(3);
-        msgsC.waitForMessagesToArrive(3);
+        msgsA.waitForMessagesToArrive(10);
+        msgsB.waitForMessagesToArrive(11);
+        msgsC.waitForMessagesToArrive(11);
 
         // ensure we don't get any more messages
         Thread.sleep(2000);
         
-        assertEquals(2, msgsA.getMessageCount());
-        assertEquals(3, msgsB.getMessageCount());
-        assertEquals(3, msgsC.getMessageCount());        
+        assertEquals(10, msgsA.getMessageCount());
+        assertEquals(11, msgsB.getMessageCount());
+        assertEquals(11, msgsC.getMessageCount());        
+        
+        // restart to ensure no hanging messages
+        LOG.info("Restarting brokerA again");
+        brokerItem = brokers.remove("BrokerA");
+        if (brokerItem != null) {
+            brokerItem.destroy();
+        }
+        
+        restartedBroker = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
+        bridgeAndConfigureBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+        bridgeAndConfigureBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+        restartedBroker.start();
+        waitForBridgeFormation();
+        
+        clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
+        LOG.info("recreated clientA again");
+        
+        Thread.sleep(2000);
+
+        msgsA = getConsumerMessages("BrokerA", clientA);
+
+        // ensure we don't get any more messages
+        Thread.sleep(5000);
+        
+        assertEquals(0, msgsA.getMessageCount());
+        assertEquals(11, msgsB.getMessageCount());
+        assertEquals(11, msgsC.getMessageCount());
     }
     
 
@@ -135,10 +164,7 @@ public class ThreeBrokerVirtualTopicNetw
     private BrokerService createAndConfigureBroker(URI uri) throws Exception {
         BrokerService broker = createBroker(uri);
         
-        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
-        KahaDBStore kaha = new KahaDBStore();
-        kaha.setDirectory(dataFileDir);
-        broker.setPersistenceAdapter(kaha);
+        configurePersistenceAdapter(broker);
         
         // make all topics virtual and consumers use the default prefix
         VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
@@ -147,4 +173,12 @@ public class ThreeBrokerVirtualTopicNetw
         broker.setDestinationInterceptors(destinationInterceptors);
         return broker;
     }
-}
+    
+    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        broker.setPersistenceAdapter(kaha);
+    }
+
+}
\ No newline at end of file