You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:55:32 UTC
[16/68] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
deleted file mode 100644
index 8d94998..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4517Test.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4517Test {
-
- private BrokerService brokerService;
- private String connectionUri;
-
- @Before
- public void setup() throws Exception {
- brokerService = new BrokerService();
-
- connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
-
- // Configure Dead Letter Strategy
- DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
- ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true);
- ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ.");
- strategy.setProcessNonPersistent(false);
- strategy.setProcessExpired(false);
-
- // Add policy and individual DLQ strategy
- PolicyEntry policy = new PolicyEntry();
- policy.setTimeBeforeDispatchStarts(3000);
- policy.setDeadLetterStrategy(strategy);
-
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
-
- brokerService.setDestinationPolicy(pMap);
- brokerService.setPersistent(false);
- brokerService.start();
- }
-
- @After
- public void stop() throws Exception {
- brokerService.stop();
- }
-
- @Test(timeout = 360000)
- public void test() throws Exception {
-
- final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
-
- final AtomicBoolean advised = new AtomicBoolean(false);
- Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dlqDestination = session.createTopic(AdvisorySupport.MESSAGE_DLQ_TOPIC_PREFIX + ">");
- MessageConsumer consumer = session.createConsumer(dlqDestination);
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- advised.set(true);
- }
- });
- connection.start();
-
- ExecutorService service = Executors.newSingleThreadExecutor();
-
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTemporaryQueue();
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.setTimeToLive(400);
- producer.send(session.createTextMessage());
- producer.send(session.createTextMessage());
- TimeUnit.MILLISECONDS.sleep(500);
- connection.close();
- }
- catch (Exception e) {
- }
- }
- });
-
- service.shutdown();
- assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
- assertFalse("Should not get any Advisories for DLQ'd Messages", advised.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
deleted file mode 100644
index 92021bf..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4518Test.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4518Test {
-
- private BrokerService brokerService;
- private String connectionUri;
-
- @Before
- public void setup() throws Exception {
- brokerService = new BrokerService();
-
- connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
-
- // Configure Dead Letter Strategy
- DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
- ((IndividualDeadLetterStrategy) strategy).setUseQueueForQueueMessages(true);
- ((IndividualDeadLetterStrategy) strategy).setQueuePrefix("DLQ.");
- strategy.setProcessNonPersistent(false);
- strategy.setProcessExpired(false);
-
- // Add policy and individual DLQ strategy
- PolicyEntry policy = new PolicyEntry();
- policy.setTimeBeforeDispatchStarts(3000);
- policy.setDeadLetterStrategy(strategy);
-
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
-
- brokerService.setDestinationPolicy(pMap);
- brokerService.setPersistent(false);
- brokerService.start();
- }
-
- @After
- public void stop() throws Exception {
- brokerService.stop();
- }
-
- @Test(timeout = 360000)
- public void test() throws Exception {
-
- final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
-
- final AtomicBoolean advised = new AtomicBoolean(false);
- Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dlqDestination = session.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">");
- MessageConsumer consumer = session.createConsumer(dlqDestination);
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- advised.set(true);
- }
- });
- connection.start();
-
- ExecutorService service = Executors.newSingleThreadExecutor();
-
- service.execute(new Runnable() {
- @Override
- public void run() {
- try {
- ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTemporaryQueue();
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.setTimeToLive(400);
- producer.send(session.createTextMessage());
- producer.send(session.createTextMessage());
- TimeUnit.MILLISECONDS.sleep(500);
- connection.close();
- }
- catch (Exception e) {
- }
- }
- });
-
- service.shutdown();
- assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
- assertFalse("Should not get any Advisories for Expired Messages", advised.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
deleted file mode 100644
index d57501e..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularDataSupport;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.CompositeDataConstants;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4530Test {
-
- private static BrokerService brokerService;
- private static String TEST_QUEUE = "testQueue";
- private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
- private static String BROKER_ADDRESS = "tcp://localhost:0";
- private static String KEY = "testproperty";
- private static String VALUE = "propvalue";
-
- private ActiveMQConnectionFactory connectionFactory;
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- sendMessage();
- }
-
- public void sendMessage() throws Exception {
- final Connection conn = connectionFactory.createConnection();
- try {
- conn.start();
- final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Destination queue = session.createQueue(TEST_QUEUE);
- final Message toSend = session.createMessage();
- toSend.setStringProperty(KEY, VALUE);
- final MessageProducer producer = session.createProducer(queue);
- producer.send(queue, toSend);
- }
- finally {
- conn.close();
- }
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testStringPropertiesFromCompositeData() throws Exception {
- final QueueViewMBean queueView = getProxyToQueueViewMBean();
- final CompositeData message = queueView.browse()[0];
- assertNotNull(message);
- TabularDataSupport stringProperties = (TabularDataSupport) message.get(CompositeDataConstants.STRING_PROPERTIES);
- assertNotNull(stringProperties);
- assertThat(stringProperties.size(), is(greaterThan(0)));
- Map.Entry<Object, Object> compositeDataEntry = (Map.Entry<Object, Object>) stringProperties.entrySet().toArray()[0];
- CompositeData stringEntry = (CompositeData) compositeDataEntry.getValue();
- assertThat(String.valueOf(stringEntry.get("key")), equalTo(KEY));
- assertThat(String.valueOf(stringEntry.get("value")), equalTo(VALUE));
- }
-
- private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, JMSException {
- final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
- final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
deleted file mode 100644
index d303561..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4531Test.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.io.Writer;
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.CountDownLatch;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit test for simple App.
- */
-public class AMQ4531Test extends TestCase {
-
- private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class);
-
- private String connectionURI;
- private MBeanServer mbeanServer;
- private BrokerService broker;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- broker = new BrokerService();
- connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
- broker.setPersistent(false);
- broker.start();
- mbeanServer = ManagementFactory.getPlatformMBeanServer();
- }
-
- @Override
- protected void tearDown() throws Exception {
- broker.stop();
- super.tearDown();
- }
-
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AMQ4531Test(String testName) {
- super(testName);
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite() {
- return new TestSuite(AMQ4531Test.class);
- }
-
- public void testFDSLeak() throws Exception {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
- ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
- connection.start();
-
- int connections = 100;
- final long original = openFileDescriptorCount();
- LOG.info("FD count: " + original);
- final CountDownLatch done = new CountDownLatch(connections);
- for (int i = 0; i < connections; i++) {
- new Thread("worker: " + i) {
- @Override
- public void run() {
- ActiveMQConnection connection = null;
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
- connection = (ActiveMQConnection) factory.createConnection();
- connection.start();
- }
- catch (Exception e) {
- LOG.debug(getStack(e));
- }
- finally {
- try {
- connection.close();
- }
- catch (Exception e) {
- LOG.debug(getStack(e));
- }
- done.countDown();
- LOG.debug("Latch count down called.");
- }
- }
- }.start();
- }
-
- // Wait for all the clients to finish
- LOG.info("Waiting for latch...");
- done.await();
- LOG.info("Latch complete.");
- LOG.info("FD count: " + openFileDescriptorCount());
-
- assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- long openFDs = openFileDescriptorCount();
- LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original);
- return (openFDs - original) < 10;
- }
- }));
- }
-
- private long openFileDescriptorCount() throws Exception {
- return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue();
- }
-
- private String getStack(Throwable aThrowable) {
- final Writer result = new StringWriter();
- final PrintWriter printWriter = new PrintWriter(result);
- aThrowable.printStackTrace(printWriter);
- return result.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
deleted file mode 100644
index 1113ee4..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4554Test.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit test for simple App.
- */
-public class AMQ4554Test extends TestCase {
-
- private final Logger LOG = LoggerFactory.getLogger(AMQ4554Test.class);
-
- private String connectionURI;
- private BrokerService broker;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- broker = new BrokerService();
- connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
- broker.setPersistent(false);
- broker.start();
- }
-
- @Override
- protected void tearDown() throws Exception {
- broker.stop();
- super.tearDown();
- }
-
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AMQ4554Test(String testName) {
- super(testName);
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite() {
- return new TestSuite(AMQ4554Test.class);
- }
-
- public void testMSXProducerTXID() throws Exception {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
- Connection connection = factory.createConnection();
- connection.start();
-
- Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(producerSession.createQueue("myQueue"));
- TextMessage producerMessage = producerSession.createTextMessage("Test Message");
- producer.send(producerMessage);
- producer.close();
- producerSession.commit();
- producerSession.close();
-
- Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue("myQueue"));
- Message consumerMessage = consumer.receive(1000);
- try {
- String txId = consumerMessage.getStringProperty("JMSXProducerTXID");
- assertNotNull(txId);
- }
- catch (Exception e) {
- LOG.info("Caught Exception that was not expected:", e);
- fail("Should not throw");
- }
- consumer.close();
- consumerSession.commit();
- consumerSession.close();
- connection.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
deleted file mode 100644
index 9612a34..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.ConsumerThread;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4582Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4582Test.class);
-
- BrokerService broker;
- Connection connection;
- Session session;
-
- public static final String KEYSTORE_TYPE = "jks";
- public static final String PASSWORD = "password";
- public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
- public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
-
- public static final int PRODUCER_COUNT = 10;
- public static final int CONSUMER_COUNT = 10;
- public static final int MESSAGE_COUNT = 1000;
-
- final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT];
-
- @Before
- public void setUp() throws Exception {
- System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
- System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
- System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
- System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
- System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
- System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- try {
- broker.stop();
- }
- catch (Exception e) {
- }
- }
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void simpleTest() throws Exception {
- thrown.expect(IOException.class);
- thrown.expectMessage("enabledCipherSuites=BADSUITE");
-
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- try {
- broker.addConnector("ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=BADSUITE");
- broker.start();
- broker.waitUntilStarted();
- }
- catch (Exception e) {
- LOG.info("BrokerService threw:", e);
- throw e;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
deleted file mode 100644
index 3c16bab..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.util.Date;
-import java.util.Enumeration;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-
-public class AMQ4595Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
-
- private BrokerService broker;
- private URI connectUri;
- private ActiveMQConnectionFactory factory;
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- TransportConnector connector = broker.addConnector("vm://localhost");
- broker.deleteAllMessages();
-
- //PolicyMap pMap = new PolicyMap();
- //PolicyEntry policyEntry = new PolicyEntry();
- //policyEntry.setMaxBrowsePageSize(10000);
- //pMap.put(new ActiveMQQueue(">"), policyEntry);
- // when no policy match, browserSub has maxMessages==0
- //broker.setDestinationPolicy(pMap);
-
- broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
- broker.start();
- broker.waitUntilStarted();
- connectUri = connector.getConnectUri();
- factory = new ActiveMQConnectionFactory(connectUri);
- }
-
- @After
- public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- @Test(timeout = 120000)
- public void testBrowsingSmallBatch() throws JMSException {
- doTestBrowsing(100);
- }
-
- @Test(timeout = 160000)
- public void testBrowsingMediumBatch() throws JMSException {
- doTestBrowsing(1000);
- }
-
- @Test(timeout = 300000)
- public void testBrowsingLargeBatch() throws JMSException {
- doTestBrowsing(10000);
- }
-
- private void doTestBrowsing(int messageToSend) throws JMSException {
- ActiveMQQueue queue = new ActiveMQQueue("TEST");
-
- // Send the messages to the Queue.
- ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection();
- producerConnection.setUseAsyncSend(true);
- producerConnection.start();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- for (int i = 1; i <= messageToSend; i++) {
- String msgStr = provideMessageText(i, 8192);
- producer.send(producerSession.createTextMessage(msgStr));
- if ((i % 1000) == 0) {
- LOG.info("P&C: {}", msgStr.substring(0, 100));
- }
- }
- producerConnection.close();
-
- LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
-
- // Browse the queue.
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- QueueBrowser browser = session.createBrowser(queue);
- Enumeration<?> enumeration = browser.getEnumeration();
- int browsed = 0;
- while (enumeration.hasMoreElements()) {
- TextMessage m = (TextMessage) enumeration.nextElement();
- browsed++;
- if ((browsed % 1000) == 0) {
- LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100));
- }
- }
- browser.close();
- session.close();
- connection.close();
-
- LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
-
- // The number of messages browsed should be equal to the number of messages sent.
- assertEquals(messageToSend, browsed);
-
- browser.close();
- }
-
- public String provideMessageText(int messageNumber, int messageSize) {
- StringBuilder buf = new StringBuilder();
- buf.append("Message: ");
- if (messageNumber > 0) {
- buf.append(messageNumber);
- }
- buf.append(" sent at: ").append(new Date());
-
- if (buf.length() > messageSize) {
- return buf.substring(0, messageSize);
- }
- for (int i = buf.length(); i < messageSize; i++) {
- buf.append(' ');
- }
- return buf.toString();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
deleted file mode 100644
index 527309b..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-
-import junit.framework.Test;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class);
-
- public static final int BROKER_COUNT = 3;
- public static final int CONSUMER_COUNT = 1;
- public static final int MESSAGE_COUNT = 0;
- public static final boolean CONDUIT = true;
- public static final int TIMEOUT = 20000;
-
- public boolean duplex = true;
- protected Map<String, MessageConsumer> consumerMap;
- final Map<Thread, Throwable> unhandeledExceptions = new HashMap<>();
-
- private void assertNoUnhandeledExceptions() {
- for (Entry<Thread, Throwable> e : unhandeledExceptions.entrySet()) {
- LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
- }
- assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions, unhandeledExceptions.isEmpty());
- }
-
- public NetworkConnector bridge(String from, String to) throws Exception {
- NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT);
- networkConnector.setSuppressDuplicateQueueSubscriptions(true);
- networkConnector.setDecreaseNetworkConsumerPriority(true);
- networkConnector.setConsumerTTL(1);
- networkConnector.setDuplex(duplex);
- return networkConnector;
- }
-
- public static Test suite() {
- return suite(AMQ4607Test.class);
- }
-
- public void initCombos() {
- addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
- }
-
- public void testMigratingConsumer() throws Exception {
- bridge("Broker0", "Broker1");
- if (!duplex)
- bridge("Broker1", "Broker0");
-
- bridge("Broker1", "Broker2");
- if (!duplex)
- bridge("Broker2", "Broker1");
-
- bridge("Broker0", "Broker2");
- if (!duplex)
- bridge("Broker2", "Broker0");
-
- startAllBrokers();
- this.waitForBridgeFormation();
-
- Destination dest = createDestination("TEST.FOO", false);
- sendMessages("Broker0", dest, 1);
-
- for (int i = 0; i < BROKER_COUNT; i++) {
- MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
-
- for (int J = 0; J < BROKER_COUNT; J++) {
- assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
- }
-
- assertNoUnhandeledExceptions();
-
- assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
-
- messageConsumer.close();
- LOG.info("Check for no consumers..");
- for (int J = 0; J < BROKER_COUNT; J++) {
- assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
- }
- }
-
- // now consume the message
- final String brokerId = "Broker2";
- MessageConsumer messageConsumer = createConsumer(brokerId, dest);
- assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
- }
- }));
- messageConsumer.close();
-
- }
-
- public void testMigratingConsumerFullCircle() throws Exception {
- bridge("Broker0", "Broker1");
- if (!duplex)
- bridge("Broker1", "Broker0");
-
- bridge("Broker1", "Broker2");
- if (!duplex)
- bridge("Broker2", "Broker1");
-
- bridge("Broker0", "Broker2");
- if (!duplex)
- bridge("Broker2", "Broker0");
-
- // allow full loop, immediate replay back to 0 from 2
- ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
- conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
- conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
- brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
- startAllBrokers();
- this.waitForBridgeFormation();
-
- Destination dest = createDestination("TEST.FOO", false);
-
- sendMessages("Broker0", dest, 1);
-
- for (int i = 0; i < BROKER_COUNT; i++) {
- MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
-
- for (int J = 0; J < BROKER_COUNT; J++) {
- assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
- }
-
- assertNoUnhandeledExceptions();
-
- // validate the message has been forwarded
- assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
-
- messageConsumer.close();
- LOG.info("Check for no consumers..");
- for (int J = 0; J < BROKER_COUNT; J++) {
- assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
- }
- }
-
- // now consume the message from the origin
- LOG.info("Consume from origin...");
- final String brokerId = "Broker0";
- MessageConsumer messageConsumer = createConsumer(brokerId, dest);
- assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
- }
- }));
- messageConsumer.close();
-
- }
-
- protected void assertExactMessageCount(final String brokerName,
- Destination destination,
- final int count,
- long timeout) throws Exception {
- ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
- final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
- assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- long currentCount = queueViewMBean.getQueueSize();
- LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount);
- if (count != currentCount) {
- LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
- }
- return currentCount == count;
- }
- }, timeout));
- }
-
- protected void assertExactConsumersConnect(final String brokerName,
- Destination destination,
- final int count,
- long timeout) throws Exception {
- final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
- assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
- long currentCount = queueViewMBean.getConsumerCount();
- LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
- if (count != currentCount) {
- LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
- }
- return currentCount == count;
- }
- catch (Exception e) {
- LOG.warn("Unexpected: " + e, e);
- return false;
- }
- }
- }, timeout));
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- unhandeledExceptions.clear();
- Thread.setDefaultUncaughtExceptionHandler(this);
-
- // Setup n brokers
- for (int i = 0; i < BROKER_COUNT; i++) {
- createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true"));
- }
-
- consumerMap = new LinkedHashMap<>();
- }
-
- @Override
- protected void configureBroker(BrokerService brokerService) {
- PolicyEntry policyEntry = new PolicyEntry();
- policyEntry.setExpireMessagesPeriod(0);
- PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(policyEntry);
- brokerService.setDestinationPolicy(policyMap);
- }
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- synchronized (unhandeledExceptions) {
- unhandeledExceptions.put(t, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
deleted file mode 100644
index 9cb9c66..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.concurrent.CountDownLatch;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
-import org.apache.activemq.store.jdbc.TransactionContext;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.fail;
-
-/**
- * Testing how the broker reacts when a SQL Exception is thrown from
- * org.apache.activemq.store.jdbc.TransactionContext.executeBatch().
- * <br>
- * see https://issues.apache.org/jira/browse/AMQ-4636
- */
-public class AMQ4636Test {
-
- private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4636Test.class);
- private String transportUrl = "tcp://0.0.0.0:0";
- private BrokerService broker;
- EmbeddedDataSource embeddedDataSource;
- CountDownLatch throwSQLException = new CountDownLatch(0);
-
- @Before
- public void startBroker() throws Exception {
- broker = createBroker();
- broker.deleteAllMessages();
- broker.start();
- broker.waitUntilStarted();
- LOG.info("Broker started...");
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- LOG.info("Stopping broker...");
- broker.stop();
- broker.waitUntilStopped();
- }
- try {
- if (embeddedDataSource != null) {
- // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup
- embeddedDataSource.setShutdownDatabase("shutdown");
- embeddedDataSource.getConnection();
- }
- }
- catch (Exception ignored) {
- }
- finally {
- embeddedDataSource.setShutdownDatabase(null);
- }
- }
-
- protected BrokerService createBroker() throws Exception {
-
- embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
- embeddedDataSource.setCreateDatabase("create");
- embeddedDataSource.getConnection().close();
-
- //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch()
- // method that can be configured to throw a SQL exception on demand
- JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
- jdbc.setDataSource(embeddedDataSource);
-
- jdbc.setLockKeepAlivePeriod(1000L);
- LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
- leaseDatabaseLocker.setLockAcquireSleepInterval(2000L);
- jdbc.setLocker(leaseDatabaseLocker);
-
- broker = new BrokerService();
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(0);
- policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
- broker.setPersistenceAdapter(jdbc);
-
- broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler());
-
- transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
- return broker;
- }
-
- /**
- * adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered
- * during TransactionContext.executeBatch() when called in the broker.
- * <br>
- * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the
- * message. SQLException should NOT be returned to client
- */
- @Test
- public void testProducerWithDBShutdown() throws Exception {
-
- // failover but timeout in 1 seconds so the test does not hang
- String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000";
-
- this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
-
- this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false);
-
- }
-
- @Test
- public void testTransactedProducerCommitWithDBShutdown() throws Exception {
-
- // failover but timeout in 1 seconds so the test does not hang
- String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000";
-
- this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
-
- try {
- this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true);
- fail("Expect rollback after failover - inddoubt commit");
- }
- catch (javax.jms.TransactionRolledBackException expectedInDoubt) {
- LOG.info("Got rollback after failover failed commit", expectedInDoubt);
- }
- }
-
- @Test
- public void testTransactedProducerRollbackWithDBShutdown() throws Exception {
-
- // failover but timeout in 1 seconds so the test does not hang
- String failoverTransportURL = "failover:(" + transportUrl + ")?timeout=1000";
-
- this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
-
- this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false);
- }
-
- public void createDurableConsumer(String topic, String transportURL) throws JMSException {
- Connection connection = null;
- LOG.info("*** createDurableConsumer() called ...");
-
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
-
- connection = factory.createConnection();
- connection.setClientID("myconn1");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(topic);
-
- TopicSubscriber topicSubscriber = session.createDurableSubscriber((Topic) destination, "MySub1");
- }
- finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) throws JMSException {
- Connection connection = null;
-
- try {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(transportURL);
-
- connection = factory.createConnection();
- Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(topic);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- Message m = session.createTextMessage("testMessage");
- LOG.info("*** send message to broker...");
-
- // trigger SQL exception in transactionContext
- throwSQLException = new CountDownLatch(1);
- producer.send(m);
-
- if (transacted) {
- if (commit) {
- session.commit();
- }
- else {
- session.rollback();
- }
- }
-
- LOG.info("*** Finished send message to broker");
-
- }
- finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
-
- /*
- * Mock classes used for testing
- */
-
- public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
-
- @Override
- public TransactionContext getTransactionContext() throws IOException {
- return new TestTransactionContext(this);
- }
- }
-
- public class TestTransactionContext extends TransactionContext {
-
- public TestTransactionContext(JDBCPersistenceAdapter jdbcPersistenceAdapter) throws IOException {
- super(jdbcPersistenceAdapter);
- }
-
- @Override
- public void executeBatch() throws SQLException {
- if (throwSQLException.getCount() > 0) {
- // only throw exception once
- throwSQLException.countDown();
- throw new SQLException("TEST SQL EXCEPTION");
- }
- super.executeBatch();
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
deleted file mode 100644
index 0fb900a..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(value = Parameterized.class)
-public class AMQ4656Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class);
- private static BrokerService brokerService;
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private String connectionUri;
-
- @Parameterized.Parameter
- public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy;
-
- @Parameterized.Parameters(name = "{0}")
- public static Iterable<Object[]> getTestParameters() {
- return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()}, {new StorePendingDurableSubscriberMessageStoragePolicy()}});
- }
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy);
- policyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(policyMap);
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- brokerService.setDeleteAllMessagesOnStartup(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-
- Connection connection = connectionFactory.createConnection();
- connection.setClientID(getClass().getName());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("DurableTopic");
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
-
- BrokerView view = brokerService.getAdminView();
- view.getDurableTopicSubscribers();
-
- ObjectName subName = view.getDurableTopicSubscribers()[0];
-
- DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
-
- assertEquals(0, sub.getEnqueueCounter());
- assertEquals(0, sub.getDequeueCounter());
- assertEquals(0, sub.getPendingQueueSize());
- assertEquals(0, sub.getDispatchedCounter());
- assertEquals(0, sub.getDispatchedQueueSize());
-
- consumer.close();
-
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < 20; i++) {
- producer.send(session.createMessage());
- }
- producer.close();
-
- consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
-
- Thread.sleep(1000);
-
- assertEquals(20, sub.getEnqueueCounter());
- assertEquals(0, sub.getDequeueCounter());
- assertEquals(0, sub.getPendingQueueSize());
- assertEquals(20, sub.getDispatchedCounter());
- assertEquals(20, sub.getDispatchedQueueSize());
-
- LOG.info("Pending Queue Size with no receives: {}", sub.getPendingQueueSize());
-
- assertNotNull(consumer.receive(1000));
- assertNotNull(consumer.receive(1000));
-
- consumer.close();
-
- Thread.sleep(2000);
-
- LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize());
-
- assertEquals(20, sub.getEnqueueCounter());
- assertEquals(2, sub.getDequeueCounter());
- assertEquals(18, sub.getPendingQueueSize());
- assertEquals(20, sub.getDispatchedCounter());
- assertEquals(0, sub.getDispatchedQueueSize());
-
- session.close();
- connection.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
deleted file mode 100644
index 165d5fd..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.fail;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4671Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4671Test.class);
- private static BrokerService brokerService;
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- brokerService.setDeleteAllMessagesOnStartup(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- connectionUri = connectionUri + "?trace=true";
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testNonDurableSubscriberInvalidUnsubscribe() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
-
- Connection connection = connectionFactory.createConnection();
- connection.setClientID(getClass().getName());
- connection.start();
-
- try {
- Session ts = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- try {
- ts.unsubscribe("invalid-subscription-name");
- fail("this should fail");
- }
- catch (javax.jms.InvalidDestinationException e) {
- LOG.info("Test caught correct invalid destination exception");
- }
- }
- finally {
- connection.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
deleted file mode 100644
index d7da045..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.*;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.leveldb.LevelDBStoreViewMBean;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4677Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class);
- private static BrokerService brokerService;
-
- @Rule
- public TestName name = new TestName();
-
- private File dataDirFile;
-
- @Before
- public void setUp() throws Exception {
-
- dataDirFile = new File("target/LevelDBCleanupTest");
-
- brokerService = new BrokerService();
- brokerService.setBrokerName("LevelDBBroker");
- brokerService.setPersistent(true);
- brokerService.setUseJmx(true);
- brokerService.setAdvisorySupport(false);
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.setDataDirectoryFile(dataDirFile);
-
- LevelDBStore persistenceFactory = new LevelDBStore();
- persistenceFactory.setDirectory(dataDirFile);
- brokerService.setPersistenceAdapter(persistenceFactory);
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testSendAndReceiveAllMessages() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://LevelDBBroker");
-
- Connection connection = connectionFactory.createConnection();
- connection.setClientID(getClass().getName());
- connection.start();
-
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(name.toString());
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- final LevelDBStoreViewMBean levelDBView = getLevelDBStoreMBean();
- assertNotNull(levelDBView);
- levelDBView.compact();
-
- final int SIZE = 6 * 1024 * 5;
- final int MSG_COUNT = 60000;
- final CountDownLatch done = new CountDownLatch(MSG_COUNT);
-
- byte buffer[] = new byte[SIZE];
- for (int i = 0; i < SIZE; ++i) {
- buffer[i] = (byte) 128;
- }
-
- for (int i = 0; i < MSG_COUNT; ++i) {
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(buffer);
- producer.send(message);
-
- if ((i % 1000) == 0) {
- LOG.info("Sent message #{}", i);
- session.commit();
- }
- }
-
- session.commit();
-
- LOG.info("Finished sending all messages.");
-
- MessageConsumer consumer = session.createConsumer(destination);
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- if ((done.getCount() % 1000) == 0) {
- try {
- LOG.info("Received message #{}", MSG_COUNT - done.getCount());
- session.commit();
- }
- catch (JMSException e) {
- }
- }
- done.countDown();
- }
- });
-
- done.await(15, TimeUnit.MINUTES);
- session.commit();
- LOG.info("Finished receiving all messages.");
-
- assertTrue("Should < 3 logfiles left.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- levelDBView.compact();
- return countLogFiles() < 3;
- }
- }, TimeUnit.MINUTES.toMillis(5), (int) TimeUnit.SECONDS.toMillis(30)));
-
- levelDBView.compact();
- LOG.info("Current number of logs {}", countLogFiles());
- }
-
- protected long countLogFiles() {
- String[] logFiles = dataDirFile.list(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- if (name.endsWith("log")) {
- return true;
- }
- return false;
- }
- });
-
- LOG.info("Current number of logs {}", logFiles.length);
- return logFiles.length;
- }
-
- protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception {
- ObjectName levelDbViewMBeanQuery = new ObjectName("org.apache.activemq:type=Broker,brokerName=LevelDBBroker,service=PersistenceAdapter,instanceName=LevelDB*");
-
- Set<ObjectName> names = brokerService.getManagementContext().queryNames(null, levelDbViewMBeanQuery);
- if (names.isEmpty() || names.size() > 1) {
- throw new java.lang.IllegalStateException("Can't find levelDB store name.");
- }
-
- LevelDBStoreViewMBean proxy = (LevelDBStoreViewMBean) brokerService.getManagementContext().newProxyInstance(names.iterator().next(), LevelDBStoreViewMBean.class, true);
- return proxy;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4c717ca5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
deleted file mode 100644
index ad04d96..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisoryBroker;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4853Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class);
- private static BrokerService brokerService;
- private static final String BROKER_ADDRESS = "tcp://localhost:0";
- private static final ActiveMQQueue DESTINATION = new ActiveMQQueue("TEST.QUEUE");
- private CountDownLatch cycleDoneLatch;
-
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(false);
- brokerService.setAdvisorySupport(true);
- brokerService.setDeleteAllMessagesOnStartup(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
-
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- /**
- * Test to shows the performance of the removing consumers while other stay active.
- *
- * @throws Exception
- */
- @Ignore
- @Test
- public void test() throws Exception {
-
- // Create a stable set of consumers to fill in the advisory broker's consumer list.
- ArrayList<Consumer> fixedConsumers = new ArrayList<>(100);
- for (int i = 0; i < 200; ++i) {
- fixedConsumers.add(new Consumer());
- }
-
- // Create a set of consumers that comes online for a short time and then
- // goes offline again. Cycles will repeat as each batch completes
- final int fixedDelayConsumers = 300;
- final int fixedDelayCycles = 25;
-
- final CountDownLatch fixedDelayCycleLatch = new CountDownLatch(fixedDelayCycles);
-
- // Update so done method can track state.
- cycleDoneLatch = fixedDelayCycleLatch;
-
- CyclicBarrier barrier = new CyclicBarrier(fixedDelayConsumers, new Runnable() {
- @Override
- public void run() {
- LOG.info("Fixed delay consumers cycle {} completed.", fixedDelayCycleLatch.getCount());
- fixedDelayCycleLatch.countDown();
- }
- });
-
- for (int i = 0; i < fixedDelayConsumers; ++i) {
- new Thread(new FixedDelyConsumer(barrier)).start();
- }
-
- fixedDelayCycleLatch.await(10, TimeUnit.MINUTES);
-
- // Clean up.
-
- for (Consumer consumer : fixedConsumers) {
- consumer.close();
- }
- fixedConsumers.clear();
- }
-
- private ConnectionInfo createConnectionInfo() {
- ConnectionId id = new ConnectionId();
- id.setValue("ID:123456789:0:1");
-
- ConnectionInfo info = new ConnectionInfo();
- info.setConnectionId(id);
- return info;
- }
-
- private SessionInfo createSessionInfo(ConnectionInfo connection) {
- SessionId id = new SessionId(connection.getConnectionId(), 1);
-
- SessionInfo info = new SessionInfo();
- info.setSessionId(id);
-
- return info;
- }
-
- public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) {
- ConsumerId id = new ConsumerId();
- id.setConnectionId(session.getSessionId().getConnectionId());
- id.setSessionId(1);
- id.setValue(value);
-
- ConsumerInfo info = new ConsumerInfo();
- info.setConsumerId(id);
- info.setDestination(destination);
- return info;
- }
-
- /**
- * Test to shows the performance impact of removing consumers in various scenarios.
- *
- * @throws Exception
- */
- @Ignore
- @Test
- public void testPerformanceOfRemovals() throws Exception {
- // setup
- AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
- ActiveMQDestination destination = new ActiveMQQueue("foo");
- ConnectionInfo connectionInfo = createConnectionInfo();
- ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
- connectionContext.setBroker(brokerService.getBroker());
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < 200; ++i) {
-
- for (int j = 1; j <= 500; j++) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.addConsumer(connectionContext, consumerInfo);
- }
-
- for (int j = 500; j > 0; j--) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.removeConsumer(connectionContext, consumerInfo);
- }
-
- for (int j = 1; j <= 500; j++) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.addConsumer(connectionContext, consumerInfo);
- }
-
- for (int j = 1; j <= 500; j++) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.removeConsumer(connectionContext, consumerInfo);
- }
- }
-
- long finish = System.currentTimeMillis();
-
- long totalTime = finish - start;
-
- LOG.info("Total test time: {} seconds", TimeUnit.MILLISECONDS.toSeconds(totalTime));
-
- assertEquals(0, testObj.getAdvisoryConsumers().size());
- }
-
- @Test
- public void testEqualsNeeded() throws Exception {
- // setup
- AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
- ActiveMQDestination destination = new ActiveMQQueue("foo");
- ConnectionInfo connectionInfo = createConnectionInfo();
- ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
- connectionContext.setBroker(brokerService.getBroker());
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
-
- for (int j = 1; j <= 5; j++) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.addConsumer(connectionContext, consumerInfo);
- }
-
- for (int j = 1; j <= 5; j++) {
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
- testObj.removeConsumer(connectionContext, consumerInfo);
- }
-
- assertEquals(0, testObj.getAdvisoryConsumers().size());
- }
-
- private boolean done() {
- if (cycleDoneLatch == null) {
- return true;
- }
- return cycleDoneLatch.getCount() == 0;
- }
-
- class Consumer implements MessageListener {
-
- Connection connection;
- Session session;
- Destination destination;
- MessageConsumer consumer;
-
- Consumer() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- connection = factory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session.createConsumer(DESTINATION);
- consumer.setMessageListener(this);
- connection.start();
- }
-
- @Override
- public void onMessage(Message message) {
- }
-
- public void close() {
- try {
- connection.close();
- }
- catch (Exception e) {
- }
-
- connection = null;
- session = null;
- consumer = null;
- }
- }
-
- class FixedDelyConsumer implements Runnable {
-
- private final CyclicBarrier barrier;
- private final int sleepInterval;
-
- public FixedDelyConsumer(CyclicBarrier barrier) {
- this.barrier = barrier;
- this.sleepInterval = 1000;
- }
-
- public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) {
- this.barrier = barrier;
- this.sleepInterval = sleepInterval;
- }
-
- @Override
- public void run() {
- while (!done()) {
-
- try {
- Consumer consumer = new Consumer();
- TimeUnit.MILLISECONDS.sleep(sleepInterval);
- consumer.close();
- barrier.await();
- }
- catch (Exception ex) {
- return;
- }
- }
- }
- }
-
-}