You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/05 05:20:19 UTC
[2/3] activemq-artemis git commit: ARTEMIS-465 Testing possible races
through large message replication
ARTEMIS-465 Testing possible races through large message replication
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7da22ff1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7da22ff1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7da22ff1
Branch: refs/heads/master
Commit: 7da22ff10531511918523b0fb4be69eeddaaf515
Parents: b16b864
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 4 18:36:36 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Apr 4 23:15:54 2016 -0400
----------------------------------------------------------------------
...eOnSyncLargeMessageOverReplication2Test.java | 291 +++++++++++++++++++
...ceOnSyncLargeMessageOverReplicationTest.java | 246 ++++++++++++++++
2 files changed, 537 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7da22ff1/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java
new file mode 100644
index 0000000..f8d1b04
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplication2Test.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/** This test will add more bytes to the large message while still syncing.
+ * At the time of writing I couldn't replicate any issues, but I'm keeping it here to validate the impl */
+@RunWith(BMUnitRunner.class)
+public class RaceOnSyncLargeMessageOverReplication2Test extends ActiveMQTestBase {
+
+ public static int messageChunkCount = 0;
+
+ private static final ReusableLatch ruleFired = new ReusableLatch(1);
+ private static ActiveMQServer backupServer;
+ private static ActiveMQServer liveServer;
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
+ ActiveMQConnection connection;
+ Session session;
+ Queue queue;
+ MessageProducer producer;
+
+ Configuration backupConfig;
+
+ Configuration liveConfig;
+
+ // To inform the main thread the condition is met
+ static final ReusableLatch flagArrived = new ReusableLatch(1);
+ // To wait while the condition is worked out
+ static final ReusableLatch flagWait = new ReusableLatch(1);
+
+ static final ReusableLatch flag15Arrived = new ReusableLatch(1);
+ // To wait while the condition is worked out
+ static final ReusableLatch flag15Wait = new ReusableLatch(1);
+
+ // To inform the main thread the condition is met
+ static final ReusableLatch flagSyncArrived = new ReusableLatch(1);
+ // To wait while the condition is worked out
+ static final ReusableLatch flagSyncWait = new ReusableLatch(1);
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ System.out.println("Tmp::" + getTemporaryDir());
+
+ flagArrived.setCount(1);
+ flagWait.setCount(1);
+
+ flag15Arrived.setCount(1);
+ flag15Wait.setCount(1);
+
+ ruleFired.setCount(1);
+ messageChunkCount = 0;
+
+ TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+ TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+ TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+ TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+ backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).
+ setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).
+ setLargeMessagesDirectory(getLargeMessagesDir(0, true));
+
+ liveConfig = createDefaultInVMConfig();
+
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+ liveServer = createServer(liveConfig);
+ liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
+ liveServer.start();
+
+ waitForServerToStart(liveServer);
+
+ // Just to make sure the expression worked
+ Assert.assertEquals(10000, factory.getMinLargeMessageSize());
+ Assert.assertEquals(10000, factory.getProducerWindowSize());
+ Assert.assertEquals(100, factory.getRetryInterval());
+ Assert.assertEquals(-1, factory.getReconnectAttempts());
+ Assert.assertTrue(factory.isHA());
+
+ connection = (ActiveMQConnection) factory.createConnection();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ queue = session.createQueue("jms.queue.Queue");
+ producer = session.createProducer(queue);
+
+ }
+
+ private void startBackup() throws Exception {
+ backupServer = createServer(backupConfig);
+ backupServer.start();
+
+ waitForServerToStart(backupServer);
+
+ }
+
+ @After
+ public void stopServers() throws Exception {
+ if (connection != null) {
+ try {
+ connection.close();
+ }
+ catch (Exception e) {
+ }
+ }
+ if (backupServer != null) {
+ backupServer.stop();
+ backupServer = null;
+ }
+
+ if (liveServer != null) {
+ liveServer.stop();
+ liveServer = null;
+ }
+
+ backupServer = liveServer = null;
+ }
+
+ @Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "InterruptSending",
+ targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext",
+ targetMethod = "sendLargeMessageChunk",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplication2Test.messageChunkSent();"), @BMRule(
+ name = "InterruptSync",
+ targetClass = "org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager",
+ targetMethod = "sendLargeMessageFiles",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplication2Test.syncLargeMessage();")})
+ public void testSendLargeMessage() throws Exception {
+
+ final CountDownLatch failedOver = new CountDownLatch(1);
+ connection.setFailoverListener(new FailoverEventListener() {
+ @Override
+ public void failoverEvent(FailoverEventType eventType) {
+ failedOver.countDown();
+ }
+ });
+ Thread t;
+
+ {
+ final MapMessage message = createLargeMessage();
+
+ t = new Thread() {
+ public void run() {
+ try {
+ producer.send(message);
+ session.commit();
+ }
+ catch (JMSException expected) {
+ expected.printStackTrace();
+ }
+ }
+ };
+ }
+
+ t.start();
+
+ // I'm trying to simulate the following race here:
+ // The message is syncing while the client is already sending the body of the message
+
+ Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS));
+
+ startBackup();
+
+ Assert.assertTrue(flagSyncArrived.await(10, TimeUnit.SECONDS));
+
+ flagWait.countDown();
+
+ Assert.assertTrue(flag15Arrived.await(10, TimeUnit.SECONDS));
+
+ flag15Wait.countDown();
+
+ t.join(5000);
+
+ flagSyncWait.countDown();
+
+ System.out.println("Thread joined");
+
+ Assert.assertFalse(t.isAlive());
+
+ waitForRemoteBackup(connection.getSessionFactory(), 30);
+
+
+ liveServer.stop(true);
+
+ Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));
+
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ MapMessage message = (MapMessage) consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
+ }
+
+ session.commit();
+ }
+ }
+
+ public static void syncLargeMessage() {
+
+ try {
+ flagSyncArrived.countDown();
+ flagSyncWait.await(10, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void messageChunkSent() {
+ messageChunkCount++;
+
+ try {
+ if (messageChunkCount == 10) {
+ flagArrived.countDown();
+ flagWait.await(10, TimeUnit.SECONDS);
+ }
+ if (messageChunkCount == 15) {
+ flag15Arrived.countDown();
+ flag15Wait.await(10, TimeUnit.SECONDS);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private MapMessage createLargeMessage() throws JMSException {
+ MapMessage message = session.createMapMessage();
+
+ for (int i = 0; i < 10; i++) {
+ message.setBytes("test" + i, new byte[1024 * 1024]);
+ }
+ return message;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7da22ff1/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java
new file mode 100644
index 0000000..d1bf50f
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/RaceOnSyncLargeMessageOverReplicationTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * This test will validate the sync of large messages will not lose any messages in a specific scenario.
+ */
+@RunWith(BMUnitRunner.class)
+public class RaceOnSyncLargeMessageOverReplicationTest extends ActiveMQTestBase {
+
+ private static ActiveMQServer backupServer;
+ private static ActiveMQServer liveServer;
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
+ ActiveMQConnection connection;
+ Session session;
+ Queue queue;
+ MessageProducer producer;
+
+ Configuration backupConfig;
+
+ Configuration liveConfig;
+
+ // To inform the main thread the condition is met
+ static final ReusableLatch flagArrived = new ReusableLatch(1);
+ // To wait while the condition is worked out
+ static final ReusableLatch flagWait = new ReusableLatch(1);
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ System.out.println("Tmp::" + getTemporaryDir());
+
+ flagArrived.setCount(1);
+ flagWait.setCount(1);
+
+ TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+ TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+ TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+ TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+ backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).
+ setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).
+ setLargeMessagesDirectory(getLargeMessagesDir(0, true));
+
+ liveConfig = createDefaultInVMConfig();
+
+ ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+ liveServer = createServer(liveConfig);
+ liveServer.getConfiguration().addQueueConfiguration(new CoreQueueConfiguration().setName("jms.queue.Queue").setAddress("jms.queue.Queue"));
+ liveServer.start();
+
+ waitForServerToStart(liveServer);
+
+ // Just to make sure the expression worked
+ Assert.assertEquals(10000, factory.getMinLargeMessageSize());
+ Assert.assertEquals(10000, factory.getProducerWindowSize());
+ Assert.assertEquals(100, factory.getRetryInterval());
+ Assert.assertEquals(-1, factory.getReconnectAttempts());
+ Assert.assertTrue(factory.isHA());
+
+ connection = (ActiveMQConnection) factory.createConnection();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ queue = session.createQueue("jms.queue.Queue");
+ producer = session.createProducer(queue);
+
+ }
+
+ private void startBackup() throws Exception {
+ backupServer = createServer(backupConfig);
+ backupServer.start();
+
+ waitForServerToStart(backupServer);
+
+ }
+
+ @After
+ public void stopServers() throws Exception {
+ if (connection != null) {
+ try {
+ connection.close();
+ }
+ catch (Exception e) {
+ }
+ }
+ if (backupServer != null) {
+ backupServer.stop();
+ backupServer = null;
+ }
+
+ if (liveServer != null) {
+ liveServer.stop();
+ liveServer = null;
+ }
+
+ backupServer = liveServer = null;
+ }
+
+ /*
+ * simple test to induce a potential race condition where the server's acceptors are active, but the server's
+ * state != STARTED
+ */
+ @Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "InterruptSync",
+ targetClass = "org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager",
+ targetMethod = "createLargeMessage(long,org.apache.activemq.artemis.core.message.impl.MessageInternal)",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.artemis.tests.extras.byteman.RaceOnSyncLargeMessageOverReplicationTest.syncLargeMessage();")})
+ public void testSendLargeMessage() throws Exception {
+
+ final CountDownLatch failedOver = new CountDownLatch(1);
+ connection.setFailoverListener(new FailoverEventListener() {
+ @Override
+ public void failoverEvent(FailoverEventType eventType) {
+ failedOver.countDown();
+ }
+ });
+ Thread t;
+
+ {
+ final MapMessage message = createLargeMessage();
+
+ t = new Thread() {
+ public void run() {
+ try {
+ producer.send(message);
+ session.commit();
+ }
+ catch (JMSException expected) {
+ expected.printStackTrace();
+ }
+ }
+ };
+ }
+
+ t.start();
+
+ // I'm trying to simulate the following race here:
+ // The message is syncing while the client is already sending the body of the message
+
+ Assert.assertTrue(flagArrived.await(10, TimeUnit.SECONDS));
+
+ startBackup();
+
+ waitForRemoteBackup(connection.getSessionFactory(), 30);
+
+ flagWait.countDown();
+
+ t.join(5000);
+
+ System.out.println("Thread joined");
+
+ Assert.assertFalse(t.isAlive());
+
+
+ liveServer.stop(true);
+
+ Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));
+
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connection.start();
+
+ MapMessage message = (MapMessage) consumer.receive(5000);
+
+ Assert.assertNotNull(message);
+
+ for (int i = 0; i < 10; i++) {
+ Assert.assertEquals(1024 * 1024, message.getBytes("test" + i).length);
+ }
+
+ session.commit();
+ }
+ }
+
+ public static void syncLargeMessage() {
+ try {
+ flagArrived.countDown();
+ flagWait.await(10, TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private MapMessage createLargeMessage() throws JMSException {
+ MapMessage message = session.createMapMessage();
+
+ for (int i = 0; i < 10; i++) {
+ message.setBytes("test" + i, new byte[1024 * 1024]);
+ }
+ return message;
+ }
+
+}
\ No newline at end of file