You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/06 17:58:33 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1776 Blocked Bridge is not
resuming after reconnect
Repository: activemq-artemis
Updated Branches:
refs/heads/master 57b9d979f -> 619b2cb2a
ARTEMIS-1776 Blocked Bridge is not resuming after reconnect
This is still part of the ARTEMIS-1776 fix, which is still part of the release we are currently working on.
Hence I'm not opening a new JIRA for this one.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e5bce133
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e5bce133
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e5bce133
Branch: refs/heads/master
Commit: e5bce13316f7e81bb15a12592622df2ea2632a35
Parents: 57b9d97
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Apr 6 10:00:43 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Apr 6 13:17:13 2018 -0400
----------------------------------------------------------------------
.../core/server/cluster/impl/BridgeImpl.java | 9 ++
.../integration/cluster/bridge/BridgeTest.java | 148 +++++++++++++++++++
2 files changed, 157 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5bce133/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index e40bc46..48f59f4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -220,6 +220,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
this.server = server;
}
+ /** Returns the current value of the blockedOnFlowControl flag. Exposed mainly for tests. */
+ public boolean isBlockedOnFlowControl() {
+ return blockedOnFlowControl;
+ }
+
public static final byte[] getDuplicateBytes(final UUID nodeUUID, final long messageID) {
byte[] bytes = new byte[24];
@@ -924,6 +929,10 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
+ // need to reset blockedOnFlowControl whenever a new producer is created,
+ // otherwise, if the bridge was blocked before a previous failure,
+ // it would never resume
+ blockedOnFlowControl = false;
producer = session.createProducer();
session.addFailureListener(BridgeImpl.this);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e5bce133/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
index ae60a61..2d6add7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java
@@ -75,10 +75,13 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@@ -251,6 +254,151 @@ public class BridgeTest extends ActiveMQTestBase {
System.out.println(timeTaken + "ms");
}
+ @Test
+ public void testBlockedBridgeAndReconnect() throws Exception { // bridge gets blocked on flow control, server1 is restarted, then the remaining messages must still arrive
+ long time = System.currentTimeMillis(); // start timestamp; the elapsed time is printed at the end
+ Map<String, Object> server0Params = new HashMap<>();
+ server0 = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<>();
+ addTargetParameters(server1Params);
+ server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+ server1.getAddressSettingsRepository().clear();
+ server1.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(10124 * 10).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); // server1 (bridge target): BLOCK policy with a small max size. NOTE(review): 10124 may be a typo for 1024 - confirm
+
+ server0.getAddressSettingsRepository().clear();
+ server0.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(Long.MAX_VALUE).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); // server0 (bridge source): BLOCK policy with max size set to Long.MAX_VALUE
+
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+
+ HashMap<String, TransportConfiguration> connectors = new HashMap<>();
+ connectors.put(server1tc.getName(), server1tc);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
+
+ final int messageSize = 1024;
+
+ final int numMessages = 1000;
+
+ ArrayList<String> connectorConfig = new ArrayList<>();
+ connectorConfig.add(server1tc.getName());
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setRetryInterval(100).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(true).setConfirmationWindowSize(numMessages * messageSize / 2).setStaticConnectors(connectorConfig).setProducerWindowSize(1024);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ server0.start();
+ locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
+ ClientSessionFactory sf0 = addSessionFactory(locator.createSessionFactory(server0tc));
+
+ ClientSessionFactory sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+
+ ClientSession session0 = sf0.createSession(false, true, 0);
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+
+ ClientSession session1 = sf1.createSession(true, true, 0);
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
+
+
+ session1.start();
+
+ final byte[] bytes = new byte[messageSize];
+
+ final SimpleString propKey = new SimpleString("testkey");
+
+ for (int i = 0; i < numMessages; i++) { // send all messages to testAddress on server0, each tagged with its index
+ ClientMessage message = session0.createMessage(true);
+
+ message.putIntProperty(propKey, i);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ producer0.send(message);
+
+ if (i % 100 == 0) { // commit every 100 sends (this also fires at i == 0)
+ session0.commit();
+ }
+ }
+ session0.commit(); // commit whatever the loop left uncommitted
+
+ for (int i = 0; i < numMessages / 2; i++) { // consume only the first half from queue1, checking the order
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ message.acknowledge();
+ }
+ session1.commit();
+
+ BridgeImpl bridge = (BridgeImpl)server0.getClusterManager().getBridges().get("bridge1");
+
+ // stop consuming in the middle and wait for the bridge to report it is blocked on flow control
+ Wait.assertTrue("bridge is never blocked", bridge::isBlockedOnFlowControl);
+
+ session1.close();
+ sf1.close();
+
+ // now restart server1 (the bridge target); the bridge is expected to reconnect
+ server1.stop();
+ server1.start();
+
+ sf1 = addSessionFactory(locator.createSessionFactory(server1tc));
+ session1 = sf1.createSession(true, true, 0);
+ consumer1 = session1.createConsumer(queueName1);
+ session1.start();
+
+ // consume the rest of the messages, again checking the order
+ for (int i = numMessages / 2; i < numMessages; i++) {
+ ClientMessage message = consumer1.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ Assert.assertEquals(i, message.getObjectProperty(propKey));
+
+ message.acknowledge();
+ }
+
+
+ Wait.assertEquals(0, server0.locateQueue(SimpleString.toSimpleString("queue0"))::getMessageCount); // queue0 on server0 is expected to end up empty
+
+ Assert.assertNull(consumer1.receiveImmediate()); // nothing further should be available on queue1
+
+ session0.close();
+
+ session1.close();
+
+ sf0.close();
+
+ sf1.close();
+
+ closeFields();
+ if (server0.getConfiguration().isPersistenceEnabled()) { // only check stored queues when server0 has persistence enabled
+ assertEquals(0, loadQueues(server0).size());
+ }
+ long timeTaken = System.currentTimeMillis() - time;
+ System.out.println(timeTaken + "ms");
+ }
+
public void internaltestSimpleBridge(final boolean largeMessage, final boolean useFiles) throws Exception {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, useFiles, server0Params);
[2/2] activemq-artemis git commit: This closes #1996
Posted by cl...@apache.org.
This closes #1996
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/619b2cb2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/619b2cb2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/619b2cb2
Branch: refs/heads/master
Commit: 619b2cb2a4a7abc4ae4caa5ab0bd160d1901505c
Parents: 57b9d97 e5bce13
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Apr 6 13:58:27 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Apr 6 13:58:27 2018 -0400
----------------------------------------------------------------------
.../core/server/cluster/impl/BridgeImpl.java | 9 ++
.../integration/cluster/bridge/BridgeTest.java | 148 +++++++++++++++++++
2 files changed, 157 insertions(+)
----------------------------------------------------------------------