You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/17 20:34:14 UTC

git commit: Add missing license header

Updated Branches:
  refs/heads/trunk e7703f70e -> 9b00a0947


Add missing license header

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b00a094
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b00a094
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b00a094

Branch: refs/heads/trunk
Commit: 9b00a0947e2cf8e603b89070afebf22123867f4f
Parents: e7703f7
Author: Timothy Bish <ta...@gmai.com>
Authored: Fri Jan 17 14:34:11 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Fri Jan 17 14:34:11 2014 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/bugs/AMQ4952Test.java   | 179 ++++++++++---------
 1 file changed, 98 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9b00a094/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
index 4e88e82..f9df753 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -1,12 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.activemq.bugs;
 
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.sql.DataSource;
+
 import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.*;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
@@ -15,49 +62,41 @@ import org.apache.activemq.util.Wait;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.*;
-import javax.jms.Connection;
-import javax.sql.DataSource;
-import java.net.URI;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.*;
-
 /**
- * Test creates a broker network with two brokers -
- * producerBroker (with a message producer attached) and consumerBroker (with consumer attached)
+ * Test creates a broker network with two brokers - producerBroker (with a
+ * message producer attached) and consumerBroker (with consumer attached)
  * <p/>
- * Simulates network duplicate message by stopping and restarting the consumerBroker after message (with message ID ending in
- * 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the network connection.
- * When the network connection is reestablished the producerBroker resends
- * message (with messageID ending in 120).
+ * Simulates a network duplicate message by stopping and restarting the
+ * consumerBroker after a message (with message ID ending in 120) is persisted
+ * to the consumerBroker store BUT BEFORE the ack is sent to the producerBroker
+ * over the network connection. When the network connection is reestablished,
+ * the producerBroker resends the message (with message ID ending in 120).
  * <p/>
  * Expectation:
  * <p/>
- * With the following policy entries set,  would  expect the duplicate message to be read from the store
- * and dispatched to the consumer - where the duplicate could be detected by consumer.
+ * With the following policy entries set, we would expect the duplicate message
+ * to be read from the store and dispatched to the consumer - where the
+ * duplicate could be detected by the consumer.
  * <p/>
- * PolicyEntry policy = new PolicyEntry();
- * policy.setQueue(">");
- * policy.setEnableAudit(false);
- * policy.setUseCache(false);
+ * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">");
+ * policy.setEnableAudit(false); policy.setUseCache(false);
  * policy.setExpireMessagesPeriod(0);
  * <p/>
  * <p/>
- * Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid this scenario is not feasible.
+ * NOTE 1: The network needs to use replayWhenNoConsumers, so enabling the
+ * networkAudit to avoid this scenario is not feasible.
  * <p/>
- * NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown will occur after a message has been
- * persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This is just a hack to ensure producerBroker will resend
- * the message after shutdown.
+ * NOTE 2: Added a custom plugin to the consumerBroker so that the
+ * consumerBroker shutdown will occur after a message has been persisted to
+ * the consumerBroker store but before an ACK is sent back to the
+ * producerBroker. This is just a hack to ensure the producerBroker will
+ * resend the message after shutdown.
  */
 
 @RunWith(value = Parameterized.class)
@@ -81,9 +120,9 @@ public class AMQ4952Test extends TestCase {
     @Parameterized.Parameter(0)
     public boolean enableCursorAudit;
 
-    @Parameterized.Parameters(name="enableAudit={0}")
+    @Parameterized.Parameters(name = "enableAudit={0}")
     public static Iterable<Object[]> getTestParameters() {
-        return Arrays.asList(new Object[][]{{Boolean.TRUE},{Boolean.FALSE}});
+        return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } });
     }
 
     @Test
@@ -106,7 +145,6 @@ public class AMQ4952Test extends TestCase {
                     Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                     MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
 
-
                     while (true) {
                         TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
 
@@ -117,14 +155,14 @@ public class AMQ4952Test extends TestCase {
                         receivedMessageCount++;
                         LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID());
 
-                        // on first delivery ensure the message is pending an ack when it is resent from the producer broker
+                        // on first delivery ensure the message is pending an
+                        // ack when it is resent from the producer broker
                         if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
                             LOG.info("Waiting for restart...");
                             consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS);
                         }
 
                         textMsg.acknowledge();
-
                     }
                 } finally {
                     consumerConnection.close();
@@ -133,21 +171,20 @@ public class AMQ4952Test extends TestCase {
         };
 
         Runnable consumerBrokerResetTask = new Runnable() {
+            @Override
             public void run() {
 
                 try {
                     // wait for signal
                     stopConsumerBroker.await();
 
-
                     LOG.info("********* STOPPING CONSUMER BROKER");
 
                     consumerBroker.stop();
                     consumerBroker.waitUntilStopped();
 
-
                     LOG.info("***** STARTING CONSUMER BROKER");
-                    // do not delete  messages on startup
+                    // do not delete messages on startup
                     consumerBroker = createConsumerBroker(false);
 
                     LOG.info("***** CONSUMER BROKER STARTED!!");
@@ -162,28 +199,24 @@ public class AMQ4952Test extends TestCase {
                     }));
                     consumerRestartedAndMessageForwarded.countDown();
 
-
                 } catch (Exception e) {
                     LOG.error("Exception when stopping/starting the consumerBroker ", e);
                 }
 
-
             }
         };
 
-
         ExecutorService executor = Executors.newFixedThreadPool(2);
 
-        //start consumerBroker start/stop task
+        // start consumerBroker start/stop task
         executor.execute(consumerBrokerResetTask);
 
-        //start consuming messages
+        // start consuming messages
         Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
 
-
         produceMessages();
 
-        //Wait for consumer to finish
+        // Wait for consumer to finish
         int totalMessagesConsumed = numberOfConsumedMessage.get();
 
         StringBuffer contents = new StringBuffer();
@@ -193,7 +226,6 @@ public class AMQ4952Test extends TestCase {
         assertEquals("number of messages received", 2, totalMessagesConsumed);
         assertEquals("messages left in store", true, messageInStore);
         assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
-
     }
 
     private void produceMessages() throws JMSException {
@@ -253,20 +285,16 @@ public class AMQ4952Test extends TestCase {
         consumerBroker = createConsumerBroker(true);
     }
 
-
     /**
-     * Producer broker
-     * listens on  localhost:2003
-     * networks to consumerBroker - localhost:2006
+     * Producer broker: listens on localhost:2003 and networks to the
+     * consumerBroker on localhost:2006.
      *
      * @return
      * @throws Exception
      */
-
     protected BrokerService createProducerBroker() throws Exception {
 
-
-        String networkToPorts[] = new String[]{"2006"};
+        String networkToPorts[] = new String[] { "2006" };
         HashMap<String, String> networkProps = new HashMap<String, String>();
 
         networkProps.put("networkTTL", "10");
@@ -287,8 +315,7 @@ public class AMQ4952Test extends TestCase {
         transportConnectors.add(transportConnector);
         broker.setTransportConnectors(transportConnectors);
 
-
-        //network to consumerBroker
+        // network to consumerBroker
 
         if (networkToPorts != null && networkToPorts.length > 0) {
             StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
@@ -296,10 +323,10 @@ public class AMQ4952Test extends TestCase {
             if (networkProps != null) {
                 IntrospectionSupport.setProperties(nc, networkProps);
             }
-            nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue[]{QUEUE_NAME}));
+            nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination> asList(new ActiveMQQueue[] { QUEUE_NAME }));
         }
 
-        //Persistence adapter
+        // Persistence adapter
 
         JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
         EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
@@ -308,7 +335,7 @@ public class AMQ4952Test extends TestCase {
         jdbc.setDataSource(remoteDataSource);
         broker.setPersistenceAdapter(jdbc);
 
-        //set Policy entries
+        // set Policy entries
         PolicyEntry policy = new PolicyEntry();
 
         policy.setQueue(">");
@@ -317,8 +344,7 @@ public class AMQ4952Test extends TestCase {
         policy.setExpireMessagesPeriod(0);
 
         // set replay with no consumers
-        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
-                new ConditionalNetworkBridgeFilterFactory();
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
         conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
         policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
 
@@ -332,16 +358,14 @@ public class AMQ4952Test extends TestCase {
         return broker;
     }
 
-
     /**
-     * consumerBroker
-     * - listens on localhost:2006
+     * consumerBroker - listens on localhost:2006
      *
-     * @param deleteMessages - drop messages when broker instance is created
+     * @param deleteMessages
+     *            - drop messages when broker instance is created
      * @return
      * @throws Exception
      */
-
     protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
 
         String scheme = "tcp";
@@ -358,7 +382,7 @@ public class AMQ4952Test extends TestCase {
         transportConnectors.add(transportConnector);
         broker.setTransportConnectors(transportConnectors);
 
-        //policy entries
+        // policy entries
 
         PolicyEntry policy = new PolicyEntry();
 
@@ -367,8 +391,7 @@ public class AMQ4952Test extends TestCase {
         policy.setExpireMessagesPeriod(0);
 
         // set replay with no consumers
-        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
-                new ConditionalNetworkBridgeFilterFactory();
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
         conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
         policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
 
@@ -377,7 +400,6 @@ public class AMQ4952Test extends TestCase {
         pMap.setDefaultEntry(policy);
         broker.setDestinationPolicy(pMap);
 
-
         // Persistence adapter
         JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
         EmbeddedDataSource localDataSource = new EmbeddedDataSource();
@@ -388,7 +410,7 @@ public class AMQ4952Test extends TestCase {
 
         if (deleteMessages) {
             // no plugin on restart
-            broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
+            broker.setPlugins(new BrokerPlugin[] { new MyTestPlugin() });
         }
 
         this.localDataSource = localDataSource;
@@ -399,7 +421,6 @@ public class AMQ4952Test extends TestCase {
         return broker;
     }
 
-
     /**
      * Query JDBC Store to see if messages are left
      *
@@ -407,9 +428,7 @@ public class AMQ4952Test extends TestCase {
      * @return
      * @throws SQLException
      */
-
-    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer)
-            throws SQLException {
+    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
 
         boolean tableHasData = false;
         String query = "select * from ACTIVEMQ_MSGS";
@@ -419,8 +438,6 @@ public class AMQ4952Test extends TestCase {
 
         ResultSet set = null;
 
-
-
         try {
             StringBuffer headers = new StringBuffer();
             set = s.executeQuery();
@@ -434,7 +451,6 @@ public class AMQ4952Test extends TestCase {
             }
             LOG.error(headers.toString());
 
-
             while (set.next()) {
                 tableHasData = true;
 
@@ -462,16 +478,16 @@ public class AMQ4952Test extends TestCase {
         return tableHasData;
     }
 
-
     /**
-     * plugin used to ensure consumerbroker is restared before the network message from producerBroker is acked
+     * Plugin used to ensure the consumerBroker is restarted before the network
+     * message from the producerBroker is acked.
      */
     class MyTestPlugin implements BrokerPlugin {
 
+        @Override
         public Broker installPlugin(Broker broker) throws Exception {
             return new MyTestBroker(broker);
         }
-
     }
 
     class MyTestBroker extends BrokerFilter {
@@ -480,10 +496,11 @@ public class AMQ4952Test extends TestCase {
             super(next);
         }
 
+        @Override
         public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
 
             super.send(producerExchange, messageSend);
-            LOG.error("Stopping broker on send:  " +messageSend.getMessageId().getProducerSequenceId());
+            LOG.error("Stopping broker on send:  " + messageSend.getMessageId().getProducerSequenceId());
             stopConsumerBroker.countDown();
             producerExchange.getConnectionContext().setDontSendReponse(true);
         }