You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/17 20:34:14 UTC
git commit: Add missing license header
Updated Branches:
refs/heads/trunk e7703f70e -> 9b00a0947
Add missing license header
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b00a094
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b00a094
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b00a094
Branch: refs/heads/trunk
Commit: 9b00a0947e2cf8e603b89070afebf22123867f4f
Parents: e7703f7
Author: Timothy Bish <ta...@gmai.com>
Authored: Fri Jan 17 14:34:11 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Fri Jan 17 14:34:11 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/bugs/AMQ4952Test.java | 179 ++++++++++---------
1 file changed, 98 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/9b00a094/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
index 4e88e82..f9df753 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -1,12 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.bugs;
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.sql.DataSource;
+
import junit.framework.TestCase;
+
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.*;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
@@ -15,49 +62,41 @@ import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.*;
-import javax.jms.Connection;
-import javax.sql.DataSource;
-import java.net.URI;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.*;
-
/**
- * Test creates a broker network with two brokers -
- * producerBroker (with a message producer attached) and consumerBroker (with consumer attached)
+ * Test creates a broker network with two brokers - producerBroker (with a
+ * message producer attached) and consumerBroker (with consumer attached)
* <p/>
- * Simulates network duplicate message by stopping and restarting the consumerBroker after message (with message ID ending in
- * 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the network connection.
- * When the network connection is reestablished the producerBroker resends
- * message (with messageID ending in 120).
+ * Simulates network duplicate message by stopping and restarting the
+ * consumerBroker after message (with message ID ending in 120) is persisted to
+ * consumerBroker store BUT BEFORE ack sent to the producerBroker over the
+ * network connection. When the network connection is reestablished the
+ * producerBroker resends message (with messageID ending in 120).
* <p/>
* Expectation:
* <p/>
- * With the following policy entries set, would expect the duplicate message to be read from the store
- * and dispatched to the consumer - where the duplicate could be detected by consumer.
+ * With the following policy entries set, would expect the duplicate message to
+ * be read from the store and dispatched to the consumer - where the duplicate
+ * could be detected by consumer.
* <p/>
- * PolicyEntry policy = new PolicyEntry();
- * policy.setQueue(">");
- * policy.setEnableAudit(false);
- * policy.setUseCache(false);
+ * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">");
+ * policy.setEnableAudit(false); policy.setUseCache(false);
* policy.setExpireMessagesPeriod(0);
* <p/>
* <p/>
- * Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid this scenario is not feasible.
+ * Note 1: Network needs to use replayWhenNoConsumers so enabling the
+ * networkAudit to avoid this scenario is not feasible.
* <p/>
- * NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown will occur after a message has been
- * persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This is just a hack to ensure producerBroker will resend
- * the message after shutdown.
+ * NOTE 2: Added a custom plugin to the consumerBroker so that the
+ * consumerBroker shutdown will occur after a message has been persisted to
+ * consumerBroker store but before an ACK is sent back to ProducerBroker. This
+ * is just a hack to ensure producerBroker will resend the message after
+ * shutdown.
*/
@RunWith(value = Parameterized.class)
@@ -81,9 +120,9 @@ public class AMQ4952Test extends TestCase {
@Parameterized.Parameter(0)
public boolean enableCursorAudit;
- @Parameterized.Parameters(name="enableAudit={0}")
+ @Parameterized.Parameters(name = "enableAudit={0}")
public static Iterable<Object[]> getTestParameters() {
- return Arrays.asList(new Object[][]{{Boolean.TRUE},{Boolean.FALSE}});
+ return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } });
}
@Test
@@ -106,7 +145,6 @@ public class AMQ4952Test extends TestCase {
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
-
while (true) {
TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
@@ -117,14 +155,14 @@ public class AMQ4952Test extends TestCase {
receivedMessageCount++;
LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID());
- // on first delivery ensure the message is pending an ack when it is resent from the producer broker
+ // on first delivery ensure the message is pending an
+ // ack when it is resent from the producer broker
if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
LOG.info("Waiting for restart...");
consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS);
}
textMsg.acknowledge();
-
}
} finally {
consumerConnection.close();
@@ -133,21 +171,20 @@ public class AMQ4952Test extends TestCase {
};
Runnable consumerBrokerResetTask = new Runnable() {
+ @Override
public void run() {
try {
// wait for signal
stopConsumerBroker.await();
-
LOG.info("********* STOPPING CONSUMER BROKER");
consumerBroker.stop();
consumerBroker.waitUntilStopped();
-
LOG.info("***** STARTING CONSUMER BROKER");
- // do not delete messages on startup
+ // do not delete messages on startup
consumerBroker = createConsumerBroker(false);
LOG.info("***** CONSUMER BROKER STARTED!!");
@@ -162,28 +199,24 @@ public class AMQ4952Test extends TestCase {
}));
consumerRestartedAndMessageForwarded.countDown();
-
} catch (Exception e) {
LOG.error("Exception when stopping/starting the consumerBroker ", e);
}
-
}
};
-
ExecutorService executor = Executors.newFixedThreadPool(2);
- //start consumerBroker start/stop task
+ // start consumerBroker start/stop task
executor.execute(consumerBrokerResetTask);
- //start consuming messages
+ // start consuming messages
Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
-
produceMessages();
- //Wait for consumer to finish
+ // Wait for consumer to finish
int totalMessagesConsumed = numberOfConsumedMessage.get();
StringBuffer contents = new StringBuffer();
@@ -193,7 +226,6 @@ public class AMQ4952Test extends TestCase {
assertEquals("number of messages received", 2, totalMessagesConsumed);
assertEquals("messages left in store", true, messageInStore);
assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
-
}
private void produceMessages() throws JMSException {
@@ -253,20 +285,16 @@ public class AMQ4952Test extends TestCase {
consumerBroker = createConsumerBroker(true);
}
-
/**
- * Producer broker
- * listens on localhost:2003
- * networks to consumerBroker - localhost:2006
+ * Producer broker listens on localhost:2003 networks to consumerBroker -
+ * localhost:2006
*
* @return
* @throws Exception
*/
-
protected BrokerService createProducerBroker() throws Exception {
-
- String networkToPorts[] = new String[]{"2006"};
+ String networkToPorts[] = new String[] { "2006" };
HashMap<String, String> networkProps = new HashMap<String, String>();
networkProps.put("networkTTL", "10");
@@ -287,8 +315,7 @@ public class AMQ4952Test extends TestCase {
transportConnectors.add(transportConnector);
broker.setTransportConnectors(transportConnectors);
-
- //network to consumerBroker
+ // network to consumerBroker
if (networkToPorts != null && networkToPorts.length > 0) {
StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
@@ -296,10 +323,10 @@ public class AMQ4952Test extends TestCase {
if (networkProps != null) {
IntrospectionSupport.setProperties(nc, networkProps);
}
- nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue[]{QUEUE_NAME}));
+ nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination> asList(new ActiveMQQueue[] { QUEUE_NAME }));
}
- //Persistence adapter
+ // Persistence adapter
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
@@ -308,7 +335,7 @@ public class AMQ4952Test extends TestCase {
jdbc.setDataSource(remoteDataSource);
broker.setPersistenceAdapter(jdbc);
- //set Policy entries
+ // set Policy entries
PolicyEntry policy = new PolicyEntry();
policy.setQueue(">");
@@ -317,8 +344,7 @@ public class AMQ4952Test extends TestCase {
policy.setExpireMessagesPeriod(0);
// set replay with no consumers
- ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
- new ConditionalNetworkBridgeFilterFactory();
+ ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
@@ -332,16 +358,14 @@ public class AMQ4952Test extends TestCase {
return broker;
}
-
/**
- * consumerBroker
- * - listens on localhost:2006
+ * consumerBroker - listens on localhost:2006
*
- * @param deleteMessages - drop messages when broker instance is created
+ * @param deleteMessages
+ * - drop messages when broker instance is created
* @return
* @throws Exception
*/
-
protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
String scheme = "tcp";
@@ -358,7 +382,7 @@ public class AMQ4952Test extends TestCase {
transportConnectors.add(transportConnector);
broker.setTransportConnectors(transportConnectors);
- //policy entries
+ // policy entries
PolicyEntry policy = new PolicyEntry();
@@ -367,8 +391,7 @@ public class AMQ4952Test extends TestCase {
policy.setExpireMessagesPeriod(0);
// set replay with no consumers
- ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
- new ConditionalNetworkBridgeFilterFactory();
+ ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
@@ -377,7 +400,6 @@ public class AMQ4952Test extends TestCase {
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
-
// Persistence adapter
JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
EmbeddedDataSource localDataSource = new EmbeddedDataSource();
@@ -388,7 +410,7 @@ public class AMQ4952Test extends TestCase {
if (deleteMessages) {
// no plugin on restart
- broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
+ broker.setPlugins(new BrokerPlugin[] { new MyTestPlugin() });
}
this.localDataSource = localDataSource;
@@ -399,7 +421,6 @@ public class AMQ4952Test extends TestCase {
return broker;
}
-
/**
* Query JDBC Store to see if messages are left
*
@@ -407,9 +428,7 @@ public class AMQ4952Test extends TestCase {
* @return
* @throws SQLException
*/
-
- private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer)
- throws SQLException {
+ private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
boolean tableHasData = false;
String query = "select * from ACTIVEMQ_MSGS";
@@ -419,8 +438,6 @@ public class AMQ4952Test extends TestCase {
ResultSet set = null;
-
-
try {
StringBuffer headers = new StringBuffer();
set = s.executeQuery();
@@ -434,7 +451,6 @@ public class AMQ4952Test extends TestCase {
}
LOG.error(headers.toString());
-
while (set.next()) {
tableHasData = true;
@@ -462,16 +478,16 @@ public class AMQ4952Test extends TestCase {
return tableHasData;
}
-
/**
- * plugin used to ensure consumerbroker is restared before the network message from producerBroker is acked
+ * plugin used to ensure consumerBroker is restarted before the network
+ * message from producerBroker is acked
*/
class MyTestPlugin implements BrokerPlugin {
+ @Override
public Broker installPlugin(Broker broker) throws Exception {
return new MyTestBroker(broker);
}
-
}
class MyTestBroker extends BrokerFilter {
@@ -480,10 +496,11 @@ public class AMQ4952Test extends TestCase {
super(next);
}
+ @Override
public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message messageSend) throws Exception {
super.send(producerExchange, messageSend);
- LOG.error("Stopping broker on send: " +messageSend.getMessageId().getProducerSequenceId());
+ LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId());
stopConsumerBroker.countDown();
producerExchange.getConnectionContext().setDontSendReponse(true);
}