You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/09 19:52:37 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5621

Repository: activemq
Updated Branches:
  refs/heads/master f10aab642 -> 21c3ba358


https://issues.apache.org/jira/browse/AMQ-5621

Clean up some warning, remove System.out calls, remove references to
static ports.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/21c3ba35
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/21c3ba35
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/21c3ba35

Branch: refs/heads/master
Commit: 21c3ba35824ce09aa69ed785397d041a98f8d18f
Parents: f10aab6
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jul 9 13:52:30 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jul 9 13:52:30 2015 -0400

----------------------------------------------------------------------
 .../org/apache/activemq/bugs/AMQ2512Test.java   | 82 +++++++++++++-------
 .../org/apache/activemq/bugs/AMQ5266Test.java   | 43 ++++------
 2 files changed, 66 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/21c3ba35/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
index 669066e..a290549 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -33,31 +34,41 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2512Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ2512Test.class);
 
-public class AMQ2512Test extends EmbeddedBrokerTestSupport {
-    private static Connection connection;
-    private final static String QUEUE_NAME = "dee.q";
-    private final static int INITIAL_MESSAGES_CNT = 1000;
-    private final static int WORKER_INTERNAL_ITERATIONS = 100;
-    private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS
-            + INITIAL_MESSAGES_CNT;
-    private final static byte[] payload = new byte[5 * 1024];
-    private final static String TEXT = new String(payload);
+    private final String QUEUE_NAME = "dee.q";
+    private final int INITIAL_MESSAGES_CNT = 1000;
+    private final int WORKER_INTERNAL_ITERATIONS = 100;
+    private final int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + INITIAL_MESSAGES_CNT;
+    private final byte[] payload = new byte[5 * 1024];
+    private final String TEXT = new String(payload);
 
-    private final static String PRP_INITIAL_ID = "initial-id";
-    private final static String PRP_WORKER_ID = "worker-id";
+    private final String PRP_INITIAL_ID = "initial-id";
+    private final String PRP_WORKER_ID = "worker-id";
 
-    private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+    private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
+    private final AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
 
-    private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger();
+    private BrokerService brokerService;
+    private Connection connection;
+    private String connectionURI;
 
+    @Test(timeout = 60000)
     public void testKahaDBFailure() throws Exception {
-        final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress);
+        final ConnectionFactory fac = new ActiveMQConnectionFactory(connectionURI);
         connection = fac.createConnection();
         final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         final Queue queue = session.createQueue(QUEUE_NAME);
@@ -80,18 +91,20 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
 
         LATCH.await();
         final long endTime = System.nanoTime();
-        System.out.println("Total execution time = "
+        LOG.info("Total execution time = "
                 + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms].");
-        System.out.println("Rate = " + TOTAL_MESSAGES_CNT
+        LOG.info("Rate = " + TOTAL_MESSAGES_CNT
                 / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s].");
 
         for (Consumer c : consumers) {
             c.close();
         }
+
         connection.close();
     }
 
-    private final static class Consumer implements MessageListener {
+    private final class Consumer implements MessageListener {
+
         private final String name;
         private final Session session;
         private final MessageProducer producer;
@@ -111,6 +124,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
             }
         }
 
+        @Override
         public void onMessage(Message message) {
             final TextMessage msg = (TextMessage) message;
             try {
@@ -130,7 +144,7 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
             } finally {
                 final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement();
                 if (onMsgCounter % 1000 == 0) {
-                    System.out.println("message received: " + onMsgCounter);
+                    LOG.info("message received: " + onMsgCounter);
                 }
                 LATCH.countDown();
             }
@@ -148,27 +162,37 @@ public class AMQ2512Test extends EmbeddedBrokerTestSupport {
         }
     }
 
-    @Override
-    protected void setUp() throws Exception {
-        bindAddress = "tcp://0.0.0.0:61617";
-        super.setUp();
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+
+        connectionURI = brokerService.getTransportConnectorByName("openwire").getPublishableConnectString();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
     }
 
-    @Override
     protected BrokerService createBroker() throws Exception {
         File dataFileDir = new File("target/test-amq-2512/datadb");
         IOHelper.mkdirs(dataFileDir);
         IOHelper.deleteChildren(dataFileDir);
+
         KahaDBStore kaha = new KahaDBStore();
-        kaha.setDirectory(dataFileDir); 
+        kaha.setDirectory(dataFileDir);
+        kaha.setEnableJournalDiskSyncs(false);
+
         BrokerService answer = new BrokerService();
         answer.setPersistenceAdapter(kaha);
-      
-        kaha.setEnableJournalDiskSyncs(false);
-        //kaha.setIndexCacheSize(10);
         answer.setDataDirectoryFile(dataFileDir);
         answer.setUseJmx(false);
-        answer.addConnector(bindAddress);
+        answer.addConnector("tcp://localhost:0").setName("openwire");
+
         return answer;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21c3ba35/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index e180746..d5ab676 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.bugs;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -24,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -32,6 +34,7 @@ import javax.jms.Queue;
 import javax.jms.QueueConnection;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
@@ -49,19 +52,16 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import static org.junit.Assert.assertEquals;
-
 /**
- * Stuck messages test client.
- * <p/>
- * Will kick of publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
+ * Will kick of publisher and consumer simultaneously, and will usually result in
+ * stuck messages on the queue.
  */
 @RunWith(Parameterized.class)
 public class AMQ5266Test {
     static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
-    String activemqURL = "tcp://localhost:61617";
-    BrokerService brokerService;
+
+    private String activemqURL;
+    private BrokerService brokerService;
 
     public int messageSize = 1000;
 
@@ -167,7 +167,6 @@ public class AMQ5266Test {
 
         consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
 
-
         LOG.info("Starting Publisher...");
 
         publisher.start();
@@ -178,17 +177,15 @@ public class AMQ5266Test {
 
         int distinctPublishedCount = 0;
 
-
         LOG.info("Waiting For Publisher Completion...");
 
         publisher.waitForCompletion();
 
-        List publishedIds = publisher.getIDs();
-        distinctPublishedCount = new TreeSet(publishedIds).size();
+        List<String> publishedIds = publisher.getIDs();
+        distinctPublishedCount = new TreeSet<String>(publishedIds).size();
 
         LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
 
-
         long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
         while (!consumer.completed() && System.currentTimeMillis() < endWait) {
             try {
@@ -221,7 +218,6 @@ public class AMQ5266Test {
             LOG.info(sb.toString());
 
             assertEquals("expect to get all messages!", 0, diff);
-
         }
     }
 
@@ -315,6 +311,7 @@ public class AMQ5266Test {
                 mp = session.createProducer(q);
             }
 
+            @Override
             public void run() {
 
                 try {
@@ -354,7 +351,6 @@ public class AMQ5266Test {
                 }
             }
         }
-
     }
 
     String messageText;
@@ -378,7 +374,6 @@ public class AMQ5266Test {
         return messageText;
     }
 
-
     public class ExportQueueConsumer {
 
         private final String amqUser = ActiveMQConnection.DEFAULT_USER;
@@ -428,11 +423,8 @@ public class AMQ5266Test {
 
         // Start the threads
         public void start() throws Exception {
-
             for (List<ConsumerThread> list : threads.values()) {
-
                 for (ConsumerThread ct : list) {
-
                     ct.start();
                 }
             }
@@ -441,19 +433,14 @@ public class AMQ5266Test {
         // Tell the threads to stop
         // Then wait for them to stop
         public void shutdown() throws Exception {
-
             for (List<ConsumerThread> list : threads.values()) {
-
                 for (ConsumerThread ct : list) {
-
                     ct.shutdown();
                 }
             }
 
             for (List<ConsumerThread> list : threads.values()) {
-
                 for (ConsumerThread ct : list) {
-
                     ct.join();
                 }
             }
@@ -517,6 +504,7 @@ public class AMQ5266Test {
                 idList = idsByQueue.get(queueName);
             }
 
+            @Override
             public void run() {
 
                 try {
@@ -554,13 +542,10 @@ public class AMQ5266Test {
                             session.commit();
                             count = 0;
 
-                            // Sleep a little before trying to read after not getting a message
-
                             try {
                                 if (idList.size() < totalToExpect) {
                                     LOG.info("did not receive on {}, current count: {}", qName, idList.size());
                                 }
-                                //sleep(3000);
                             } catch (Exception e) {
                             }
                         }
@@ -568,7 +553,6 @@ public class AMQ5266Test {
                 } catch (Exception e) {
                     e.printStackTrace();
                 } finally {
-
                     // Once we exit, close everything
                     close();
                 }
@@ -593,7 +577,6 @@ public class AMQ5266Test {
                 try {
                     qc.close();
                 } catch (Exception e) {
-
                 }
             }
         }