You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/05 15:37:13 UTC
svn commit: r941281 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Wed May 5 13:37:13 2010
New Revision: 941281
URL: http://svn.apache.org/viewvc?rev=941281&view=rev
Log:
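Resolve https://issues.apache.org/activemq/browse/AMQ-2720 - ensure KahaDB getDestinations skips topics whose stored ack positions are empty (i.e. topics with no durable subscription state; non-topic destinations are still returned), and add a test case for duplicate messages in a network after a broker restart.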
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed May 5 13:37:13 2010
@@ -696,7 +696,7 @@ public abstract class DemandForwardingBr
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
- LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
+ LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
}
if (!message.isResponseRequired()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Wed May 5 13:37:13 2010
@@ -46,6 +46,7 @@ import org.apache.activemq.store.Persist
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -496,8 +497,22 @@ public class KahaDBStore extends Message
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
- rc.add(convert(entry.getKey()));
+ if (!isEmptyTopic(entry, tx)) {
+ rc.add(convert(entry.getKey()));
+ }
+ }
+ }
+
+ private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) throws IOException {
+ boolean isEmptyTopic = false;
+ ActiveMQDestination dest = convert(entry.getKey());
+ if (dest.isTopic()) {
+ StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
+ if (loadedStore.ackPositions.isEmpty()) {
+ isEmptyTopic = true;
+ }
}
+ return isEmptyTopic;
}
});
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java?rev=941281&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java Wed May 5 13:37:13 2010
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+
+
+public class ThreeBrokerVirtualTopicNetworkAMQPATest extends ThreeBrokerVirtualTopicNetworkTest {
+
+ protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+ File dataFileDir = new File("target/test-amq-data/amq/" + broker.getBrokerName());
+ AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
+ adapter.setDirectory(dataFileDir);
+ broker.setPersistenceAdapter(adapter);
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkAMQPATest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java?rev=941281&r1=941280&r2=941281&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.java Wed May 5 13:37:13 2010
@@ -17,6 +17,7 @@
package org.apache.activemq.usecases;
import java.io.File;
+import java.io.IOException;
import java.net.URI;
import javax.jms.Destination;
@@ -98,23 +99,51 @@ public class ThreeBrokerVirtualTopicNetw
waitForBridgeFormation();
clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
+ LOG.info("recreated clientA");
Thread.sleep(2000);
- sendMessages("BrokerA", dest, 2);
+ sendMessages("BrokerA", dest, 10);
msgsA = getConsumerMessages("BrokerA", clientA);
- msgsA.waitForMessagesToArrive(2);
- msgsB.waitForMessagesToArrive(3);
- msgsC.waitForMessagesToArrive(3);
+ msgsA.waitForMessagesToArrive(10);
+ msgsB.waitForMessagesToArrive(11);
+ msgsC.waitForMessagesToArrive(11);
// ensure we don't get any more messages
Thread.sleep(2000);
- assertEquals(2, msgsA.getMessageCount());
- assertEquals(3, msgsB.getMessageCount());
- assertEquals(3, msgsC.getMessageCount());
+ assertEquals(10, msgsA.getMessageCount());
+ assertEquals(11, msgsB.getMessageCount());
+ assertEquals(11, msgsC.getMessageCount());
+
+ // restart to ensure no hanging messages
+ LOG.info("Restarting brokerA again");
+ brokerItem = brokers.remove("BrokerA");
+ if (brokerItem != null) {
+ brokerItem.destroy();
+ }
+
+ restartedBroker = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
+ bridgeAndConfigureBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
+ bridgeAndConfigureBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
+ restartedBroker.start();
+ waitForBridgeFormation();
+
+ clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
+ LOG.info("recreated clientA again");
+
+ Thread.sleep(2000);
+
+ msgsA = getConsumerMessages("BrokerA", clientA);
+
+ // ensure we don't get any more messages
+ Thread.sleep(5000);
+
+ assertEquals(0, msgsA.getMessageCount());
+ assertEquals(11, msgsB.getMessageCount());
+ assertEquals(11, msgsC.getMessageCount());
}
@@ -135,10 +164,7 @@ public class ThreeBrokerVirtualTopicNetw
private BrokerService createAndConfigureBroker(URI uri) throws Exception {
BrokerService broker = createBroker(uri);
- File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(dataFileDir);
- broker.setPersistenceAdapter(kaha);
+ configurePersistenceAdapter(broker);
// make all topics virtual and consumers use the default prefix
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
@@ -147,4 +173,12 @@ public class ThreeBrokerVirtualTopicNetw
broker.setDestinationInterceptors(destinationInterceptors);
return broker;
}
-}
+
+ protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
+ File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
+ KahaDBStore kaha = new KahaDBStore();
+ kaha.setDirectory(dataFileDir);
+ broker.setPersistenceAdapter(kaha);
+ }
+
+}
\ No newline at end of file