You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/08/01 16:28:01 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6373
Repository: activemq
Updated Branches:
refs/heads/master e73ab3483 -> eb9c584fb
https://issues.apache.org/jira/browse/AMQ-6373
More tests and cleanup
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eb9c584f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eb9c584f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eb9c584f
Branch: refs/heads/master
Commit: eb9c584fbd86e52b2cc7bb7d209068a8865fa7b4
Parents: e73ab34
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Aug 1 12:27:34 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Aug 1 12:27:34 2016 -0400
----------------------------------------------------------------------
.../command/BrokerSubscriptionInfo.java | 3 +-
.../network/DurableSyncNetworkBridgeTest.java | 63 ++++++++++++++--
.../openwire/BrokerSubscriptionInfoTest.java | 78 ++++++++++++++++++++
3 files changed, 137 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
index 57f854a..0f6a11b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/BrokerSubscriptionInfo.java
@@ -19,7 +19,8 @@ package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
- * Used to represent a durable subscription.
+ * Used to represent the durable subscriptions contained by the broker
+ * This is used to synchronize durable subs on bridge creation
*
* @openwire:marshaller code="92"
*
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 3c4a2a0..f3314cc 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@@ -32,12 +33,16 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@@ -61,6 +66,9 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
private Session session1;
private final FLOW flow;
+ @Rule
+ public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@@ -209,6 +217,43 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
+ @Test
+ public void testSyncLoadTest() throws Exception {
+ String subName = this.subName;
+ //Create 1000 subs
+ for (int i = 0; i < 100; i++) {
+ for (int j = 0; j < 10; j++) {
+ session1.createDurableSubscriber(new ActiveMQTopic("include.test." + i), subName + i + j).close();
+ }
+ }
+ for (int i = 0; i < 100; i++) {
+ assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
+ }
+
+ doTearDown();
+ restartBroker(broker1, false);
+
+ //with bridge off, remove 100 subs
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j);
+ }
+ }
+
+ //restart test that 900 are resynced and 100 are deleted
+ restartBrokers(true);
+
+ for (int i = 0; i < 10; i++) {
+ assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 0);
+ }
+
+ for (int i = 10; i < 100; i++) {
+ assertNCDurableSubsCount(broker2, new ActiveMQTopic("include.test." + i), 1);
+ }
+
+ }
+
+
/**
* Using an older version of openwire should not sync but the network bridge
* should still start without error
@@ -395,10 +440,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
localConnection.setClientID("clientId");
localConnection.start();
- if (startNetworkConnector) { // brokerService.setPlugins(new BrokerPlugin[] {new
- // JavaRuntimeConfigurationPlugin()});
- // brokerService.setUseVirtualDestSubs(true);
- // brokerService.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation);
+ if (startNetworkConnector) {
Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
@@ -439,8 +481,12 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
protected BrokerService createLocalBroker(File dataDir, boolean startNetworkConnector) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
- brokerService.setDataDirectoryFile(dataDir);
brokerService.setBrokerName("localBroker");
+ brokerService.setDataDirectoryFile(dataDir);
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dataDir);
+ adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+ brokerService.setPersistenceAdapter(adapter);
if (startNetworkConnector) {
brokerService.addNetworkConnector(configureLocalNetworkConnector());
@@ -477,10 +523,15 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
brokerService.setDataDirectoryFile(dataDir);
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dataDir);
+ adapter.setJournalDiskSyncStrategy(JournalDiskSyncStrategy.PERIODIC.name());
+ brokerService.setPersistenceAdapter(adapter);
remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
- brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.version=" + remoteBrokerWireFormatVersion);
+ //Need a larger cache size in order to handle all of the durables
+ brokerService.addConnector("tcp://localhost:" + port + "?wireFormat.cacheSize=2048&wireFormat.version=" + remoteBrokerWireFormatVersion);
return brokerService;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/eb9c584f/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
new file mode 100644
index 0000000..9075348
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/BrokerSubscriptionInfoTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.activemq.command.BrokerSubscriptionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerSubscriptionInfoTest {
+
+ static final Logger LOG = LoggerFactory.getLogger(BrokerSubscriptionInfoTest.class);
+
+
+
+ @Test
+ public void testMarshalClientProperties() throws IOException {
+ // marshal object
+ OpenWireFormatFactory factory = new OpenWireFormatFactory();
+ factory.setCacheEnabled(true);
+ OpenWireFormat wf = (OpenWireFormat)factory.createWireFormat();
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ DataOutputStream ds = new DataOutputStream(buffer);
+
+ ConsumerInfo info = new ConsumerInfo();
+ info.setClientId("clientId");
+ info.setConsumerId(new ConsumerId());
+
+ int size = 1000;
+
+
+ ConsumerInfo infos[] = new ConsumerInfo[size];
+ for (int i = 0; i < infos.length; i++) {
+ infos[i] = info;
+ }
+
+ BrokerSubscriptionInfo bsi = new BrokerSubscriptionInfo();
+ bsi.setSubscriptionInfos(infos);
+
+ wf.marshal(bsi, ds);
+ ds.close();
+
+ // unmarshal object and check that the properties are present.
+ ByteArrayInputStream in = new ByteArrayInputStream(buffer.toByteArray());
+ DataInputStream dis = new DataInputStream(in);
+ BrokerSubscriptionInfo actual = (BrokerSubscriptionInfo) wf.unmarshal(dis);
+
+ //assertTrue(actual instanceof BrokerSubscriptionInfo);
+ assertEquals(size, actual.getSubscriptionInfos().length);
+ }
+
+
+}