You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:31 UTC
[22/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
deleted file mode 100644
index 882105b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQPrefetchPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Checks the in-flight counter reported by the queue's {@link QueueViewMBean}
- * when short-lived (expiring) messages are prefetched by a consumer together
- * with messages that never expire.
- *
- * Each test sends a mix of both kinds of messages, waits until the
- * short-lived ones are past their time-to-live, and then verifies that the
- * in-flight count returns to zero once a listener acknowledges (or commits)
- * what it receives.
- */
-public class AMQ4083Test {
-
-   // Bound to this class; it was previously bound to AMQ3992Test, which
-   // mislabelled every log line written by these tests.
-   private static final Logger LOG = LoggerFactory.getLogger(AMQ4083Test.class);
-   private static final String BROKER_ADDRESS = "tcp://localhost:0";
-   private static final String TEST_QUEUE = "testQueue";
-   private static final ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
-
-   private final int messageCount = 100;
-
-   // Every connection a test opens is registered here so that tearDown()
-   // can close it even when the test fails half way through.
-   private final ArrayList<Connection> connections = new ArrayList<>();
-
-   private BrokerService brokerService;
-   private String connectionUri;
-   private String[] data;
-
-   /**
-    * Starts a non-persistent, JMX-enabled broker on an ephemeral TCP port and
-    * prepares {@code messageCount} payload strings.
-    */
-   @Before
-   public void setUp() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setPersistent(false);
-      brokerService.setUseJmx(true);
-      brokerService.setDeleteAllMessagesOnStartup(true);
-      connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-      brokerService.start();
-      brokerService.waitUntilStarted();
-
-      data = new String[messageCount];
-
-      for (int i = 0; i < messageCount; i++) {
-         data[i] = "Text for message: " + i + " at " + new Date();
-      }
-   }
-
-   /**
-    * Closes every connection opened by the test, then stops the broker.
-    */
-   @After
-   public void tearDown() throws Exception {
-      for (Connection connection : connections) {
-         try {
-            connection.close();
-         }
-         catch (JMSException e) {
-            // Best effort: a failed close must not prevent the broker from being stopped.
-            LOG.warn("Failed to close connection during tearDown", e);
-         }
-      }
-      connections.clear();
-
-      brokerService.stop();
-      brokerService.waitUntilStopped();
-   }
-
-   /**
-    * 100 short-lived messages followed by one non-expiring message, consumed
-    * with CLIENT_ACKNOWLEDGE.
-    */
-   @Test
-   public void testExpiredMsgsBeforeNonExpired() throws Exception {
-
-      ActiveMQConnection connection = openConnection();
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         sendShortLived(session, producer);
-      }
-
-      // and send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(101, queueView.getInFlightCount());
-
-      consumer.setMessageListener(ackingListener());
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertEquals(0, queueView.getInFlightCount());
-
-      sendNonExpiring(session, producer, 200);
-
-      assertInFlightDrains(queueView);
-      logQueueStats(queueView);
-   }
-
-   /**
-    * Same scenario as {@link #testExpiredMsgsBeforeNonExpired()} but on a
-    * transacted session: the listener commits instead of acknowledging.
-    */
-   @Test
-   public void testExpiredMsgsBeforeNonExpiredWithTX() throws Exception {
-
-      ActiveMQConnection connection = openConnection();
-      final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         sendShortLived(session, producer);
-      }
-
-      // and send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-      session.commit();
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(101, queueView.getInFlightCount());
-
-      consumer.setMessageListener(new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               session.commit();
-            }
-            catch (JMSException e) {
-               // Previously swallowed; log it so a failed commit is visible in the test output.
-               LOG.error("Failed to commit after receiving message: {}", message, e);
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertEquals(0, queueView.getInFlightCount());
-
-      sendNonExpiring(session, producer, 200);
-      session.commit();
-
-      assertInFlightDrains(queueView);
-      logQueueStats(queueView);
-   }
-
-   /**
-    * 200 messages alternating between short-lived and non-expiring, each one
-    * acknowledged individually.
-    */
-   @Test
-   public void testExpiredMsgsInterleavedWithNonExpired() throws Exception {
-
-      ActiveMQConnection connection = openConnection();
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      sendInterleaved(session, producer, 200);
-
-      // wait long enough so the short-lived messages time out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(200, queueView.getInFlightCount());
-
-      consumer.setMessageListener(ackingListener());
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertInFlightDrains(queueView);
-
-      sendNonExpiring(session, producer, 200);
-
-      assertInFlightDrains(queueView);
-      logQueueStats(queueView);
-   }
-
-   /**
-    * 200 interleaved messages where only the 100th delivered message is
-    * acknowledged, followed by a second phase in which every message is
-    * acknowledged.
-    */
-   @Test
-   public void testExpiredMsgsInterleavedWithNonExpiredCumulativeAck() throws Exception {
-
-      ActiveMQConnection connection = openConnection();
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      sendInterleaved(session, producer, 200);
-
-      // wait long enough so the short-lived messages time out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(200, queueView.getInFlightCount());
-
-      final AtomicInteger msgCount = new AtomicInteger();
-
-      consumer.setMessageListener(new MessageListener() {
-
-         @Override
-         public void onMessage(Message message) {
-            try {
-               // Acknowledge only once, on the 100th delivery.
-               if (msgCount.incrementAndGet() == 100) {
-                  LOG.debug("Acking message: {}", message);
-                  message.acknowledge();
-               }
-            }
-            catch (JMSException e) {
-               // Previously swallowed; log it so a failed ack is visible in the test output.
-               LOG.error("Failed to acknowledge message: {}", message, e);
-            }
-         }
-      });
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertInFlightDrains(queueView);
-
-      // Now we just ack each and see if our counters come out right in the end.
-      consumer.setMessageListener(ackingListener());
-
-      sendNonExpiring(session, producer, 200);
-
-      assertInFlightDrains(queueView);
-      logQueueStats(queueView);
-   }
-
-   /**
-    * One non-expiring message, then 100 short-lived ones, then another
-    * non-expiring message; each delivered message is acknowledged.
-    */
-   @Test
-   public void testExpiredBatchBetweenNonExpiredMessages() throws Exception {
-
-      ActiveMQConnection connection = openConnection();
-      Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      connection.start();
-
-      MessageProducer producer = session.createProducer(queue);
-      MessageConsumer consumer = session.createConsumer(queue);
-
-      // Send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-
-      // send a batch that expires in a short time.
-      for (int i = 0; i < 100; i++) {
-         sendShortLived(session, producer);
-      }
-
-      // and send one that doesn't expire so we can ack it.
-      producer.send(session.createTextMessage());
-
-      // wait long enough so the first batch times out.
-      TimeUnit.SECONDS.sleep(5);
-
-      final QueueViewMBean queueView = getProxyToQueueViewMBean();
-
-      assertEquals(102, queueView.getInFlightCount());
-
-      consumer.setMessageListener(ackingListener());
-
-      TimeUnit.SECONDS.sleep(5);
-
-      assertInFlightDrains(queueView);
-
-      sendNonExpiring(session, producer, 200);
-
-      assertInFlightDrains(queueView);
-      logQueueStats(queueView);
-   }
-
-   /**
-    * Sends {@code data.length} messages with a 500 ms time-to-live followed by
-    * the same number of non-expiring messages, consumes the queue with a
-    * prefetch of 10, and then counts what arrives on {@code ActiveMQ.DLQ}.
-    */
-   @Test
-   public void testConsumeExpiredQueueAndDlq() throws Exception {
-
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      Connection connection = factory.createConnection();
-      connections.add(connection);
-
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer producerNormal = session.createProducer(queue);
-      MessageProducer producerExpire = session.createProducer(queue);
-      producerExpire.setTimeToLive(500);
-
-      MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ"));
-      connection.start();
-
-      Connection consumerConnection = factory.createConnection();
-      connections.add(consumerConnection);
-      ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
-      prefetchPolicy.setAll(10);
-      ((ActiveMQConnection) consumerConnection).setPrefetchPolicy(prefetchPolicy);
-      Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      MessageConsumer consumer = consumerSession.createConsumer(queue);
-      consumerConnection.start();
-
-      String msgBody = new String(new byte[20 * 1024]);
-      for (int i = 0; i < data.length; i++) {
-         Message message = session.createTextMessage(msgBody);
-         producerExpire.send(queue, message);
-      }
-
-      for (int i = 0; i < data.length; i++) {
-         Message message = session.createTextMessage(msgBody);
-         producerNormal.send(queue, message);
-      }
-
-      ArrayList<Message> messages = new ArrayList<>();
-      Message received;
-      while ((received = consumer.receive(1000)) != null) {
-         messages.add(received);
-         if (messages.size() == 1) {
-            // Pause after the first message, longer than the 500 ms time-to-live set above.
-            TimeUnit.SECONDS.sleep(1);
-         }
-         received.acknowledge();
-      }
-
-      assertEquals("got messages", messageCount + 1, messages.size());
-
-      ArrayList<Message> dlqMessages = new ArrayList<>();
-      while ((received = dlqConsumer.receive(1000)) != null) {
-         dlqMessages.add(received);
-      }
-
-      assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
-
-      logQueueStats(getProxyToQueueViewMBean());
-   }
-
-   /**
-    * Opens a connection to the test broker with a queue prefetch of 400 and
-    * registers it for cleanup in {@link #tearDown()}. The connection is not
-    * started.
-    */
-   private ActiveMQConnection openConnection() throws JMSException {
-      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
-      ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
-      connections.add(connection);
-      connection.getPrefetchPolicy().setQueuePrefetch(400);
-      return connection;
-   }
-
-   /** Sends one persistent text message with priority 4 and a 4000 ms time-to-live. */
-   private static void sendShortLived(Session session, MessageProducer producer) throws JMSException {
-      producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 4, 4000);
-   }
-
-   /** Sends {@code count} text messages using the producer's default settings. */
-   private static void sendNonExpiring(Session session, MessageProducer producer, int count) throws JMSException {
-      for (int i = 0; i < count; i++) {
-         producer.send(session.createTextMessage());
-      }
-   }
-
-   /**
-    * Sends {@code count} messages, alternating a short-lived message (even
-    * index) with a non-expiring one (odd index).
-    */
-   private static void sendInterleaved(Session session, MessageProducer producer, int count) throws JMSException {
-      for (int i = 0; i < count; i++) {
-         if ((i % 2) == 0) {
-            sendShortLived(session, producer);
-         }
-         else {
-            producer.send(session.createTextMessage());
-         }
-      }
-   }
-
-   /**
-    * Returns a listener that acknowledges every message it receives. An
-    * acknowledge failure is logged instead of being silently dropped.
-    */
-   private static MessageListener ackingListener() {
-      return new MessageListener() {
-         @Override
-         public void onMessage(Message message) {
-            try {
-               LOG.debug("Acking message: {}", message);
-               message.acknowledge();
-            }
-            catch (JMSException e) {
-               LOG.error("Failed to acknowledge message: {}", message, e);
-            }
-         }
-      };
-   }
-
-   /**
-    * Waits for the queue's in-flight count to reach zero and fails the test
-    * if it does not. The failure message reports the count observed after the
-    * wait rather than the one observed before it started.
-    */
-   private static void assertInFlightDrains(final QueueViewMBean queueView) throws Exception {
-      boolean drained = Wait.waitFor(new Wait.Condition() {
-
-         @Override
-         public boolean isSatisified() throws Exception {
-            return queueView.getInFlightCount() == 0;
-         }
-      });
-
-      assertTrue("Inflight count should reach zero, currently: " + queueView.getInFlightCount(), drained);
-   }
-
-   /** Logs the queue's dequeue, dispatch, enqueue, expired and in-flight counters. */
-   private static void logQueueStats(QueueViewMBean queueView) {
-      LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
-      LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
-      LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
-      LOG.info("Expired Count: {}", queueView.getExpiredCount());
-      LOG.info("InFlight Count: {}", queueView.getInFlightCount());
-   }
-
-   /** Builds a JMX proxy for the test queue's {@link QueueViewMBean}. */
-   private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
-      final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
-      final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
-      return proxy;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
deleted file mode 100644
index e894b70..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4092Test.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4092Test extends TestCase {
-
- private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class);
-
- static final String QUEUE_NAME = "TEST";
-
- // increase limits to expedite failure
- static final int NUM_TO_SEND_PER_PRODUCER = 1000; // 10000
- static final int NUM_PRODUCERS = 5; // 40
-
- static final ActiveMQQueue[] DESTINATIONS = new ActiveMQQueue[]{new ActiveMQQueue("A"), new ActiveMQQueue("B")
- // A/B seems to be sufficient for concurrentStoreAndDispatch=true
- };
-
- static final boolean debug = false;
-
- private BrokerService brokerService;
-
- private ActiveMQQueue destination;
- private HashMap<Thread, Throwable> exceptions = new HashMap<>();
- private ExceptionListener exceptionListener = new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- exceptions.put(Thread.currentThread(), exception);
- }
- };
-
- @Override
- protected void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setDeleteAllMessagesOnStartup(true);
- ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
- brokerService.addConnector("tcp://localhost:0");
- brokerService.start();
- destination = new ActiveMQQueue();
- destination.setCompositeDestinations(DESTINATIONS);
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- exceptions.put(t, e);
- }
- });
- }
-
- @Override
- protected void tearDown() throws Exception {
- // Stop any running threads.
- brokerService.stop();
- }
-
- public void testConcurrentGroups() throws Exception {
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.submit(new TestConsumer());
- for (int i = 0; i < NUM_PRODUCERS; i++) {
- executorService.submit(new TestProducer());
- }
- executorService.shutdown();
- executorService.awaitTermination(5, TimeUnit.MINUTES);
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-
- class TestProducer implements Runnable {
-
- public void produceMessages() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
- connectionFactory.setExceptionListener(exceptionListener);
- connectionFactory.setUseAsyncSend(true);
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- String name = new String(new byte[2 * 1024]);
- for (int i = 1; i <= NUM_TO_SEND_PER_PRODUCER; i++) {
-
- TextMessage message = session.createTextMessage(name + "_" + i);
- for (int j = 0; j < 100; j++) {
- message.setStringProperty("Prop" + j, "" + j);
- }
- message.setStringProperty("JMSXGroupID", Thread.currentThread().getName() + i);
- message.setIntProperty("JMSXGroupSeq", 1);
- producer.send(message);
- }
-
- producer.close();
- session.close();
- connection.close();
- }
-
- @Override
- public void run() {
- try {
- produceMessages();
- }
- catch (Exception e) {
- e.printStackTrace();
- exceptions.put(Thread.currentThread(), e);
- }
- }
- }
-
- class TestConsumer implements Runnable {
-
- private CountDownLatch finishLatch = new CountDownLatch(1);
-
- public void consume() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getConnectUri().toString());
-
- connectionFactory.setExceptionListener(exceptionListener);
- final int totalMessageCount = NUM_TO_SEND_PER_PRODUCER * DESTINATIONS.length * NUM_PRODUCERS;
- final AtomicInteger counter = new AtomicInteger();
- final MessageListener listener = new MessageListener() {
- @Override
- public void onMessage(Message message) {
-
- if (debug) {
- try {
- log.info(((TextMessage) message).getText());
- }
- catch (JMSException e) {
- e.printStackTrace();
- }
- }
-
- boolean first = false;
- try {
- first = message.getBooleanProperty("JMSXGroupFirstForConsumer");
- }
- catch (JMSException e) {
- e.printStackTrace();
- exceptions.put(Thread.currentThread(), e);
- }
- assertTrue("Always is first message", first);
- if (counter.incrementAndGet() == totalMessageCount) {
- log.info("Got all:" + counter.get());
- finishLatch.countDown();
-
- }
- }
- };
-
- int consumerCount = DESTINATIONS.length * 100;
- Connection[] connections = new Connection[consumerCount];
-
- Session[] sessions = new Session[consumerCount];
- MessageConsumer[] consumers = new MessageConsumer[consumerCount];
-
- for (int i = 0; i < consumerCount; i++) {
- connections[i] = connectionFactory.createConnection();
- connections[i].start();
-
- sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- consumers[i] = sessions[i].createConsumer(DESTINATIONS[i % DESTINATIONS.length], null);
- consumers[i].setMessageListener(listener);
- }
-
- log.info("received " + counter.get() + " messages");
-
- assertTrue("got all messages in time", finishLatch.await(4, TimeUnit.MINUTES));
-
- log.info("received " + counter.get() + " messages");
-
- for (MessageConsumer consumer : consumers) {
- consumer.close();
- }
-
- for (Session session : sessions) {
- session.close();
- }
-
- for (Connection connection : connections) {
- connection.close();
- }
- }
-
- @Override
- public void run() {
- try {
- consume();
- }
- catch (Exception e) {
- e.printStackTrace();
- exceptions.put(Thread.currentThread(), e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
deleted file mode 100644
index b87fd1b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4116Test.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Assert;
-
-/**
- * Verifies that the memory usage of a queue returns to zero after its only
- * message is consumed, and stays at zero when that same message object is
- * subsequently sent to a second queue.
- */
-public class AMQ4116Test extends EmbeddedBrokerTestSupport {
-
-   // Upper bound for receiving the single test message, so that a missing
-   // message fails the assertion instead of blocking the test forever.
-   private static final long RECEIVE_TIMEOUT_MS = 10000;
-
-   private final String tcpAddr = "tcp://localhost:0";
-   private String connectionUri;
-
-   /**
-    * In this test, a message is produced and consumed from the test queue.
-    * Memory usage on the test queue should be reset to 0. The message that was
-    * consumed is then sent to a second queue. Memory usage on the original
-    * test queue should remain 0, but actually increased when the second
-    * enqueue occurs.
-    */
-   public void testVMTransport() throws Exception {
-      runTest(connectionFactory);
-   }
-
-   /**
-    * This is an analog to the previous test, but occurs over TCP and passes.
-    */
-   public void testTCPTransport() throws Exception {
-      runTest(new ActiveMQConnectionFactory(connectionUri));
-   }
-
-   /**
-    * Runs the produce / consume / re-send scenario over connections created
-    * by the given factory, asserting the test queue's memory usage after each
-    * commit.
-    *
-    * @param connFactory factory used to open the single connection of the scenario
-    */
-   private void runTest(ConnectionFactory connFactory) throws Exception {
-      // Verify that test queue is empty and not using any memory.
-      Destination physicalDestination = broker.getDestination(destination);
-      Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-
-      Connection conn = connFactory.createConnection();
-      try {
-         // Enqueue a single message and verify that the test queue is using
-         // memory.
-         conn.start();
-         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-         MessageProducer producer = session.createProducer(destination);
-
-         producer.send(new ActiveMQMessage());
-
-         // Commit, which ensures message is in queue and memory usage updated.
-         session.commit();
-         Assert.assertTrue(physicalDestination.getMemoryUsage().getUsage() > 0);
-
-         // Consume the message and verify that the test queue is no longer using
-         // any memory.
-         MessageConsumer consumer = session.createConsumer(destination);
-         Message received = consumer.receive(RECEIVE_TIMEOUT_MS);
-         Assert.assertNotNull(received);
-
-         // Commit, which ensures message is removed from queue and memory usage
-         // updated.
-         session.commit();
-         Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-
-         // Resend the message to a different queue and verify that the original
-         // test queue is still not using any memory.
-         // getName() is used because concatenating the Class object itself
-         // would produce a queue name starting with "class ".
-         ActiveMQQueue secondDestination = new ActiveMQQueue(AMQ4116Test.class.getName() + ".second");
-         MessageProducer secondProducer = session.createProducer(secondDestination);
-
-         secondProducer.send(received);
-
-         // Commit, which ensures message is in queue and memory usage updated.
-         // NOTE: This assertion fails due to bug.
-         session.commit();
-         Assert.assertEquals(0, physicalDestination.getMemoryUsage().getUsage());
-      }
-      finally {
-         // close() rather than stop(): stop() only pauses delivery and leaves
-         // the connection open, and the old call was skipped when an assertion failed.
-         conn.close();
-      }
-   }
-
-   /**
-    * Create an embedded broker that has both TCP and VM connectors.
-    */
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = super.createBroker();
-      connectionUri = broker.addConnector(tcpAddr).getPublishableConnectString();
-      return broker;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
deleted file mode 100644
index d47c7c8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.Socket;
-import java.net.URI;
-
-import javax.management.ObjectName;
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLSocketFactory;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQSslConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompConnection;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- *
- */
-public class AMQ4126Test {
-
- protected BrokerService broker;
-
- protected String java_security_auth_login_config = "java.security.auth.login.config";
- protected String xbean = "xbean:";
- protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
- protected String certBase = "src/test/resources/org/apache/activemq/security";
- protected String JaasStompSSLBroker_xml = "JaasStompSSLBroker.xml";
- protected StompConnection stompConnection = new StompConnection();
- private final static String destinationName = "TEST.QUEUE";
- protected String oldLoginConf = null;
-
- @Before
- public void before() throws Exception {
- if (System.getProperty(java_security_auth_login_config) != null) {
- oldLoginConf = System.getProperty(java_security_auth_login_config);
- }
- System.setProperty(java_security_auth_login_config, confBase + "/login.config");
- broker = BrokerFactory.createBroker(xbean + confBase + "/" + JaasStompSSLBroker_xml);
-
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setUseJmx(true);
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void after() throws Exception {
- broker.stop();
-
- if (oldLoginConf != null) {
- System.setProperty(java_security_auth_login_config, oldLoginConf);
- }
- }
-
- public Socket createSocket(String host, int port) throws Exception {
- System.setProperty("javax.net.ssl.trustStore", certBase + "/broker1.ks");
- System.setProperty("javax.net.ssl.trustStorePassword", "password");
- System.setProperty("javax.net.ssl.trustStoreType", "jks");
- System.setProperty("javax.net.ssl.keyStore", certBase + "/client.ks");
- System.setProperty("javax.net.ssl.keyStorePassword", "password");
- System.setProperty("javax.net.ssl.keyStoreType", "jks");
-
- SocketFactory factory = SSLSocketFactory.getDefault();
- return factory.createSocket(host, port);
- }
-
- public void stompConnectTo(String connectorName, String extraHeaders) throws Exception {
- String host = broker.getConnectorByName(connectorName).getConnectUri().getHost();
- int port = broker.getConnectorByName(connectorName).getConnectUri().getPort();
- stompConnection.open(createSocket(host, port));
- String extra = extraHeaders != null ? extraHeaders : "\n";
- stompConnection.sendFrame("CONNECT\n" + extra + "\n" + Stomp.NULL);
-
- StompFrame f = stompConnection.receive();
- TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
- stompConnection.close();
- }
-
- @Test
- public void testStompSSLWithUsernameAndPassword() throws Exception {
- stompConnectTo("stomp+ssl", "login:system\n" + "passcode:manager\n");
- }
-
- @Test
- public void testStompSSLWithCertificate() throws Exception {
- stompConnectTo("stomp+ssl", null);
- }
-
- @Test
- public void testStompNIOSSLWithUsernameAndPassword() throws Exception {
- stompConnectTo("stomp+nio+ssl", "login:system\n" + "passcode:manager\n");
- }
-
- @Test
- public void testStompNIOSSLWithCertificate() throws Exception {
- stompConnectTo("stomp+nio+ssl", null);
- }
-
- public void openwireConnectTo(String connectorName, String username, String password) throws Exception {
- URI brokerURI = broker.getConnectorByName(connectorName).getConnectUri();
- String uri = "ssl://" + brokerURI.getHost() + ":" + brokerURI.getPort();
- ActiveMQSslConnectionFactory cf = new ActiveMQSslConnectionFactory(uri);
- cf.setTrustStore("org/apache/activemq/security/broker1.ks");
- cf.setTrustStorePassword("password");
- cf.setKeyStore("org/apache/activemq/security/client.ks");
- cf.setKeyStorePassword("password");
- ActiveMQConnection connection = null;
- if (username != null || password != null) {
- connection = (ActiveMQConnection) cf.createConnection(username, password);
- }
- else {
- connection = (ActiveMQConnection) cf.createConnection();
- }
- TestCase.assertNotNull(connection);
- connection.start();
- connection.stop();
- }
-
- @Test
- public void testOpenwireSSLWithUsernameAndPassword() throws Exception {
- openwireConnectTo("openwire+ssl", "system", "manager");
- }
-
- @Test
- public void testOpenwireSSLWithCertificate() throws Exception {
- openwireConnectTo("openwire+ssl", null, null);
- }
-
- @Test
- public void testOpenwireNIOSSLWithUsernameAndPassword() throws Exception {
- openwireConnectTo("openwire+nio+ssl", "system", "mmanager");
- }
-
- @Test
- public void testOpenwireNIOSSLWithCertificate() throws Exception {
- openwireConnectTo("openwire+nio+ssl", null, null);
- }
-
- @Test
- public void testJmx() throws Exception {
- TestCase.assertFalse(findDestination(destinationName));
- broker.getAdminView().addQueue(destinationName);
- TestCase.assertTrue(findDestination(destinationName));
- broker.getAdminView().removeQueue(destinationName);
- TestCase.assertFalse(findDestination(destinationName));
- }
-
- private boolean findDestination(String name) throws Exception {
- ObjectName[] destinations = broker.getAdminView().getQueues();
- for (ObjectName destination : destinations) {
- if (destination.toString().contains(name)) {
- return true;
- }
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
deleted file mode 100644
index 123413f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.Socket;
-
-import javax.net.SocketFactory;
-import javax.net.ssl.SSLSocketFactory;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompConnection;
-import org.apache.activemq.transport.stomp.StompFrame;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4133Test {
-
- protected String java_security_auth_login_config = "java.security.auth.login.config";
- protected String xbean = "xbean:";
- protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126";
- protected String certBase = "src/test/resources/org/apache/activemq/security";
- protected String activemqXml = "InconsistentConnectorPropertiesBehaviour.xml";
- protected BrokerService broker;
-
- protected String oldLoginConf = null;
-
- @Before
- public void before() throws Exception {
- if (System.getProperty(java_security_auth_login_config) != null) {
- oldLoginConf = System.getProperty(java_security_auth_login_config);
- }
- System.setProperty(java_security_auth_login_config, confBase + "/" + "login.config");
- broker = BrokerFactory.createBroker(xbean + confBase + "/" + activemqXml);
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void after() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void stompSSLTransportNeedClientAuthTrue() throws Exception {
- stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl").getConnectUri().getPort());
- }
-
- @Test
- public void stompSSLNeedClientAuthTrue() throws Exception {
- stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl+special").getConnectUri().getPort());
- }
-
- @Test
- public void stompNIOSSLTransportNeedClientAuthTrue() throws Exception {
- stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl").getConnectUri().getPort());
- }
-
- @Test
- public void stompNIOSSLNeedClientAuthTrue() throws Exception {
- stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl+special").getConnectUri().getPort());
- }
-
- public Socket createSocket(String host, int port) throws Exception {
- System.setProperty("javax.net.ssl.trustStore", certBase + "/" + "broker1.ks");
- System.setProperty("javax.net.ssl.trustStorePassword", "password");
- System.setProperty("javax.net.ssl.trustStoreType", "jks");
- System.setProperty("javax.net.ssl.keyStore", certBase + "/" + "client.ks");
- System.setProperty("javax.net.ssl.keyStorePassword", "password");
- System.setProperty("javax.net.ssl.keyStoreType", "jks");
-
- SocketFactory factory = SSLSocketFactory.getDefault();
- return factory.createSocket(host, port);
- }
-
- public void stompConnectTo(String host, int port) throws Exception {
- StompConnection stompConnection = new StompConnection();
- stompConnection.open(createSocket(host, port));
- stompConnection.sendFrame("CONNECT\n" + "\n" + Stomp.NULL);
- StompFrame f = stompConnection.receive();
- TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction());
- stompConnection.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
deleted file mode 100644
index d0096f1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.concurrent.Semaphore;
-
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DemandForwardingBridgeSupport;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.Wait;
-
-/**
- * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when
- * bridges are VM-to-VM. Specifically, memory usage from the local broker is
- * manipulated by the remote broker.
- */
-public class AMQ4147Test extends JmsMultipleBrokersTestSupport {
-
- /**
- * This test demonstrates the bug: namely, when a message is bridged over
- * the VMTransport, its memory usage continues to refer to the originating
- * broker. As a result, memory usage is never accounted for on the remote
- * broker, and the local broker's memory usage is only decreased once the
- * message is consumed on the remote broker.
- */
- public void testVMTransportRemoteMemoryUsage() throws Exception {
- BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-
- BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
- startAllBrokers();
-
- // Forward messages from broker1 to broker2 over the VM transport.
- bridgeBrokers("broker1", "broker2").start();
-
- // Verify that broker1 and broker2's test queues have no memory usage.
- ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false);
- final Destination broker1TestQueue = broker1.getDestination(testQueue);
- final Destination broker2TestQueue = broker2.getDestination(testQueue);
-
- assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
- assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
- // Produce a message to broker1's test queue and verify that broker1's
- // memory usage has increased, but broker2 still has no memory usage.
- sendMessages("broker1", testQueue, 1);
- assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
- assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
- // Create a consumer on broker2 that is synchronized to allow detection
- // of "in flight" messages to the consumer.
- MessageIdList broker2Messages = getBrokerMessages("broker2");
- final Semaphore consumerReady = new Semaphore(0);
- final Semaphore consumerProceed = new Semaphore(0);
-
- broker2Messages.setParent(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- consumerReady.release();
- try {
- consumerProceed.acquire();
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- });
-
- createConsumer("broker2", testQueue);
-
- // Verify that when broker2's consumer receives the message, the memory
- // usage has moved broker1 to broker2. The first assertion is expected
- // to fail due to the bug; the try/finally ensures the consumer is
- // released prior to failure so that the broker can shut down.
- consumerReady.acquire();
-
- try {
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker1TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
- }
- finally {
- // Consume the message and verify that there is no more memory
- // usage.
- consumerProceed.release();
- }
-
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker1TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker2TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- }
-
- /**
- * This test demonstrates that the bug is VMTransport-specific and does not
- * occur when bridges occur using other protocols.
- */
- public void testTcpTransportRemoteMemoryUsage() throws Exception {
- BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
-
- BrokerService broker2 = createBroker(new URI("broker:(tcp://localhost:61616)/broker2?persistent=false"));
-
- startAllBrokers();
-
- // Forward messages from broker1 to broker2 over the TCP transport.
- bridgeBrokers("broker1", "broker2").start();
-
- // Verify that broker1 and broker2's test queues have no memory usage.
- ActiveMQDestination testQueue = createDestination(AMQ4147Test.class.getSimpleName() + ".queue", false);
- final Destination broker1TestQueue = broker1.getDestination(testQueue);
- final Destination broker2TestQueue = broker2.getDestination(testQueue);
-
- assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
- assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
- // Produce a message to broker1's test queue and verify that broker1's
- // memory usage has increased, but broker2 still has no memory usage.
- sendMessages("broker1", testQueue, 1);
- assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
- assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
-
- // Create a consumer on broker2 that is synchronized to allow detection
- // of "in flight" messages to the consumer.
- MessageIdList broker2Messages = getBrokerMessages("broker2");
- final Semaphore consumerReady = new Semaphore(0);
- final Semaphore consumerProceed = new Semaphore(0);
-
- broker2Messages.setParent(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- consumerReady.release();
- try {
- consumerProceed.acquire();
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
- });
-
- createConsumer("broker2", testQueue);
-
- // Verify that when broker2's consumer receives the message, the memory
- // usage has moved broker1 to broker2.
- consumerReady.acquire();
-
- try {
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker1TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
- }
- finally {
- // Consume the message and verify that there is no more memory
- // usage.
- consumerProceed.release();
- }
-
- // Pause to allow ACK to be processed.
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker1TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return broker2TestQueue.getMemoryUsage().getUsage() == 0;
- }
- }));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
deleted file mode 100644
index 8558f48..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DemandForwardingBridgeSupport;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.Wait;
-import org.junit.Assert;
-
-/**
- * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby
- * a static subscription from broker1 to broker2 is forwarded to broker3 even
- * though the network TTL is 1. This results in duplicate subscriptions on
- * broker3.
- */
-public class AMQ4148Test extends JmsMultipleBrokersTestSupport {
-
- public void test() throws Exception {
- // Create a hub-and-spoke network where each hub-spoke pair share
- // messages on a test queue.
- BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false"));
-
- final BrokerService[] spokes = new BrokerService[4];
- for (int i = 0; i < spokes.length; i++) {
- spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false"));
-
- }
- startAllBrokers();
-
- ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false);
-
- NetworkConnector[] ncs = new NetworkConnector[spokes.length];
- for (int i = 0; i < spokes.length; i++) {
- NetworkConnector nc = bridgeBrokers("hub", "spoke" + i);
- nc.setNetworkTTL(1);
- nc.setDuplex(true);
- nc.setConduitSubscriptions(false);
- nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue));
- nc.start();
-
- ncs[i] = nc;
- }
-
- waitForBridgeFormation();
-
- // Pause to allow subscriptions to be created.
- TimeUnit.SECONDS.sleep(5);
-
- // Verify that the hub has a subscription from each spoke, but that each
- // spoke has a single subscription from the hub (since the network TTL is 1).
- final Destination hubTestQueue = hub.getDestination(testQueue);
- assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return spokes.length == hubTestQueue.getConsumers().size();
- }
- }));
-
- // Now check each spoke has exactly one consumer on the Queue.
- for (int i = 0; i < 4; i++) {
- Destination spokeTestQueue = spokes[i].getDestination(testQueue);
- Assert.assertEquals(1, spokeTestQueue.getConsumers().size());
- }
-
- for (NetworkConnector nc : ncs) {
- nc.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
deleted file mode 100644
index f932a49..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionControl;
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4157Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class);
- private BrokerService broker;
- private ActiveMQConnectionFactory connectionFactory;
- private final Destination destination = new ActiveMQQueue("Test");
- private final String payloadString = new String(new byte[8 * 1024]);
- private final boolean useBytesMessage = true;
- private final int parallelProducer = 20;
- private final int parallelConsumer = 100;
-
- private final Vector<Exception> exceptions = new Vector<>();
- long toSend = 1000;
-
- @Test
- public void testPublishCountsWithRollbackConsumer() throws Exception {
-
- startBroker(true);
-
- final AtomicLong sharedCount = new AtomicLong(toSend);
- ExecutorService executorService = Executors.newCachedThreadPool();
-
- for (int i = 0; i < parallelConsumer; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- consumeOneAndRollback();
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
-
- for (int i = 0; i < parallelProducer; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- publishMessages(sharedCount, 0);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
-
- executorService.shutdown();
- executorService.awaitTermination(30, TimeUnit.MINUTES);
- assertTrue("Producers done in time", executorService.isTerminated());
- assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
-
- restartBroker(500);
-
- LOG.info("Attempting consume of {} messages", toSend);
-
- consumeMessages(toSend);
- }
-
- private void consumeOneAndRollback() throws Exception {
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(destination);
- Message message = null;
- while (message == null) {
- message = consumer.receive(1000);
- }
- session.rollback();
- connection.close();
- }
-
- private void consumeMessages(long count) throws Exception {
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < count; i++) {
- assertNotNull("got message " + i, consumer.receive(20000));
- }
- assertNull("none left over", consumer.receive(2000));
- }
-
- private void restartBroker(int restartDelay) throws Exception {
- stopBroker();
- TimeUnit.MILLISECONDS.sleep(restartDelay);
- startBroker(false);
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- private void publishMessages(AtomicLong count, int expiry) throws Exception {
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setWatchTopicAdvisories(false);
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = session.createProducer(destination);
- while ((count.getAndDecrement()) > 0) {
- Message message = null;
- if (useBytesMessage) {
- message = session.createBytesMessage();
- ((BytesMessage) message).writeBytes(payloadString.getBytes());
- }
- else {
- message = session.createTextMessage(payloadString);
- }
- producer.send(message, DeliveryMode.PERSISTENT, 5, expiry);
- }
- connection.syncSendPacket(new ConnectionControl());
- connection.close();
- }
-
- public void startBroker(boolean deleteAllMessages) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
-
- String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
- connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
deleted file mode 100644
index 0cd8e0b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerFilter;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.DiscoveryEvent;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkBridge;
-import org.apache.activemq.network.NetworkBridgeListener;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.discovery.DiscoveryAgent;
-import org.apache.activemq.transport.discovery.DiscoveryListener;
-import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
-import org.junit.Assert;
-
-/**
- * This test demonstrates a number of race conditions in
- * {@link DiscoveryNetworkConnector} that can result in an active bridge no
- * longer being reported as active and vice-versa, an inactive bridge still
- * being reported as active.
- */
-public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
-
- final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2);
-
- /**
- * Since these tests involve wait conditions, protect against indefinite
- * waits (due to unanticipated issues).
- */
- @Override
- public void setUp() throws Exception {
- setAutoFail(true);
- setMaxTestTime(MAX_TEST_TIME);
- super.setUp();
- }
-
- /**
- * This test demonstrates how concurrent attempts to establish a bridge to
- * the same remote broker are allowed to occur. Connection uniqueness will
- * cause whichever bridge creation attempt is second to fail. However, this
- * failure erases the entry in
- * {@link DiscoveryNetworkConnector#activeBridges()} that represents the
- * successful first bridge creation attempt.
- */
- public void testLostActiveBridge() throws Exception {
- final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15);
-
- // Start two brokers with a bridge from broker1 to broker2.
- BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
- final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
- // Allow the concurrent local bridge connections to be made even though
- // they are duplicated; this prevents both of the bridge attempts from
- // failing in the case that the local and remote bridges are established
- // out-of-order.
- BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() {
- @Override
- public Broker installPlugin(Broker broker) throws Exception {
- return new BrokerFilter(broker) {
- @Override
- public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
- // ignore
- }
- };
- }
- };
-
- broker1.setPlugins(new BrokerPlugin[]{ignoreAddConnectionPlugin});
-
- startAllBrokers();
-
- // Start a bridge from broker1 to broker2. The discovery agent attempts
- // to create the bridge concurrently with two threads, and the
- // synchronization in createBridge ensures that pre-patch both threads
- // actually attempt to start bridges. Post-patch, only one thread is
- // allowed to start the bridge.
- final CountDownLatch attemptLatch = new CountDownLatch(2);
- final CountDownLatch createLatch = new CountDownLatch(2);
-
- DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
- @Override
- public void onServiceAdd(DiscoveryEvent event) {
- // Pre-and-post patch, two threads attempt to establish a bridge
- // to the same remote broker.
- attemptLatch.countDown();
- super.onServiceAdd(event);
- }
-
- @Override
- protected NetworkBridge createBridge(Transport localTransport,
- Transport remoteTransport,
- final DiscoveryEvent event) {
- // Pre-patch, the two threads are allowed to create the bridge.
- // Post-patch, only the first thread is allowed. Wait a
- // reasonable delay once both attempts are detected to allow
- // the two bridge creations to occur concurrently (pre-patch).
- // Post-patch, the wait will timeout and allow the first (and
- // only) bridge creation to occur.
- try {
- attemptLatch.await();
- createLatch.countDown();
- createLatch.await(ATTEMPT_TO_CREATE_DELAY, TimeUnit.MILLISECONDS);
- return super.createBridge(localTransport, remoteTransport, event);
- }
- catch (InterruptedException e) {
- Thread.interrupted();
- return null;
- }
- }
- };
-
- nc.setDiscoveryAgent(new DiscoveryAgent() {
- TaskRunnerFactory taskRunner = new TaskRunnerFactory();
- DiscoveryListener listener;
-
- @Override
- public void start() throws Exception {
- taskRunner.init();
- taskRunner.execute(new Runnable() {
- @Override
- public void run() {
- listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString()));
- }
- });
- taskRunner.execute(new Runnable() {
- @Override
- public void run() {
- listener.onServiceAdd(new DiscoveryEvent(broker2.getVmConnectorURI().toString()));
- }
- });
- }
-
- @Override
- public void stop() throws Exception {
- taskRunner.shutdown();
- }
-
- @Override
- public void setDiscoveryListener(DiscoveryListener listener) {
- this.listener = listener;
- }
-
- @Override
- public void registerService(String name) throws IOException {
- }
-
- @Override
- public void serviceFailed(DiscoveryEvent event) throws IOException {
- listener.onServiceRemove(event);
- }
- });
-
- broker1.addNetworkConnector(nc);
- nc.start();
-
- // Wait for the bridge to be formed by the first attempt.
- waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
-
- // Pre-patch, the second bridge creation attempt fails and removes the
- // first (successful) bridge creation attempt from the
- // list of active bridges. Post-patch, the second bridge creation
- // attempt is prevented, so the first bridge creation attempt
- // remains "active". This assertion is expected to fail pre-patch and
- // pass post-patch.
- Assert.assertFalse(nc.activeBridges().isEmpty());
- }
-
- /**
- * This test demonstrates a race condition where a failed bridge can be
- * removed from the list of active bridges in
- * {@link DiscoveryNetworkConnector} before it has been added. Eventually,
- * the failed bridge is added, but never removed, which causes subsequent
- * bridge creation attempts to be ignored. The result is a network connector
- * that thinks it has an active bridge, when in fact it doesn't.
- */
- public void testInactiveBridgStillActive() throws Exception {
- // Start two brokers with a bridge from broker1 to broker2.
- BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
- final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
- // Force bridge failure by having broker1 disallow connections.
- BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() {
- @Override
- public Broker installPlugin(Broker broker) throws Exception {
- return new BrokerFilter(broker) {
- @Override
- public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
- throw new Exception("Test exception to force bridge failure");
- }
- };
- }
- };
-
- broker1.setPlugins(new BrokerPlugin[]{disallowAddConnectionPlugin});
-
- startAllBrokers();
-
- // Start a bridge from broker1 to broker2. The bridge delays returning
- // from start until after the bridge failure has been processed;
- // this leaves the first bridge creation attempt recorded as active,
- // even though it failed.
- final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
- da.setServices(new URI[]{broker2.getVmConnectorURI()});
-
- final CountDownLatch attemptLatch = new CountDownLatch(3);
- final CountDownLatch removedLatch = new CountDownLatch(1);
-
- DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() {
- @Override
- public void onServiceAdd(DiscoveryEvent event) {
- attemptLatch.countDown();
- super.onServiceAdd(event);
- }
-
- @Override
- public void onServiceRemove(DiscoveryEvent event) {
- super.onServiceRemove(event);
- removedLatch.countDown();
- }
-
- @Override
- protected NetworkBridge createBridge(Transport localTransport,
- Transport remoteTransport,
- final DiscoveryEvent event) {
- final NetworkBridge next = super.createBridge(localTransport, remoteTransport, event);
- return new NetworkBridge() {
-
- @Override
- public void start() throws Exception {
- next.start();
- // Delay returning until the failed service has been
- // removed.
- removedLatch.await();
- }
-
- @Override
- public void stop() throws Exception {
- next.stop();
- }
-
- @Override
- public void serviceRemoteException(Throwable error) {
- next.serviceRemoteException(error);
- }
-
- @Override
- public void serviceLocalException(Throwable error) {
- next.serviceLocalException(error);
- }
-
- @Override
- public void setNetworkBridgeListener(NetworkBridgeListener listener) {
- next.setNetworkBridgeListener(listener);
- }
-
- @Override
- public String getRemoteAddress() {
- return next.getRemoteAddress();
- }
-
- @Override
- public String getRemoteBrokerName() {
- return next.getRemoteBrokerName();
- }
-
- @Override
- public String getRemoteBrokerId() {
- return next.getRemoteBrokerId();
- }
-
- @Override
- public String getLocalAddress() {
- return next.getLocalAddress();
- }
-
- @Override
- public String getLocalBrokerName() {
- return next.getLocalBrokerName();
- }
-
- @Override
- public long getEnqueueCounter() {
- return next.getEnqueueCounter();
- }
-
- @Override
- public long getDequeueCounter() {
- return next.getDequeueCounter();
- }
-
- @Override
- public void setMbeanObjectName(ObjectName objectName) {
- next.setMbeanObjectName(objectName);
- }
-
- @Override
- public ObjectName getMbeanObjectName() {
- return next.getMbeanObjectName();
- }
-
- @Override
- public void resetStats() {
- next.resetStats();
- }
- };
- }
- };
- nc.setDiscoveryAgent(da);
-
- broker1.addNetworkConnector(nc);
- nc.start();
-
- // All bridge attempts should fail, so the attempt latch should get
- // triggered. However, because of the race condition, the first attempt
- // is considered successful and causes further attempts to stop.
- // Therefore, this wait will time out and cause the test to fail.
- Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS));
- }
-
- /**
- * This test verifies that when a network connector is restarted, any
- * bridges that were active at the time of the stop are allowed to be
- * re-established (i.e., the "active events" data structure in
- * {@link DiscoveryNetworkConnector} is reset.
- */
- public void testAllowAttemptsAfterRestart() throws Exception {
- final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10);
-
- // Start two brokers with a bridge from broker1 to broker2.
- BrokerService broker1 = createBroker(new URI("broker:(vm://broker1)/broker1?persistent=false"));
- final BrokerService broker2 = createBroker(new URI("broker:(vm://broker2)/broker2?persistent=false"));
-
- startAllBrokers();
-
- // Start a bridge from broker1 to broker2.
- NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(), broker2.getBrokerName());
- nc.start();
-
- waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
-
- // Restart the network connector and verify that the bridge is
- // re-established. The pause between start/stop is to account for the
- // asynchronous closure.
- nc.stop();
- Thread.sleep(STOP_DELAY);
- nc.start();
-
- waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), MAX_TEST_TIME, TimeUnit.MILLISECONDS);
- }
-}