You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/01 16:28:01 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6373

Repository: activemq
Updated Branches:
  refs/heads/master e73ab3483 -> eb9c584fb


https://issues.apache.org/jira/browse/AMQ-6373

More tests and cleanup


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eb9c584f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eb9c584f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eb9c584f

Branch: refs/heads/master
Commit: eb9c584fbd86e52b2cc7bb7d209068a8865fa7b4
Parents: e73ab34
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Aug 1 12:27:34 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Aug 1 12:27:34 2016 -0400

----------------------------------------------------------------------
 .../command/BrokerSubscriptionInfo.java         |  3 +-
 .../network/DurableSyncNetworkBridgeTest.java   | 63 ++++++++++++++--
 .../openwire/BrokerSubscriptionInfoTest.java    | 78 ++++++++++++++++++++
 3 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
index 57f854a..0f6a11b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
@@ -19,7 +19,8 @@ package org.apache.activemq.command;
 import org.apache.activemq.state.CommandVisitor;
 
 /**
- * Used to represent a durable subscription.
+ * Used to represent the durable subscriptions contained by the broker
+ * This is used to synchronize durable subs on bridge creation
  *
  * @openwire:marshaller code="92"
  *

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 3c4a2a0..f3314cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
@@ -32,12 +33,16 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -61,6 +66,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     private Session session1;
     private final FLOW flow;
 
+    @Rule
+    public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+
     @Parameters
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
@@ -209,6 +217,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
 
     }
 
+    @Test
+    public void testSyncLoadTest() throws Exception {
+        String subName = this.subName;
+        //Create 1000 subs
+        for (int i = 0; i < 100; i++) {
+            for (int j = 0; j < 10; j++) {
+                session1.createDurableSubscriber(new ActiveMQTopic("include.test." + i), subName + i + j).close();
+            }
+        }
+        for (int i = 0; i < 100; i++) {
+            assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
+        }
+
+        doTearDown();
+        restartBroker(broker1, false);
+
+        //with bridge off, remove 100 subs
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 10; j++) {
+                removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j);
+            }
+        }
+
+        //restart test that 900 are resynced and 100 are deleted
+        restartBrokers(true);
+
+        for (int i = 0; i < 10; i++) {
+            assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 0);
+        }
+
+        for (int i = 10; i < 100; i++) {
+            assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
+        }
+
+    }
+
+
     /**
      * Using an older version of openwire should not sync but the network bridge
      * should still start without error
@@ -395,10 +440,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         localConnection.setClientID("clientId");
         localConnection.start();
 
-        if (startNetworkConnector) {        // brokerService.setPlugins(new BrokerPlugin[] {new
-            // JavaRuntimeConfigurationPlugin()});
-            // brokerService.setUseVirtualDestSubs(true);
-            // brokerService.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation);
+        if (startNetworkConnector) {
             Wait.waitFor(new Condition() {
                 @Override
                 public boolean isSatisified() throws Exception {
@@ -439,8 +481,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
     protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
         BrokerService brokerService = new BrokerService();
         brokerService.setMonitorConnectionSplits(true);
-        brokerService.setDataDirectoryFile(dataDir);
         brokerService.setBrokerName("localBroker");
+        brokerService.setDataDirectoryFile(dataDir);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dataDir);
+        adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+        brokerService.setPersistenceAdapter(adapter);
 
         if (startNetworkConnector) {
             brokerService.addNetworkConnector(configureLocalNetworkConnector());
@@ -477,10 +523,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
         brokerService.setBrokerName("remoteBroker");
         brokerService.setUseJmx(false);
         brokerService.setDataDirectoryFile(dataDir);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dataDir);
+        adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+        brokerService.setPersistenceAdapter(adapter);
 
         remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
 
-        brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
+        //Need a larger cache size in order to handle all of the durables
+        brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion);
 
         return brokerService;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
new file mode 100644
index 0000000..9075348
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerSubscriptionInfoTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(BrokerSubscriptionInfoTest.class);
+
+
+
+    @Test
+    public void testMarshalClientProperties() throws IOException {
+        // marshal object
+        OpenWireFormatFactory factory = new OpenWireFormatFactory();
+        factory.setCacheEnabled(true);
+        OpenWireFormat wf = (OpenWireFormat)factory.createWireFormat();
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream ds = new DataOutputStream(buffer);
+
+        ConsumerInfo info = new ConsumerInfo();
+        info.setClientId("clientId");
+        info.setConsumerId(new ConsumerId());
+
+        int size = 1000;
+
+
+        ConsumerInfo infos[] = new ConsumerInfo[size];
+        for (int i = 0; i < infos.length; i++) {
+            infos[i] = info;
+        }
+
+        BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo();
+        bsi.setSubscriptionInfos(infos);
+
+        wf.marshal(bsi, ds);
+        ds.close();
+
+        // unmarshal object and check that the properties are present.
+        ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray());
+        DataInputStream dis = new DataInputStream(in);
+        BrokerSubscriptionInfo actual = (BrokerSubscriptionInfo) wf.unmarshal(dis);
+
+        //assertTrue(actual instanceof BrokerSubscriptionInfo);
+        assertEquals(size, actual.getSubscriptionInfos().length);
+    }
+
+
+}