You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/04/19 16:01:07 UTC

[3/3] activemq-artemis git commit: ARTEMIS-490 Adding test with LargeMessage.copy through replication

ARTEMIS-490 Adding test with LargeMessage.copy through replication

I'm keeping a commit just for the test so it would be easier to replicate the bug.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dcf65137
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dcf65137
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dcf65137

Branch: refs/heads/master
Commit: dcf651376f1e94f21c818c66624fcdf439a6e760
Parents: b4a6427
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Apr 18 17:46:55 2016 -0400
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Apr 19 15:00:09 2016 +0100

----------------------------------------------------------------------
 .../divert/ReplicationWithDivertTest.java       | 258 +++++++++++++++++++
 1 file changed, 258 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dcf65137/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java
new file mode 100644
index 0000000..f641b40
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.divert;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
+import org.apache.activemq.artemis.api.core.client.FailoverEventType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicationWithDivertTest extends ActiveMQTestBase {
+
+   public static final String JMS_SOURCE_QUEUE = "Queue";
+   public static final String SOURCE_QUEUE = "jms.queue." + JMS_SOURCE_QUEUE;
+   public static final String JMS_TARGET_QUEUE = "DestQueue";
+   public static final String TARGET_QUEUE = "jms.queue." + JMS_TARGET_QUEUE;
+   public static int messageChunkCount = 0;
+
+   private static ActiveMQServer backupServer;
+   private static ActiveMQServer liveServer;
+
+   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=-1&producerWindowSize=10000");
+   ActiveMQConnection connection;
+   Session session;
+   Queue queue;
+   Queue targetQueue;
+   MessageProducer producer;
+
+   Configuration backupConfig;
+
+   Configuration liveConfig;
+
+   // To inform the main thread the condition is met
+   static final ReusableLatch flagChunkEntered = new ReusableLatch(1);
+   // To wait while the condition is worked out
+   static final ReusableLatch flagChunkWait = new ReusableLatch(1);
+
+   // To inform the main thread the condition is met
+   static final ReusableLatch flagSyncEntered = new ReusableLatch(1);
+   // To wait while the condition is worked out
+   static final ReusableLatch flagSyncWait = new ReusableLatch(1);
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      System.out.println("Tmp::" + getTemporaryDir());
+
+      flagChunkEntered.setCount(1);
+      flagChunkWait.setCount(1);
+
+      flagSyncEntered.setCount(1);
+      flagSyncWait.setCount(1);
+
+      messageChunkCount = 0;
+
+      TransportConfiguration liveConnector = TransportConfigurationUtils.getNettyConnector(true, 0);
+      TransportConfiguration liveAcceptor = TransportConfigurationUtils.getNettyAcceptor(true, 0);
+      TransportConfiguration backupConnector = TransportConfigurationUtils.getNettyConnector(false, 0);
+      TransportConfiguration backupAcceptor = TransportConfigurationUtils.getNettyAcceptor(false, 0);
+
+      backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)).
+         setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).
+         setLargeMessagesDirectory(getLargeMessagesDir(0, true));
+      backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE));
+      backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE));
+
+
+      DivertConfiguration divertConfiguration = new DivertConfiguration().setName("Test").setAddress(SOURCE_QUEUE).setForwardingAddress(TARGET_QUEUE).setRoutingName("Test");
+
+
+      liveConfig = createDefaultInVMConfig();
+      liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true));
+      liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true));
+      liveConfig.addDivertConfiguration(divertConfiguration);
+
+      backupConfig.addDivertConfiguration(divertConfiguration);
+
+      ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor);
+
+      liveServer = createServer(liveConfig);
+      liveServer.start();
+
+      startBackup();
+
+      waitForServerToStart(liveServer);
+
+      // Just to make sure the expression worked
+      Assert.assertEquals(10000, factory.getMinLargeMessageSize());
+      Assert.assertEquals(10000, factory.getProducerWindowSize());
+      Assert.assertEquals(100, factory.getRetryInterval());
+      Assert.assertEquals(-1, factory.getReconnectAttempts());
+      Assert.assertTrue(factory.isHA());
+
+      connection = (ActiveMQConnection) factory.createConnection();
+      session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      queue = session.createQueue(JMS_SOURCE_QUEUE);
+      targetQueue = session.createQueue(JMS_TARGET_QUEUE);
+
+      producer = session.createProducer(queue);
+
+   }
+
+   private void startBackup() throws Exception {
+      backupServer = createServer(backupConfig);
+      backupServer.start();
+
+      waitForServerToStart(backupServer);
+
+   }
+
+   @After
+   public void stopServers() throws Exception {
+      if (connection != null) {
+         try {
+            connection.close();
+         }
+         catch (Exception e) {
+         }
+      }
+      if (backupServer != null) {
+         backupServer.stop();
+         backupServer = null;
+      }
+
+      if (liveServer != null) {
+         liveServer.stop();
+         liveServer = null;
+      }
+
+      backupServer = liveServer = null;
+   }
+
+   @Test
+   public void testSendLargeMessage() throws Exception {
+
+      final CountDownLatch failedOver = new CountDownLatch(1);
+      connection.setFailoverListener(new FailoverEventListener() {
+         @Override
+         public void failoverEvent(FailoverEventType eventType) {
+            failedOver.countDown();
+         }
+      });
+      Thread t;
+
+      final int numberOfMessage = 5;
+      {
+         final MapMessage message = createLargeMessage();
+
+         t = new Thread() {
+            public void run() {
+               try {
+                  for (int i = 0; i < numberOfMessage; i++) {
+                     producer.send(message);
+                     session.commit();
+                  }
+               }
+               catch (JMSException expected) {
+                  expected.printStackTrace();
+               }
+            }
+         };
+      }
+
+      t.start();
+
+      t.join(10000);
+
+      {
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         connection.start();
+
+         for (int msgi = 0; msgi < numberOfMessage; msgi++) {
+            MapMessage message = (MapMessage) consumer.receive(5000);
+
+            Assert.assertNotNull(message);
+
+            for (int i = 0; i < 10; i++) {
+               Assert.assertEquals(200 * 1024, message.getBytes("test" + i).length);
+            }
+
+            session.commit();
+         }
+         consumer.close();
+      }
+
+      Assert.assertFalse(t.isAlive());
+      liveServer.stop(true);
+      Assert.assertTrue(failedOver.await(10, TimeUnit.SECONDS));
+
+      {
+         MessageConsumer consumer = session.createConsumer(targetQueue);
+
+         connection.start();
+
+         for (int msgi = 0; msgi < numberOfMessage; msgi++) {
+            MapMessage message = (MapMessage) consumer.receive(5000);
+
+            Assert.assertNotNull(message);
+
+            for (int i = 0; i < 10; i++) {
+               Assert.assertEquals(200 * 1024, message.getBytes("test" + i).length);
+            }
+
+            session.commit();
+         }
+
+         consumer.close();
+      }
+   }
+
+   private MapMessage createLargeMessage() throws JMSException {
+      MapMessage message = session.createMapMessage();
+
+      for (int i = 0; i < 10; i++) {
+         message.setBytes("test" + i, new byte[200 * 1024]);
+      }
+      return message;
+   }
+
+}
\ No newline at end of file