You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 16:21:46 UTC
[17/61] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
deleted file mode 100644
index e80b05c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.jms.support.JmsUtils;
-
-/**
- * Verifies that the {@code maximumConnections} option of a TCP transport
- * connector caps the number of connections the broker accepts, even when more
- * clients than the limit try to connect at the same time.
- */
-public class AMQ4469Test {
-
-    private static final int maxConnections = 100;
-
-    private final ExecutorService executor = Executors.newCachedThreadPool();
-    // Successfully started connections, remembered so tearDown() can close them.
-    private final java.util.Queue<Connection> openConnections = new java.util.concurrent.ConcurrentLinkedQueue<Connection>();
-    private String connectionUri;
-    private BrokerService service;
-    private TransportConnector connector;
-
-    /**
-     * Starts a non-persistent broker without JMX whose single TCP connector
-     * binds to port 0 and is configured with {@code maximumConnections}.
-     */
-    @Before
-    public void setUp() throws Exception {
-        service = new BrokerService();
-        service.setPersistent(false);
-        service.setUseJmx(false);
-        connector = service.addConnector("tcp://0.0.0.0:0?maximumConnections=" + maxConnections);
-        connectionUri = connector.getPublishableConnectString();
-        service.start();
-        service.waitUntilStarted();
-    }
-
-    /**
-     * @return a connection factory pointing at the connector started in {@link #setUp()}
-     */
-    protected ConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory(connectionUri);
-    }
-
-    /**
-     * Launches {@code maxConnections + 20} clients that all connect once a
-     * shared latch is released, then waits until the connector reports exactly
-     * {@code maxConnections} connections.
-     */
-    @Test
-    public void testMaxConnectionControl() throws Exception {
-        final ConnectionFactory cf = createConnectionFactory();
-        final CountDownLatch startupLatch = new CountDownLatch(1);
-        for (int i = 0; i < maxConnections + 20; i++) {
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    Connection conn = null;
-                    try {
-                        startupLatch.await();
-                        conn = cf.createConnection();
-                        conn.start();
-                        // keep the connection open for the assertion; tearDown() closes it
-                        openConnections.add(conn);
-                    }
-                    catch (Exception e) {
-                        e.printStackTrace();
-                        JmsUtils.closeConnection(conn);
-                    }
-                }
-            });
-        }
-
-        TcpTransportServer transportServer = (TcpTransportServer) connector.getServer();
-        // ensure the max connections is in effect
-        assertEquals(maxConnections, transportServer.getMaximumConnections());
-        // No connections at first
-        assertEquals(0, connector.getConnections().size());
-        // Release the latch to set up connections in parallel
-        startupLatch.countDown();
-        TimeUnit.SECONDS.sleep(5);
-
-        // Expect exactly the maximum number of connections to be established
-        boolean limitReached = Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return connector.getConnections().size() == maxConnections;
-            }
-        });
-        // Build the message after waiting so "found" shows the final count
-        assertTrue("Expected: " + maxConnections + " found: " + connector.getConnections().size(), limitReached);
-    }
-
-    /**
-     * Stops the client tasks, closes every connection the test opened and
-     * shuts the broker down; the broker is stopped even if closing fails.
-     */
-    @After
-    public void tearDown() throws Exception {
-        executor.shutdownNow();
-        try {
-            for (Connection conn : openConnections) {
-                JmsUtils.closeConnection(conn);
-            }
-            openConnections.clear();
-        }
-        finally {
-            service.stop();
-            service.waitUntilStopped();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
deleted file mode 100644
index b7ae444..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Checks that a message committed in a transacted session is still delivered
- * to a new consumer after the first consumer on the same session is closed
- * while a second, uncommitted send is pending.
- */
-public class AMQ4472Test {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AMQ4472Test.class);
-
-    /**
-     * Sends and commits "Message 1", sends "Message 2" without committing,
-     * replaces the consumer and expects "Message 1" to be received.
-     *
-     * @throws Exception on any JMS failure, so that errors fail the test
-     *         instead of being printed and ignored
-     */
-    @Test
-    public void testLostMessage() throws Exception {
-        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false");
-        Connection connection = connectionFactory.createConnection();
-        try {
-            connection.start();
-
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            Destination test_data_destination = session.createQueue("test" + System.currentTimeMillis());
-
-            MessageConsumer consumer = session.createConsumer(test_data_destination);
-            LOG.info("Consumer 1 connected");
-
-            MessageProducer producer = session.createProducer(test_data_destination);
-            producer.send(session.createTextMessage("Message 1"));
-
-            // committing the session prior to the close
-            session.commit();
-
-            // starting a new transaction
-            producer.send(session.createTextMessage("Message 2"));
-
-            // in a new transaction, with prefetch>0, the message
-            // 1 will be pending till second commit
-            LOG.info("Closing consumer 1...");
-            consumer.close();
-
-            // create a consumer
-            consumer = session.createConsumer(test_data_destination);
-            LOG.info("Consumer 2 connected");
-
-            // retrieve message previously committed to tmp queue
-            Message message = consumer.receive(10000);
-            if (message == null) {
-                LOG.error("Expected message but it never arrived");
-            }
-            assertNotNull(message);
-
-            LOG.info("Got message 1: {}", message);
-            assertEquals("expected message", "Message 1", ((TextMessage) message).getText());
-            session.commit();
-        }
-        finally {
-            try {
-                connection.close();
-            }
-            catch (JMSException e) {
-                // best effort: a close failure must not mask the test outcome
-                LOG.warn("Failed to close connection", e);
-            }
-        }
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
deleted file mode 100644
index 558bc08..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4475Test {
-
- private final Log LOG = LogFactory.getLog(AMQ4475Test.class);
-
- private final int NUM_MSGS = 1000;
- private final int MAX_THREADS = 20;
-
- private BrokerService broker;
- private String connectionUri;
-
- private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
- private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue");
- private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy");
-
- /**
-  * Builds and starts the broker under test: non-persistent, JMX enabled, a
-  * TCP connector on port 0, the time stamping plugin installed, and a
-  * default destination policy that uses individual dead letter queues.
-  */
- @Before
- public void setUp() throws Exception {
-     // Individual DLQ per queue, prefixed "DLQ.", also for expired and non-persistent messages
-     final IndividualDeadLetterStrategy individualDlq = new IndividualDeadLetterStrategy();
-     individualDlq.setUseQueueForQueueMessages(true);
-     individualDlq.setQueuePrefix("DLQ.");
-     final DeadLetterStrategy deadLetterStrategy = individualDlq;
-     deadLetterStrategy.setProcessExpired(true);
-     deadLetterStrategy.setProcessNonPersistent(true);
-
-     // Default policy applied to every destination
-     final PolicyEntry defaultPolicy = new PolicyEntry();
-     defaultPolicy.setTimeBeforeDispatchStarts(3000);
-     defaultPolicy.setDeadLetterStrategy(deadLetterStrategy);
-
-     final PolicyMap policies = new PolicyMap();
-     policies.setDefaultEntry(defaultPolicy);
-
-     // Same value for both limits; presumably milliseconds (432000000 ms = 5 days) - TODO confirm
-     final TimeStampingBrokerPlugin timeStamping = new TimeStampingBrokerPlugin();
-     timeStamping.setZeroExpirationOverride(432000000);
-     timeStamping.setTtlCeiling(432000000);
-     timeStamping.setFutureOnly(true);
-
-     broker = new BrokerService();
-     broker.setPersistent(false);
-     broker.setUseJmx(true);
-     broker.setPlugins(new BrokerPlugin[]{timeStamping});
-     connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
-     broker.setDestinationPolicy(policies);
-
-     broker.start();
-     broker.waitUntilStarted();
- }
-
- /**
-  * Stops the broker and waits for it to finish stopping; does nothing when
-  * no broker was created.
-  */
- @After
- public void after() throws Exception {
-     if (broker == null) {
-         return;
-     }
-     broker.stop();
-     broker.waitUntilStopped();
- }
-
- /**
-  * Produces {@code NUM_MSGS} messages to the original queue, forwards them
-  * to the rerouted queue with a {@link ForwardingConsumerThread} and consumes
-  * them there with a {@link ConsumerThread}. The test fails when either
-  * thread reports a failure or is still running after its two minute join.
-  */
- @Test
- public void testIndividualDeadLetterAndTimeStampPlugin() {
-     LOG.info("Starting test ..");
-
-     long startTime = System.nanoTime();
-
-     // Produce to network
-     List<Future<ProducerTask>> tasks = new ArrayList<>();
-
-     for (int index = 0; index < 1; index++) {
-         ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS);
-         Future<ProducerTask> future = executor.submit(p, p);
-         tasks.add(future);
-     }
-
-     ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS);
-     f1.start();
-     ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS);
-     c1.start();
-
-     LOG.info("Waiting on consumers and producers to exit");
-
-     try {
-         for (Future<ProducerTask> future : tasks) {
-             ProducerTask e = future.get();
-             LOG.info("[Completed] " + e.dest.getPhysicalName());
-         }
-         LOG.info("Producing threads complete, waiting on ACKs");
-         f1.join(TimeUnit.MINUTES.toMillis(2));
-         c1.join(TimeUnit.MINUTES.toMillis(2));
-     }
-     catch (ExecutionException e) {
-         LOG.warn("Caught unexpected exception", e);
-         throw new RuntimeException(e);
-     }
-     catch (InterruptedException ie) {
-         // keep the interrupt visible to whoever runs this test
-         Thread.currentThread().interrupt();
-         LOG.warn("Caught unexpected exception", ie);
-         throw new RuntimeException(ie);
-     }
-     finally {
-         // release the pool threads on failure paths as well
-         executor.shutdown();
-     }
-
-     // join(timeout) returns silently on timeout, so check the threads really ended
-     assertFalse("Forwarding consumer did not finish in time", f1.isAlive());
-     assertFalse("Consumer did not finish in time", c1.isAlive());
-     assertFalse(f1.isFailed());
-     assertFalse(c1.isFailed());
-
-     long estimatedTime = System.nanoTime() - startTime;
-
-     LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0);
-     LOG.info("Consumers and producers exited, all msgs received as expected");
- }
-
- /**
-  * Sends {@code count} persistent text messages to {@code dest} over a
-  * connection created from {@code uri}. Failures are logged and not
-  * rethrown, so the task always completes normally.
-  */
- public class ProducerTask implements Runnable {
-
-     private final String uri;
-     private final ActiveMQQueue dest;
-     private final int count;
-
-     /**
-      * @param uri   broker URI to connect to
-      * @param dest  queue the messages are sent to
-      * @param count number of messages to send
-      */
-     public ProducerTask(String uri, ActiveMQQueue dest, int count) {
-         this.uri = uri;
-         this.dest = dest;
-         this.count = count;
-     }
-
-     @Override
-     public void run() {
-
-         Connection connection = null;
-         try {
-             String destName = "";
-
-             try {
-                 destName = dest.getQueueName();
-             }
-             catch (JMSException e) {
-                 LOG.warn("Caught unexpected exception", e);
-             }
-
-             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);
-
-             connection = connectionFactory.createConnection();
-
-             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-             MessageProducer producer = session.createProducer(dest);
-             connection.start();
-
-             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-             String msg = "Test Message";
-
-             for (int i = 0; i < count; i++) {
-                 producer.send(session.createTextMessage(msg + dest.getQueueName() + " " + i));
-             }
-
-             LOG.info("[" + destName + "] Sent " + count + " msgs");
-         }
-         catch (Exception e) {
-             LOG.warn("Caught unexpected exception", e);
-         }
-         finally {
-             // createConnection() may have failed, leaving nothing to close
-             if (connection != null) {
-                 try {
-                     connection.close();
-                 }
-                 catch (Throwable e) {
-                     LOG.warn("Caught unexpected exception", e);
-                 }
-             }
-         }
-     }
- }
-
- /**
-  * Consumes text messages from {@code original} over a {@code vm://localhost}
-  * connection and re-sends each one to {@code forward}. The thread is marked
-  * as failed when a receive times out before the expected number of messages
-  * was forwarded, or when an exception occurs.
-  */
- public class ForwardingConsumerThread extends Thread {
-
-     private final ActiveMQQueue original;
-     private final ActiveMQQueue forward;
-     private int blockSize = 0;
-     private final int PARALLEL = 1;
-     // written by this thread, read by the test thread
-     private volatile boolean failed;
-
-     /**
-      * @param original queue to consume from
-      * @param forward  queue to forward the messages to
-      * @param total    total number of messages expected
-      */
-     public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int total) {
-         this.original = original;
-         this.forward = forward;
-         this.blockSize = total / PARALLEL;
-     }
-
-     /**
-      * @return {@code true} if forwarding stopped early because of a receive
-      *         timeout or an exception
-      */
-     public boolean isFailed() {
-         return failed;
-     }
-
-     @Override
-     public void run() {
-         Connection connection = null;
-         try {
-
-             for (int index = 0; index < PARALLEL; index++) {
-
-                 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-
-                 connection = factory.createConnection();
-                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                 MessageConsumer consumer = session.createConsumer(original);
-                 MessageProducer producer = session.createProducer(forward);
-                 connection.start();
-                 int count = 0;
-
-                 while (count < blockSize) {
-
-                     Message msg1 = consumer.receive(10000);
-                     if (msg1 == null) {
-                         // nothing arrived within the receive timeout: report it, as ConsumerThread does
-                         failed = true;
-                         break;
-                     }
-
-                     if (msg1 instanceof ActiveMQTextMessage) {
-                         if (count % 100 == 0) {
-                             LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
-                         }
-
-                         producer.send(msg1);
-
-                         count++;
-                     }
-                     else {
-                         LOG.info("Skipping unknown msg type " + msg1);
-                     }
-                 }
-
-                 LOG.info("[" + original.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
-                 connection.close();
-             }
-         }
-         catch (Exception e) {
-             failed = true;
-             LOG.warn("Caught unexpected exception", e);
-         }
-         finally {
-             LOG.debug(getName() + ": is stopping");
-             // createConnection() may have failed, leaving nothing to close
-             if (connection != null) {
-                 try {
-                     connection.close();
-                 }
-                 catch (Throwable ignored) {
-                     // best effort: the connection is usually closed already at this point
-                 }
-             }
-         }
-     }
- }
-
- /**
-  * Consumes the expected number of text messages from {@code dest} over a
-  * connection created from {@code uri}. The thread is marked as failed when
-  * a receive times out or when an exception occurs.
-  */
- public class ConsumerThread extends Thread {
-
-     private final String uri;
-     private final ActiveMQQueue dest;
-     private int blockSize = 0;
-     private final int PARALLEL = 1;
-     // written by this thread, read by the test thread
-     private volatile boolean failed;
-
-     /**
-      * @param uri   broker URI to connect to
-      * @param dest  queue to consume from
-      * @param total total number of messages expected
-      */
-     public ConsumerThread(String uri, ActiveMQQueue dest, int total) {
-         this.uri = uri;
-         this.dest = dest;
-         this.blockSize = total / PARALLEL;
-     }
-
-     /**
-      * @return {@code true} if consumption stopped early because of a receive
-      *         timeout or an exception
-      */
-     public boolean isFailed() {
-         return failed;
-     }
-
-     @Override
-     public void run() {
-         Connection connection = null;
-         try {
-
-             for (int index = 0; index < PARALLEL; index++) {
-
-                 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
-
-                 connection = factory.createConnection();
-                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-                 MessageConsumer consumer = session.createConsumer(dest);
-                 connection.start();
-                 int count = 0;
-
-                 while (count < blockSize) {
-
-                     Message msg1 = consumer.receive(10000);
-                     if (msg1 == null) {
-                         // nothing arrived within the receive timeout
-                         failed = true;
-                         break;
-                     }
-
-                     if (msg1 instanceof ActiveMQTextMessage) {
-                         if (count % 100 == 0) {
-                             LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
-                         }
-
-                         count++;
-                     }
-                     else {
-                         LOG.info("Skipping unknown msg type " + msg1);
-                     }
-                 }
-
-                 LOG.info("[" + dest.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
-                 connection.close();
-             }
-         }
-         catch (Exception e) {
-             // an aborted run must not look like a successful one
-             failed = true;
-             LOG.warn("Caught unexpected exception", e);
-         }
-         finally {
-             LOG.debug(getName() + ": is stopping");
-             // createConnection() may have failed, leaving nothing to close
-             if (connection != null) {
-                 try {
-                     connection.close();
-                 }
-                 catch (Throwable ignored) {
-                     // best effort: the connection is usually closed already at this point
-                 }
-             }
-         }
-     }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
deleted file mode 100644
index efaf484..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-/**
- * Variant of {@link AMQ4485LowLimitTest} that sets {@code numBrokers} to 2
- * and replaces each broker's persistence adapter with a {@link LevelDBStore}.
- */
-public class AMQ4485LowLimitLevelDBTest extends AMQ4485LowLimitTest {
-
-    public AMQ4485LowLimitLevelDBTest() {
-        numBrokers = 2;
-    }
-
-    /**
-     * Creates the broker as the parent class does, then installs a LevelDB
-     * store located in a "levelDB" directory under the broker data directory.
-     */
-    @Override
-    protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception {
-        final BrokerService created = super.createBroker(brokerid, addToNetwork);
-        final File storeDirectory = new File(created.getBrokerDataDirectory(), "levelDB");
-
-        final LevelDBStore store = new LevelDBStore();
-        store.setDirectory(storeDirectory);
-        created.setPersistenceAdapter(store);
-        return created;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
deleted file mode 100644
index 4c48c2c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.QueueConnection;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
-
- static final String payload = new String(new byte[10 * 1024]);
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4485LowLimitTest.class);
- final int portBase = 61600;
- int numBrokers = 8;
- final int numProducers = 30;
- final int numMessages = 1000;
- final int consumerSleepTime = 40;
- StringBuilder brokersUrl = new StringBuilder();
- HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<>();
- private ArrayList<Throwable> exceptions = new ArrayList<>();
-
- /**
-  * Appends one {@code tcp://localhost:<port>} URL per broker to
-  * {@code brokersUrl}, separated by commas, with ports counting up from
-  * {@code portBase}.
-  */
- protected void buildUrlList() throws Exception {
-     for (int brokerIndex = 0; brokerIndex < numBrokers; brokerIndex++) {
-         if (brokerIndex > 0) {
-             brokersUrl.append(',');
-         }
-         brokersUrl.append("tcp://localhost:").append(portBase + brokerIndex);
-     }
- }
-
- /**
-  * Creates broker {@code brokerid} with network connectors attached.
-  */
- protected BrokerService createBroker(int brokerid) throws Exception {
-     final boolean addToNetwork = true;
-     return createBroker(brokerid, addToNetwork);
- }
-
- /**
-  * Creates (but does not start) a persistent, JMX-enabled broker named
-  * {@code "B" + brokerid} listening on {@code portBase + brokerid}, applies
-  * the queue policies for "GW.&gt;" and "IN", and registers it in
-  * {@code brokers}.
-  *
-  * @param brokerid     index of the broker; determines its name and port
-  * @param addToNetwork whether network connectors to the other brokers are added
-  * @return the configured broker
-  */
- protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception {
-     final BrokerService created = new BrokerService();
-     created.setPersistent(true);
-     created.setDeleteAllMessagesOnStartup(true);
-     created.getManagementContext().setCreateConnector(false);
-
-     created.setUseJmx(true);
-     created.setBrokerName("B" + brokerid);
-     created.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
-
-     if (addToNetwork) {
-         addNetworkConnector(created);
-     }
-     created.setSchedulePeriodForDestinationPurge(0);
-     created.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024L);
-
-     // The gateway queues get a 2 MB limit without producer flow control,
-     // the inbound queue a 5 MB limit with producer flow control.
-     final PolicyMap destinationPolicies = new PolicyMap();
-     destinationPolicies.put(new ActiveMQQueue("GW.>"), newQueuePolicy(2 * 1024 * 1024L, false));
-     destinationPolicies.put(new ActiveMQQueue("IN"), newQueuePolicy(5 * 1024 * 1024L, true));
-     created.setDestinationPolicy(destinationPolicies);
-
-     final KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) created.getPersistenceAdapter();
-     kahaDb.setConcurrentStoreAndDispatchQueues(true);
-
-     brokers.put(created.getBrokerName(), new BrokerItem(created));
-     return created;
- }
-
- /**
-  * Builds a queue policy with expiry checks disabled, prefetch 1000, audit
-  * and cache enabled, and the given memory limit and flow control setting.
-  */
- private PolicyEntry newQueuePolicy(long memoryLimit, boolean producerFlowControl) {
-     final PolicyEntry entry = new PolicyEntry();
-     entry.setExpireMessagesPeriod(0);
-     entry.setQueuePrefetch(1000);
-     entry.setMemoryLimit(memoryLimit);
-     entry.setProducerFlowControl(producerFlowControl);
-     entry.setEnableAudit(true);
-     entry.setUseCache(true);
-     return entry;
- }
-
- /**
-  * Adds two network connectors ("Bridge-0" and "Bridge-1") to the broker.
-  * Both use a static discovery URI built from {@code brokersUrl} and
-  * dynamically include only the "GW.*" queues.
-  */
- private void addNetworkConnector(BrokerService broker) throws Exception {
-     final URI staticPeers = new URI("static:(" + brokersUrl + ")");
-
-     for (int bridge = 0; bridge < 2; bridge++) {
-         final NetworkConnector bridgeConnector = new DiscoveryNetworkConnector(staticPeers);
-         bridgeConnector.setName("Bridge-" + bridge);
-         bridgeConnector.setNetworkTTL(1);
-         bridgeConnector.setDecreaseNetworkConsumerPriority(true);
-         bridgeConnector.setDynamicOnly(true);
-         bridgeConnector.setPrefetchSize(100);
-
-         final ActiveMQDestination[] included = {new ActiveMQQueue("GW.*")};
-         bridgeConnector.setDynamicallyIncludedDestinations(Arrays.asList(included));
-         broker.addNetworkConnector(bridgeConnector);
-     }
- }
-
- /**
-  * Used to explore contention with concurrentStoreAndDispatch: a sync
-  * commit and the task queue reversing the order of cursor add and
-  * sequence assignment. Disabled by the {@code x_} prefix.
-  * <p>
-  * Commits a transacted send and performs a non-transacted send to the
-  * same queue from two threads at once, then waits for both to finish.
-  */
- public void x_testInterleavedSend() throws Exception {
-
-     BrokerService b = createBroker(0, false);
-     b.start();
-
-     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + 0));
-     connectionFactory.setWatchTopicAdvisories(false);
-
-     QueueConnection c1 = null;
-     QueueConnection c2 = null;
-     QueueConnection c3 = null;
-     ExecutorService executorService = Executors.newFixedThreadPool(2);
-     try {
-         c1 = connectionFactory.createQueueConnection();
-         c2 = connectionFactory.createQueueConnection();
-         c3 = connectionFactory.createQueueConnection();
-
-         c1.start();
-         c2.start();
-         c3.start();
-
-         ActiveMQQueue dest = new ActiveMQQueue("IN");
-         final Session s1 = c1.createQueueSession(true, Session.SESSION_TRANSACTED);
-         final TextMessage txMessage = s1.createTextMessage("TX");
-         final TextMessage noTxMessage = s1.createTextMessage("NO_TX");
-
-         final MessageProducer txProducer = s1.createProducer(dest);
-         final MessageProducer nonTxProducer = c2.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(dest);
-
-         txProducer.send(txMessage);
-
-         executorService.execute(new Runnable() {
-             @Override
-             public void run() {
-                 try {
-                     s1.commit();
-                 }
-                 catch (JMSException e) {
-                     e.printStackTrace();
-                 }
-             }
-         });
-
-         executorService.execute(new Runnable() {
-             @Override
-             public void run() {
-                 try {
-                     nonTxProducer.send(noTxMessage);
-                 }
-                 catch (JMSException e) {
-                     e.printStackTrace();
-                 }
-             }
-         });
-
-         executorService.shutdown();
-         assertTrue("commit and send finished", executorService.awaitTermination(10, TimeUnit.MINUTES));
-     }
-     finally {
-         executorService.shutdownNow();
-         // close every connection that was opened, even when one close fails
-         for (QueueConnection opened : new QueueConnection[]{c1, c2, c3}) {
-             if (opened != null) {
-                 try {
-                     opened.close();
-                 }
-                 catch (JMSException e) {
-                     LOG.warn("Failed to close connection", e);
-                 }
-             }
-         }
-     }
-
- }
-
- /**
-  * Starts {@code numBrokers} brokers, attaches the GW consumers and the
-  * fan-out consumers, produces {@code numMessages} messages and polls until
-  * every consumer tally reaches its expected count. Finally asserts that no
-  * exceptions were recorded and that nothing ended up in "ActiveMQ.DLQ".
-  */
- public void testBrokers() throws Exception {
-
- buildUrlList();
-
- for (int i = 0; i < numBrokers; i++) {
- createBroker(i);
- }
-
- startAllBrokers();
- waitForBridgeFormation(numBrokers - 1);
-
- verifyPeerBrokerInfos(numBrokers - 1);
-
- final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);
-
- startAllGWFanoutConsumers(numBrokers);
-
- LOG.info("Waiting for percolation of consumers..");
- TimeUnit.SECONDS.sleep(5);
-
- LOG.info("Produce mesages..");
- long startTime = System.currentTimeMillis();
-
- // produce
- produce(numMessages);
-
- // Poll until every tally matches; the two trailing arguments are
- // presumably the overall timeout and the poll interval in ms - TODO confirm
- assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- for (ConsumerState tally : consumerStates) {
- // a composite destination is expected to receive numMessages per component
- final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
- LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
- if (tally.accumulator.get() != expected) {
- LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
- // within 50 messages of the target: request a queue statistics dump
- if (tally.accumulator.get() > expected - 50) {
- dumpQueueStat(null);
- }
- // exactly one entry left in tally.expected: briefly attach another consumer
- if (tally.expected.size() == 1) {
- startConsumer(tally.brokerName, tally.destination);
- }
- return false;
- }
- LOG.info("got tally on " + tally.brokerName);
- }
- return true;
- }
- }, 1000 * 60 * 1000L, 20 * 1000));
-
- assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
-
- LOG.info("done");
- long duration = System.currentTimeMillis() - startTime;
- LOG.info("Duration:" + TimeUtils.printDuration(duration));
-
- assertEquals("nothing in the dlq's", 0, dumpQueueStat(new ActiveMQQueue("ActiveMQ.DLQ")));
-
- }
-
- /**
-  * Connects to the broker named {@code brokerName}, creates a consumer on
-  * {@code destination} and immediately closes the connection again.
-  *
-  * @param brokerName  broker name of the form "B&lt;id&gt;"; the id selects the port
-  * @param destination destination the short-lived consumer is created on
-  */
- private void startConsumer(String brokerName, ActiveMQDestination destination) throws Exception {
-     int id = Integer.parseInt(brokerName.substring(1));
-     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:" + (portBase + id));
-     connectionFactory.setWatchTopicAdvisories(false);
-     QueueConnection queueConnection = connectionFactory.createQueueConnection();
-     try {
-         queueConnection.start();
-
-         queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
-     }
-     finally {
-         // always release the connection, even if creating the consumer fails
-         queueConnection.close();
-     }
- }
-
- /**
-  * Logs name, enqueue count and size of the matching queues on every broker
-  * and returns the sum of their sizes.
-  *
-  * @param destination only queues whose JMX object name contains this
-  *                    destination's physical name are reported;
-  *                    {@code null} reports every queue
-  * @return the total size of all reported queues
-  */
- private long dumpQueueStat(ActiveMQDestination destination) throws Exception {
-     long sumTotal = 0;
-     Collection<BrokerItem> brokerList = brokers.values();
-     for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
-         BrokerService brokerService = i.next().broker;
-         for (ObjectName objectName : brokerService.getAdminView().getQueues()) {
-             // a null destination means "no filter"; previously it matched nothing
-             if (destination == null || objectName.toString().contains(destination.getPhysicalName())) {
-                 QueueViewMBean qViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(objectName, QueueViewMBean.class, false);
-                 LOG.info(brokerService.getBrokerName() + ", " + qViewMBean.getName() + ", Enqueue:" + qViewMBean.getEnqueueCount() + ", Size: " + qViewMBean.getQueueSize());
-                 sumTotal += qViewMBean.getQueueSize();
-             }
-         }
-     }
-     return sumTotal;
- }
-
- private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
-
- StringBuffer compositeDest = new StringBuffer();
- for (int k = 0; k < nBrokers; k++) {
- compositeDest.append("GW." + k);
- if (k + 1 != nBrokers) {
- compositeDest.append(',');
- }
- }
- ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
-
- for (int id = 0; id < nBrokers; id++) {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
-
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
-
- final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
-
- final MessageProducer producer = queueSession.createProducer(compositeQ);
- queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- producer.send(message);
- queueSession.commit();
- }
- catch (Exception e) {
- LOG.error("Failed to fanout to GW: " + message, e);
- }
-
- }
- });
- }
- }
-
- private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
- List<ConsumerState> consumerStates = new LinkedList<>();
- for (int id = 0; id < nBrokers; id++) {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
-
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
-
- final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
- QueueReceiver queueReceiver = queueSession.createReceiver(destination);
-
- final ConsumerState consumerState = new ConsumerState();
- consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
- consumerState.receiver = queueReceiver;
- consumerState.destination = destination;
- for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) {
- consumerState.expected.add(j);
- }
-
- if (!accumulators.containsKey(destination)) {
- accumulators.put(destination, new AtomicInteger(0));
- }
- consumerState.accumulator = accumulators.get(destination);
-
- queueReceiver.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- if (consumerSleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
- }
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- consumerState.accumulator.incrementAndGet();
- try {
- consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- //queueSession.commit();
- }
- catch (Exception e) {
- LOG.error("Failed to commit slow receipt of " + message, e);
- }
- }
- });
-
- consumerStates.add(consumerState);
-
- }
- return consumerStates;
- }
-
- private void produce(final int numMessages) throws Exception {
- ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
- final AtomicInteger toSend = new AtomicInteger(numMessages);
- for (int i = 1; i <= numProducers; i++) {
- final int id = i % numBrokers;
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
- QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = queueSession.createProducer(null);
- int val = 0;
- while ((val = toSend.decrementAndGet()) >= 0) {
-
- int id = numMessages - val - 1;
-
- ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
- Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload);
- textMessage.setIntProperty("NUM", id);
- producer.send(compositeQ, textMessage);
- }
- queueConnection.close();
-
- }
- catch (Throwable throwable) {
- throwable.printStackTrace();
- exceptions.add(throwable);
- }
- }
- });
- }
- }
-
- private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
- final BrokerService broker = brokerItem.broker;
- final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
- return max == regionBroker.getPeerBrokerInfos().length;
- }
- });
- LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
- List<String> missing = new ArrayList<>();
- for (int i = 0; i < max; i++) {
- missing.add("B" + i);
- }
- if (max != regionBroker.getPeerBrokerInfos().length) {
- for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
- LOG.info(info.getBrokerName());
- missing.remove(info.getBrokerName());
- }
- LOG.info("Broker infos off.." + missing);
- }
- assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
- }
-
- private void verifyPeerBrokerInfos(final int max) throws Exception {
- Collection<BrokerItem> brokerList = brokers.values();
- for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
- verifyPeerBrokerInfo(i.next(), max);
- }
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- class ConsumerState {
-
- AtomicInteger accumulator;
- String brokerName;
- QueueReceiver receiver;
- ActiveMQDestination destination;
- ConcurrentLinkedQueue<Integer> expected = new ConcurrentLinkedQueue<>();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
deleted file mode 100644
index 5ddb14f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.QueueConnection;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {
-
- static final String payload = new String(new byte[10 * 1024]);
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
- final int portBase = 61600;
- final int numBrokers = 4;
- final int numProducers = 10;
- final int numMessages = 800;
- final int consumerSleepTime = 20;
- StringBuilder brokersUrl = new StringBuilder();
- HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<>();
- private ArrayList<Throwable> exceptions = new ArrayList<>();
-
- protected void buildUrlList() throws Exception {
- for (int i = 0; i < numBrokers; i++) {
- brokersUrl.append("tcp://localhost:" + (portBase + i));
- if (i != numBrokers - 1) {
- brokersUrl.append(',');
- }
- }
- }
-
- protected BrokerService createBroker(int brokerid) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setPersistent(true);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.getManagementContext().setCreateConnector(false);
-
- broker.setUseJmx(true);
- broker.setBrokerName("B" + brokerid);
- broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
-
- addNetworkConnector(broker);
- broker.setSchedulePeriodForDestinationPurge(0);
- broker.getSystemUsage().setSendFailIfNoSpace(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
-
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policyEntry = new PolicyEntry();
- policyEntry.setExpireMessagesPeriod(0);
- policyEntry.setQueuePrefetch(1000);
- policyEntry.setMemoryLimit(1024 * 1024L);
- policyEntry.setOptimizedDispatch(false);
- policyEntry.setProducerFlowControl(false);
- policyEntry.setEnableAudit(true);
- policyEntry.setUseCache(true);
- policyMap.put(new ActiveMQQueue("GW.>"), policyEntry);
- broker.setDestinationPolicy(policyMap);
-
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
-
- brokers.put(broker.getBrokerName(), new BrokerItem(broker));
- return broker;
- }
-
- private void addNetworkConnector(BrokerService broker) throws Exception {
- StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
- networkConnectorUrl.append(')');
-
- for (int i = 0; i < 2; i++) {
- NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
- nc.setName("Bridge-" + i);
- nc.setNetworkTTL(1);
- nc.setDecreaseNetworkConsumerPriority(true);
- nc.setDynamicOnly(true);
- nc.setPrefetchSize(100);
- nc.setDynamicallyIncludedDestinations(Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
- broker.addNetworkConnector(nc);
- }
- }
-
- public void testBrokers() throws Exception {
-
- buildUrlList();
-
- for (int i = 0; i < numBrokers; i++) {
- createBroker(i);
- }
-
- startAllBrokers();
- waitForBridgeFormation(numBrokers - 1);
-
- verifyPeerBrokerInfos(numBrokers - 1);
-
- final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);
-
- startAllGWFanoutConsumers(numBrokers);
-
- LOG.info("Waiting for percolation of consumers..");
- TimeUnit.SECONDS.sleep(5);
-
- LOG.info("Produce mesages..");
- long startTime = System.currentTimeMillis();
-
- // produce
- produce(numMessages);
-
- assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- for (ConsumerState tally : consumerStates) {
- final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
- LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
- if (tally.accumulator.get() != expected) {
- LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
- return false;
- }
- LOG.info("got tally on " + tally.brokerName);
- }
- return true;
- }
- }, 1000 * 60 * 1000L));
-
- assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
-
- LOG.info("done");
- long duration = System.currentTimeMillis() - startTime;
- LOG.info("Duration:" + TimeUtils.printDuration(duration));
- }
-
- private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
-
- StringBuffer compositeDest = new StringBuffer();
- for (int k = 0; k < nBrokers; k++) {
- compositeDest.append("GW." + k);
- if (k + 1 != nBrokers) {
- compositeDest.append(',');
- }
- }
- ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
-
- for (int id = 0; id < nBrokers; id++) {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
-
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
-
- final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
-
- final MessageProducer producer = queueSession.createProducer(compositeQ);
- queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- producer.send(message);
- queueSession.commit();
- }
- catch (Exception e) {
- LOG.error("Failed to fanout to GW: " + message, e);
- }
-
- }
- });
- }
- }
-
- private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
- List<ConsumerState> consumerStates = new LinkedList<>();
- for (int id = 0; id < nBrokers; id++) {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
-
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
-
- final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
- QueueReceiver queueReceiver = queueSession.createReceiver(destination);
-
- final ConsumerState consumerState = new ConsumerState();
- consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
- consumerState.receiver = queueReceiver;
- consumerState.destination = destination;
- for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) {
- consumerState.expected.add(j);
- }
-
- if (!accumulators.containsKey(destination)) {
- accumulators.put(destination, new AtomicInteger(0));
- }
- consumerState.accumulator = accumulators.get(destination);
-
- queueReceiver.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- try {
- if (consumerSleepTime > 0) {
- TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
- }
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- consumerState.accumulator.incrementAndGet();
- try {
- consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
- }
- catch (IOException e) {
- e.printStackTrace();
- }
- }
- catch (Exception e) {
- LOG.error("Failed to commit slow receipt of " + message, e);
- }
- }
- });
-
- consumerStates.add(consumerState);
-
- }
- return consumerStates;
- }
-
- private void produce(int numMessages) throws Exception {
- ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
- final AtomicInteger toSend = new AtomicInteger(numMessages);
- for (int i = 1; i <= numProducers; i++) {
- final int id = i % numBrokers;
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
- connectionFactory.setWatchTopicAdvisories(false);
- QueueConnection queueConnection = connectionFactory.createQueueConnection();
- queueConnection.start();
- QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = queueSession.createProducer(null);
- int val = 0;
- while ((val = toSend.decrementAndGet()) >= 0) {
-
- ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
- LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
- Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload);
- textMessage.setIntProperty("NUM", val);
- producer.send(compositeQ, textMessage);
- }
- queueConnection.close();
-
- }
- catch (Throwable throwable) {
- throwable.printStackTrace();
- exceptions.add(throwable);
- }
- }
- });
- }
- }
-
- private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
- final BrokerService broker = brokerItem.broker;
- final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
- return max == regionBroker.getPeerBrokerInfos().length;
- }
- });
- LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
- List<String> missing = new ArrayList<>();
- for (int i = 0; i < max; i++) {
- missing.add("B" + i);
- }
- if (max != regionBroker.getPeerBrokerInfos().length) {
- for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
- LOG.info(info.getBrokerName());
- missing.remove(info.getBrokerName());
- }
- LOG.info("Broker infos off.." + missing);
- }
- assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
- }
-
- private void verifyPeerBrokerInfos(final int max) throws Exception {
- Collection<BrokerItem> brokerList = brokers.values();
- for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
- verifyPeerBrokerInfo(i.next(), max);
- }
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- class ConsumerState {
-
- AtomicInteger accumulator;
- String brokerName;
- QueueReceiver receiver;
- ActiveMQDestination destination;
- Vector<Integer> expected = new Vector<>();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
deleted file mode 100644
index 777d582..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransactionBroker;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.transaction.Synchronization;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4485Test extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class);
- BrokerService broker;
- ActiveMQConnectionFactory factory;
- final int messageCount = 20;
- int memoryLimit = 40 * 1024;
- final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName());
- final Vector<Throwable> exceptions = new Vector<>();
- final CountDownLatch slowSendResume = new CountDownLatch(1);
-
- protected void configureBroker(long memoryLimit) throws Exception {
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setAdvisorySupport(false);
-
- PolicyEntry policy = new PolicyEntry();
- policy.setExpireMessagesPeriod(0);
- policy.setMemoryLimit(memoryLimit);
- policy.setProducerFlowControl(false);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
- if (messageSend.isInTransaction() && messageSend.getProperty("NUM") != null) {
- final Integer num = (Integer) messageSend.getProperty("NUM");
- if (true) {
- TransactionBroker transactionBroker = (TransactionBroker) broker.getBroker().getAdaptor(TransactionBroker.class);
- transactionBroker.getTransaction(producerExchange.getConnectionContext(), messageSend.getTransactionId(), false).addSynchronization(new Synchronization() {
- @Override
- public void afterCommit() throws Exception {
- LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId() + ", tx: " + messageSend.getTransactionId());
- if (num == 5) {
- // we want to add to cursor after usage is exhausted by message 20 and when
- // all other messages have been processed
- LOG.error("Pausing on latch in afterCommit for: " + num + ", " + messageSend.getMessageId());
- slowSendResume.await(20, TimeUnit.SECONDS);
- LOG.error("resuming on latch afterCommit for: " + num + ", " + messageSend.getMessageId());
- }
- else if (messageCount + 1 == num) {
- LOG.error("releasing latch. " + num + ", " + messageSend.getMessageId());
- slowSendResume.countDown();
- // for message X, we need to delay so message 5 can setBatch
- TimeUnit.SECONDS.sleep(5);
- LOG.error("resuming afterCommit for: " + num + ", " + messageSend.getMessageId());
- }
- }
- });
- }
- }
- super.send(producerExchange, messageSend);
- }
- }});
-
- }
-
- public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception {
-
- Set<Integer> expected = new HashSet<>();
- final Vector<Session> sessionVector = new Vector<>();
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 1; i <= messageCount; i++) {
- sessionVector.add(send(i, 1, true));
- expected.add(i);
- }
-
- // get parallel commit so that the sync writes are batched
- for (int i = 0; i < messageCount; i++) {
- final int id = i;
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- sessionVector.get(id).commit();
- }
- catch (Exception fail) {
- exceptions.add(fail);
- }
- }
- });
- }
-
- final DestinationViewMBean queueViewMBean = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], DestinationViewMBean.class, false);
-
- // not sure how many messages will get enqueued
- TimeUnit.SECONDS.sleep(3);
- if (false)
- assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount());
- return messageCount == queueViewMBean.getEnqueueCount();
- }
- }));
-
- LOG.info("Big send to blow available destination usage before slow send resumes");
- send(messageCount + 1, 35 * 1024, true).commit();
-
- // consume and verify all received
- Connection cosumerConnection = factory.createConnection();
- cosumerConnection.start();
- MessageConsumer consumer = cosumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
- for (int i = 1; i <= messageCount + 1; i++) {
- BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000);
- assertNotNull("Got message: " + i + ", " + expected, bytesMessage);
- MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId();
- LOG.info("got: " + expected + ", " + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
- expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
- }
- }
-
- private Session send(int id, int messageSize, boolean transacted) throws Exception {
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(new byte[messageSize]);
- bytesMessage.setIntProperty("NUM", id);
- producer.send(bytesMessage);
- LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage) bytesMessage).getTransactionId());
- return session;
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- broker = new BrokerService();
- broker.setBrokerName("thisOne");
- configureBroker(memoryLimit);
- broker.start();
- factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
- factory.setWatchTopicAdvisories(false);
-
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (broker != null) {
- broker.stop();
- broker = null;
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
deleted file mode 100644
index 7d3ee41..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4487Test.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.Enumeration;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4487Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4487Test.class);
-
- private final String destinationName = "TEST.QUEUE";
- private BrokerService broker;
- private ActiveMQConnectionFactory factory;
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.deleteAllMessages();
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
-
- PolicyEntry policy = new PolicyEntry();
- policy.setQueue(">");
- policy.setMaxProducersToAudit(75);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
-
- broker.start();
- broker.waitUntilStarted();
- factory = new ActiveMQConnectionFactory("vm://localhost");
- }
-
- @After
- public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- private void sendMessages(int messageToSend) throws Exception {
- String data = "";
- for (int i = 0; i < 1024 * 2; i++) {
- data += "x";
- }
-
- Connection connection = factory.createConnection();
- connection.start();
-
- for (int i = 0; i < messageToSend; i++) {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(destinationName);
- MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage(data));
- session.close();
- }
-
- connection.close();
- }
-
- @Test
- public void testBrowsingWithLessThanMaxAuditDepth() throws Exception {
- doTestBrowsing(75);
- }
-
- @Test
- public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
- doTestBrowsing(300);
- }
-
- @SuppressWarnings("rawtypes")
- private void doTestBrowsing(int messagesToSend) throws Exception {
-
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(destinationName);
-
- sendMessages(messagesToSend);
-
- QueueBrowser browser = session.createBrowser(queue);
- Enumeration enumeration = browser.getEnumeration();
- int received = 0;
- while (enumeration.hasMoreElements()) {
- Message m = (Message) enumeration.nextElement();
- assertNotNull(m);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Browsed Message: {}", m.getJMSMessageID());
- }
-
- received++;
- if (received > messagesToSend) {
- break;
- }
- }
-
- browser.close();
-
- assertEquals(messagesToSend, received);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
deleted file mode 100644
index a89aca2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4504Test.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotNull;
-
-public class AMQ4504Test {
-
- BrokerService brokerService;
-
- @Before
- public void setup() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.start();
- }
-
- @After
- public void stop() throws Exception {
- brokerService.stop();
- }
-
- @Test
- public void testCompositeDestConsumer() throws Exception {
-
- final int numDests = 20;
- final int numMessages = 200;
- StringBuffer stringBuffer = new StringBuffer();
- for (int i = 0; i < numDests; i++) {
- if (stringBuffer.length() != 0) {
- stringBuffer.append(',');
- }
- stringBuffer.append("ST." + i);
- }
- stringBuffer.append("?consumer.prefetchSize=100");
- ActiveMQQueue activeMQQueue = new ActiveMQQueue(stringBuffer.toString());
- ConnectionFactory factory = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
- Connection connection = factory.createConnection();
- connection.start();
- MessageProducer producer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(activeMQQueue);
- for (int i = 0; i < numMessages; i++) {
- producer.send(new ActiveMQTextMessage());
- }
-
- MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(activeMQQueue);
- try {
- for (int i = 0; i < numMessages * numDests; i++) {
- assertNotNull("received:" + i, consumer.receive(4000));
- }
- }
- finally {
- connection.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
deleted file mode 100644
index ceac82f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4513Test.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4513Test {
-
- private BrokerService brokerService;
- private String connectionUri;
-
- @Before
- public void setup() throws Exception {
- brokerService = new BrokerService();
-
- connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
-
- // Configure Dead Letter Strategy
- DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
- ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true);
- ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ.");
- strategy.setProcessNonPersistent(false);
- strategy.setProcessExpired(false);
-
- // Add policy and individual DLQ strategy
- PolicyEntry policy = new PolicyEntry();
- policy.setTimeBeforeDispatchStarts(3000);
- policy.setDeadLetterStrategy(strategy);
-
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
-
- brokerService.setDestinationPolicy(pMap);
-
- brokerService.setPersistent(false);
- brokerService.start();
- }
-
- @After
- public void stop() throws Exception {
- brokerService.stop();
- }
-
- @Test(timeout = 360000)
- public void test() throws Exception {
-
- final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
-
- ExecutorService service = Executors.newFixedThreadPool(25);
-
- final Random ripple = new Random(System.currentTimeMillis());
-
- for (int i = 0; i < 1000; ++i) {
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTemporaryQueue();
- session.createProducer(destination);
- connection.close();
- TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
- }
- catch (Exception e) {
- }
- }
- });
-
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTemporaryQueue();
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.setTimeToLive(400);
- producer.send(session.createTextMessage());
- producer.send(session.createTextMessage());
- TimeUnit.MILLISECONDS.sleep(500);
- connection.close();
- }
- catch (Exception e) {
- }
- }
- });
-
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTemporaryQueue();
- session.createProducer(destination);
- connection.close();
- TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
- }
- catch (Exception e) {
- }
- }
- });
- }
-
- service.shutdown();
- assertTrue(service.awaitTermination(5, TimeUnit.MINUTES));
- }
-}