You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:32 UTC
[23/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
deleted file mode 100644
index 7556def..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.transport.stomp.Stomp;
-import org.apache.activemq.transport.stomp.StompConnection;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3622Test {
-
- protected BrokerService broker;
- protected AtomicBoolean failed = new AtomicBoolean(false);
- protected String connectionUri;
- protected Appender appender = new DefaultTestAppender() {
-
- @Override
- public void doAppend(LoggingEvent event) {
- System.err.println(event.getMessage());
- if (event.getThrowableInformation() != null) {
- if (event.getThrowableInformation().getThrowable() instanceof NullPointerException) {
- failed.set(true);
- }
- }
- }
- };
-
- @Before
- public void before() throws Exception {
- Logger.getRootLogger().addAppender(appender);
-
- broker = new BrokerService();
- broker.setDataDirectory("target" + File.separator + "activemq-data");
- broker.setPersistent(true);
- broker.setDeleteAllMessagesOnStartup(true);
- PolicyEntry policy = new PolicyEntry();
- policy.setTopic(">");
- policy.setProducerFlowControl(false);
- policy.setMemoryLimit(1 * 1024 * 1024);
- policy.setPendingSubscriberPolicy(new FilePendingSubscriberMessageStoragePolicy());
- policy.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy());
- policy.setExpireMessagesPeriod(500);
- List<PolicyEntry> entries = new ArrayList<>();
-
- entries.add(policy);
- PolicyMap pMap = new PolicyMap();
- pMap.setPolicyEntries(entries);
- broker.setDestinationPolicy(pMap);
-
- connectionUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString();
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void after() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- Logger.getRootLogger().removeAppender(appender);
- }
-
- @Test
- public void go() throws Exception {
- StompConnection connection = new StompConnection();
- Integer port = Integer.parseInt(connectionUri.split(":")[2]);
- connection.open("localhost", port);
- connection.connect("", "");
- connection.subscribe("/topic/foobar", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- connection.disconnect();
- Thread.sleep(1000);
-
- if (failed.get()) {
- fail("Received NullPointerException");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
deleted file mode 100644
index 188b48c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- *
- */
-
-public class AMQ3625Test {
-
- protected BrokerService broker1;
- protected BrokerService broker2;
-
- protected AtomicBoolean authenticationFailed = new AtomicBoolean(false);
- protected AtomicBoolean gotNPE = new AtomicBoolean(false);
-
- protected String java_security_auth_login_config = "java.security.auth.login.config";
- protected String xbean = "xbean:";
- protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625";
- protected String conf = "conf";
- protected String keys = "keys";
- protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml";
- protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml";
-
- protected String oldLoginConf = null;
-
- @Before
- public void before() throws Exception {
- if (System.getProperty(java_security_auth_login_config) != null) {
- oldLoginConf = System.getProperty(java_security_auth_login_config);
- }
- System.setProperty(java_security_auth_login_config, base + "/" + conf + "/" + "login.config");
- broker1 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker1_xml);
- broker2 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker2_xml);
-
- broker1.start();
- broker1.waitUntilStarted();
- broker2.start();
- broker2.waitUntilStarted();
- }
-
- @After
- public void after() throws Exception {
- broker1.stop();
- broker2.stop();
-
- if (oldLoginConf != null) {
- System.setProperty(java_security_auth_login_config, oldLoginConf);
- }
- }
-
- @Test
- public void go() throws Exception {
- Appender appender = new DefaultTestAppender() {
- @Override
- public void doAppend(LoggingEvent event) {
- if (event.getThrowableInformation() != null) {
- Throwable t = event.getThrowableInformation().getThrowable();
- if (t instanceof SecurityException) {
- authenticationFailed.set(true);
- }
- if (t instanceof NullPointerException) {
- gotNPE.set(true);
- }
- }
- }
- };
- Logger.getRootLogger().addAppender(appender);
-
- String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString();
- connectURI = connectURI.replace("?needClientAuth=true", "");
- broker2.addNetworkConnector("static:(" + connectURI + ")").start();
-
- Thread.sleep(10 * 1000);
-
- Logger.getRootLogger().removeAppender(appender);
-
- assertTrue(authenticationFailed.get());
- assertFalse(gotNPE.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
deleted file mode 100644
index c691c42..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3674Test {
-
- private static Logger LOG = LoggerFactory.getLogger(AMQ3674Test.class);
-
- private final static int deliveryMode = DeliveryMode.NON_PERSISTENT;
- private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ");
-
- private ActiveMQConnectionFactory factory;
- private BrokerService broker;
-
- @Test
- public void removeSubscription() throws Exception {
-
- final Connection producerConnection = factory.createConnection();
- producerConnection.start();
- final Connection consumerConnection = factory.createConnection();
-
- consumerConnection.setClientID("subscriber1");
- Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- TopicSubscriber activeConsumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
- consumerConnection.start();
-
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
-
- final BrokerView brokerView = broker.getAdminView();
-
- assertEquals(1, brokerView.getDurableTopicSubscribers().length);
-
- LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length);
-
- try {
- brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
- fail("Expected Exception for Durable consumer is in use");
- }
- catch (Exception e) {
- LOG.info("Received expected exception: " + e.getMessage());
- }
-
- LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length);
-
- assertEquals(1, brokerView.getDurableTopicSubscribers().length);
-
- activeConsumer.close();
- consumerConnection.stop();
-
- assertTrue("The subscription should be in the inactive state.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return brokerView.getInactiveDurableTopicSubscribers().length == 1;
- }
- }));
-
- try {
- brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
- }
- finally {
- producer.close();
- producerConnection.close();
- }
- }
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(true);
- broker.setDeleteAllMessagesOnStartup(true);
- TransportConnector connector = broker.addConnector("tcp://localhost:0");
- broker.start();
-
- factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
- factory.setAlwaysSyncSend(true);
- factory.setDispatchAsync(false);
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
deleted file mode 100644
index 6815923..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.*;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.TopicViewMBean;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3675Test {
-
- private static Logger LOG = LoggerFactory.getLogger(AMQ3675Test.class);
-
- private final static int deliveryMode = DeliveryMode.NON_PERSISTENT;
- private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ");
-
- private ActiveMQConnectionFactory factory;
- private BrokerService broker;
-
- public TopicViewMBean getTopicView() throws Exception {
- ObjectName destinationName = broker.getAdminView().getTopics()[0];
- TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
- return topicView;
- }
-
- @Test
- public void countConsumers() throws Exception {
-
- final Connection producerConnection = factory.createConnection();
- producerConnection.start();
- final Connection consumerConnection = factory.createConnection();
-
- consumerConnection.setClientID("subscriber1");
- Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- TopicSubscriber consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
- consumerConnection.start();
-
- Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
-
- final BrokerView brokerView = broker.getAdminView();
- final TopicViewMBean topicView = getTopicView();
-
- assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return topicView.getConsumerCount() == 1;
- }
- }));
-
- consumer.close();
-
- assertTrue("Durable consumer should now be inactive.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerView.getInactiveDurableTopicSubscribers().length == 1;
- }
- }));
-
- try {
- brokerView.removeTopic(destination.getTopicName());
- }
- catch (Exception e1) {
- fail("Unable to remove destination:" + destination.getPhysicalName());
- }
-
- assertTrue("Should have no topics on the broker", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerView.getTopics().length == 0;
- }
- }));
-
- try {
- brokerView.destroyDurableSubscriber("subscriber1", "myTopic");
- }
- catch (Exception e) {
- fail("Exception not expected when attempting to delete Durable consumer.");
- }
-
- assertTrue("Should be no durable consumers active or inactive.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerView.getInactiveDurableTopicSubscribers().length == 0 && brokerView.getDurableTopicSubscribers().length == 0;
- }
- }));
-
- consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic");
-
- consumer.close();
-
- assertTrue("Should be one consumer on the Topic.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("Number of inactive consumers: " + brokerView.getInactiveDurableTopicSubscribers().length);
- return brokerView.getInactiveDurableTopicSubscribers().length == 1;
- }
- }));
-
- final TopicViewMBean recreatedTopicView = getTopicView();
-
- assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return recreatedTopicView.getConsumerCount() == 1;
- }
- }));
- }
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(true);
- broker.setAdvisorySupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- TransportConnector connector = broker.addConnector("tcp://localhost:0");
- broker.start();
-
- factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString());
- factory.setAlwaysSyncSend(true);
- factory.setDispatchAsync(false);
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
deleted file mode 100644
index 26bef7d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.ServerSocket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQTopicSubscriber;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class AMQ3678Test implements MessageListener {
-
- public int deliveryMode = DeliveryMode.NON_PERSISTENT;
-
- private BrokerService broker;
-
- AtomicInteger messagesSent = new AtomicInteger(0);
- AtomicInteger messagesReceived = new AtomicInteger(0);
-
- ActiveMQTopic destination = new ActiveMQTopic("XYZ");
-
- int port;
- int jmxport;
-
- final CountDownLatch latch = new CountDownLatch(2);
-
- public static void main(String[] args) throws Exception {
-
- }
-
- public static int findFreePort() throws IOException {
- ServerSocket socket = null;
-
- try {
- // 0 is open a socket on any free port
- socket = new ServerSocket(0);
- return socket.getLocalPort();
- }
- finally {
- if (socket != null) {
- socket.close();
- }
- }
- }
-
- @Test
- public void countConsumers() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port);
- factory.setAlwaysSyncSend(true);
- factory.setDispatchAsync(false);
-
- final Connection producerConnection = factory.createConnection();
- producerConnection.start();
-
- final Connection consumerConnection = factory.createConnection();
-
- consumerConnection.setClientID("subscriber1");
- Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1");
-
- activeConsumer.setMessageListener(this);
-
- consumerConnection.start();
-
- final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageProducer producer = producerSession.createProducer(destination);
- producer.setDeliveryMode(deliveryMode);
-
- Thread t = new Thread(new Runnable() {
-
- private boolean done = false;
-
- @Override
- public void run() {
- while (!done) {
- if (messagesSent.get() == 50) {
- try {
- broker.getAdminView().removeTopic(destination.getTopicName());
- }
- catch (Exception e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- System.err.flush();
- fail("Unable to remove destination:" + destination.getPhysicalName());
- }
- }
-
- try {
- producer.send(producerSession.createTextMessage());
- int val = messagesSent.incrementAndGet();
-
- System.out.println("sent message (" + val + ")");
- System.out.flush();
-
- if (val == 100) {
- done = true;
- latch.countDown();
- producer.close();
- producerSession.close();
-
- }
- }
- catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- });
-
- t.start();
-
- try {
- if (!latch.await(10, TimeUnit.SECONDS)) {
- fail("did not receive all the messages");
- }
- }
- catch (InterruptedException e) {
- // TODO Auto-generated catch block
- fail("did not receive all the messages, exception waiting for latch");
- e.printStackTrace();
- }
-
- //
-
- }
-
- @Before
- public void setUp() throws Exception {
-
- try {
- port = findFreePort();
- jmxport = findFreePort();
- }
- catch (Exception e) {
- fail("Unable to obtain a free port on which to start the broker");
- }
-
- System.out.println("Starting broker");
- System.out.flush();
- broker = new BrokerService();
- broker.setPersistent(false);
- ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer());
- ctx.setConnectorPort(jmxport);
- broker.setManagementContext(ctx);
- broker.setUseJmx(true);
- // broker.setAdvisorySupport(false);
- // broker.setDeleteAllMessagesOnStartup(true);
-
- broker.addConnector("tcp://localhost:" + port).setName("Default");
- broker.start();
-
- System.out.println("End of Broker Setup");
- System.out.flush();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- message.acknowledge();
- int val = messagesReceived.incrementAndGet();
- System.out.println("received message (" + val + ")");
- System.out.flush();
- if (messagesReceived.get() == 100) {
- latch.countDown();
- }
- }
- catch (JMSException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
deleted file mode 100644
index d0f6692..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3732Test {
-
- private static Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class);
-
- private ActiveMQConnectionFactory connectionFactory;
- private Connection connection;
- private Session session;
- private BrokerService broker;
- private String connectionUri;
-
- private final Random pause = new Random();
- private final long NUM_MESSAGES = 25000;
- private final AtomicLong totalConsumed = new AtomicLong();
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
- broker.waitUntilStarted();
-
- connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- connectionFactory.getPrefetchPolicy().setAll(0);
- }
-
- @After
- public void stopBroker() throws Exception {
- connection.close();
-
- broker.stop();
- broker.waitUntilStopped();
- }
-
- @Test(timeout = 1200000)
- public void testInterruptionAffects() throws Exception {
-
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-
- Queue queue = session.createQueue("AMQ3732Test");
-
- final LinkedBlockingQueue<Message> workQueue = new LinkedBlockingQueue<>();
-
- final MessageConsumer consumer1 = session.createConsumer(queue);
- final MessageConsumer consumer2 = session.createConsumer(queue);
- final MessageProducer producer = session.createProducer(queue);
-
- Thread consumer1Thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- while (totalConsumed.get() < NUM_MESSAGES) {
- Message message = consumer1.receiveNoWait();
- if (message != null) {
- workQueue.add(message);
- }
- }
- }
- catch (Exception e) {
- LOG.error("Caught an unexpected error: ", e);
- }
- }
- });
- consumer1Thread.start();
-
- Thread consumer2Thread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- while (totalConsumed.get() < NUM_MESSAGES) {
- Message message = consumer2.receive(50);
- if (message != null) {
- workQueue.add(message);
- }
- }
- }
- catch (Exception e) {
- LOG.error("Caught an unexpected error: ", e);
- }
- }
- });
- consumer2Thread.start();
-
- Thread producerThread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- for (int i = 0; i < NUM_MESSAGES; ++i) {
- producer.send(session.createTextMessage("TEST"));
- TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
- }
- }
- catch (Exception e) {
- LOG.error("Caught an unexpected error: ", e);
- }
- }
- });
- producerThread.start();
-
- Thread ackingThread = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- while (totalConsumed.get() < NUM_MESSAGES) {
- Message message = workQueue.take();
- message.acknowledge();
- totalConsumed.incrementAndGet();
- if ((totalConsumed.get() % 100) == 0) {
- LOG.info("Consumed " + totalConsumed.get() + " messages so far.");
- }
- }
- }
- catch (Exception e) {
- LOG.error("Caught an unexpected error: ", e);
- }
- }
- });
- ackingThread.start();
-
- producerThread.join();
- consumer1Thread.join();
- consumer2Thread.join();
- ackingThread.join();
-
- assertEquals(NUM_MESSAGES, totalConsumed.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
deleted file mode 100644
index fa354c9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.util.LoggingBrokerPlugin;
-import org.apache.activemq.util.DefaultTestAppender;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
-import org.apache.log4j.spi.LoggingEvent;
-
-public class AMQ3779Test extends AutoFailTestSupport {
-
- private static final Logger logger = Logger.getLogger(AMQ3779Test.class);
- private static final String qName = "QNameToFind";
-
- public void testLogPerDest() throws Exception {
-
- final AtomicBoolean ok = new AtomicBoolean(false);
- Appender appender = new DefaultTestAppender() {
- @Override
- public void doAppend(LoggingEvent event) {
- if (event.getLoggerName().toString().contains(qName)) {
- ok.set(true);
- }
- }
- };
- Logger.getRootLogger().addAppender(appender);
-
- try {
-
- BrokerService broker = new BrokerService();
- LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
- loggingBrokerPlugin.setPerDestinationLogger(true);
- loggingBrokerPlugin.setLogAll(true);
- broker.setPlugins(new LoggingBrokerPlugin[]{loggingBrokerPlugin});
- broker.start();
-
- Connection connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
- messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- connection.start();
-
- messageProducer.send(session.createTextMessage("Hi"));
- connection.close();
-
- assertTrue("got expected log message", ok.get());
- }
- finally {
- logger.removeAppender(appender);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
deleted file mode 100644
index 4824855..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3841Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class);
- private final static int maxFileLength = 1024 * 1024 * 32;
- private final static String destinationName = "TEST.QUEUE";
- BrokerService broker;
-
- @Before
- public void setUp() throws Exception {
- prepareBrokerWithMultiStore(true);
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setUseJmx(true);
- broker.setBrokerName("localhost");
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- @Test
- public void testRestartAfterQueueDelete() throws Exception {
-
- // Ensure we have an Admin View.
- assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return (broker.getAdminView()) != null;
- }
- }));
-
- broker.getAdminView().addQueue(destinationName);
-
- assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName)));
-
- broker.getAdminView().removeQueue(destinationName);
-
- broker.stop();
- broker.waitUntilStopped();
-
- prepareBrokerWithMultiStore(false);
- broker.start();
-
- broker.getAdminView().addQueue(destinationName);
- assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName)));
-
- }
-
- protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- kaha.setJournalMaxFileLength(maxFileLength);
- kaha.setCleanupInterval(5000);
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-
- public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
-
- MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
- if (deleteAllMessages) {
- multiKahaDBPersistenceAdapter.deleteAllMessages();
- }
- ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();
-
- FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
- template.setPersistenceAdapter(createStore(deleteAllMessages));
- template.setPerDestination(true);
- adapters.add(template);
-
- multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
- broker = createBroker(multiKahaDBPersistenceAdapter);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java
deleted file mode 100644
index 071897c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3879Test.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3879Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ3841Test.class);
- private BrokerService broker;
-
- private ActiveMQConnectionFactory factory;
-
- @Before
- public void setUp() throws Exception {
- broker = createBroker();
- broker.start();
- broker.waitUntilStarted();
-
- factory = new ActiveMQConnectionFactory("vm://localhost");
- factory.setAlwaysSyncSend(true);
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.setBrokerName("localhost");
- broker.addConnector("vm://localhost");
- return broker;
- }
-
- @Test
- public void testConnectionDletesWrongTempDests() throws Exception {
-
- final Connection connection1 = factory.createConnection();
- final Connection connection2 = factory.createConnection();
-
- Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Destination tempDestAdvisory = AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC;
-
- MessageConsumer advisoryConsumer = session1.createConsumer(tempDestAdvisory);
- connection1.start();
-
- Destination tempQueue = session2.createTemporaryQueue();
- MessageProducer tempProducer = session2.createProducer(tempQueue);
-
- assertNotNull(advisoryConsumer.receive(5000));
-
- Thread t = new Thread(new Runnable() {
-
- @Override
- public void run() {
- try {
- Thread.sleep(20);
- connection1.close();
- }
- catch (Exception e) {
- }
- }
- });
-
- t.start();
-
- for (int i = 0; i < 256; ++i) {
- Message msg = session2.createTextMessage("Temp Data");
- tempProducer.send(msg);
- Thread.sleep(2);
- }
-
- t.join();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
deleted file mode 100644
index c7b4bdb..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3903Test.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3903Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3903Test.class);
-
- private static final String bindAddress = "tcp://0.0.0.0:0";
- private BrokerService broker;
- private ActiveMQConnectionFactory cf;
-
- private static final int MESSAGE_COUNT = 100;
-
- @Before
- public void setUp() throws Exception {
- broker = this.createBroker();
- String address = broker.getTransportConnectors().get(0).getPublishableConnectString();
- broker.start();
- broker.waitUntilStarted();
-
- cf = new ActiveMQConnectionFactory(address);
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test
- public void testAdvisoryForFastGenericProducer() throws Exception {
- doTestAdvisoryForFastProducer(true);
- }
-
- @Test
- public void testAdvisoryForFastDedicatedProducer() throws Exception {
- doTestAdvisoryForFastProducer(false);
- }
-
- public void doTestAdvisoryForFastProducer(boolean genericProducer) throws Exception {
-
- Connection connection = cf.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final TemporaryQueue queue = session.createTemporaryQueue();
-
- final Topic advisoryTopic = AdvisorySupport.getFastProducerAdvisoryTopic((ActiveMQDestination) queue);
- final Topic advisoryWhenFullTopic = AdvisorySupport.getFullAdvisoryTopic((ActiveMQDestination) queue);
-
- MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
- MessageConsumer advisoryWhenFullConsumer = session.createConsumer(advisoryWhenFullTopic);
-
- MessageProducer producer = session.createProducer(genericProducer ? null : queue);
-
- try {
- // send lots of messages to the tempQueue
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes(new byte[1024]);
- if (genericProducer) {
- producer.send(queue, m, DeliveryMode.PERSISTENT, 4, 0);
- }
- else {
- producer.send(m);
- }
- }
- }
- catch (ResourceAllocationException expectedOnLimitReachedAfterFastAdvisory) {
- }
-
- // check one advisory message has produced on the advisoryTopic
- Message advCmsg = advisoryConsumer.receive(4000);
- assertNotNull(advCmsg);
-
- advCmsg = advisoryWhenFullConsumer.receive(4000);
- assertNotNull(advCmsg);
-
- connection.close();
- LOG.debug("Connection closed, destinations should now become inactive.");
- }
-
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setPersistent(false);
- answer.setUseJmx(false);
-
- PolicyEntry entry = new PolicyEntry();
- entry.setAdvisoryForFastProducers(true);
- entry.setAdvisoryWhenFull(true);
- entry.setMemoryLimit(10000);
- PolicyMap map = new PolicyMap();
- map.setDefaultEntry(entry);
-
- answer.setDestinationPolicy(map);
- answer.addConnector(bindAddress);
-
- answer.getSystemUsage().setSendFailIfNoSpace(true);
-
- return answer;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java
deleted file mode 100644
index f29ad94..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3932Test.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3932Test {
-
- static final Logger LOG = LoggerFactory.getLogger(AMQ3932Test.class);
- private Connection connection;
- private BrokerService broker;
-
- @Before
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setPersistent(false);
- broker.setUseJmx(false);
- TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0");
- broker.start();
-
- ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(" + tcpConnector.getPublishableConnectString() + ")?jms.prefetchPolicy.queuePrefetch=0");
- connection = factory.createConnection();
- connection.start();
- }
-
- @After
- public void tearDown() throws Exception {
- connection.close();
-
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
- }
- }
-
- @Test
- public void testPlainReceiveBlocks() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName()));
-
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
-
- final CountDownLatch done = new CountDownLatch(1);
- final CountDownLatch started = new CountDownLatch(1);
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- started.countDown();
- LOG.info("Entering into a Sync receive call");
- consumer.receive();
- }
- catch (JMSException e) {
- }
- done.countDown();
- }
- });
-
- assertTrue(started.await(10, TimeUnit.SECONDS));
- assertFalse(done.await(20, TimeUnit.SECONDS));
- }
-
- @Test
- public void testHungReceiveNoWait() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName()));
-
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
-
- final CountDownLatch done = new CountDownLatch(1);
- final CountDownLatch started = new CountDownLatch(1);
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- started.countDown();
- LOG.info("Entering into a Sync receiveNoWait call");
- consumer.receiveNoWait();
- }
- catch (JMSException e) {
- }
- done.countDown();
- }
- });
-
- assertTrue(started.await(10, TimeUnit.SECONDS));
- assertTrue(done.await(20, TimeUnit.SECONDS));
- }
-
- @Test
- public void testHungReceiveTimed() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = session.createConsumer(session.createQueue(getClass().getName()));
-
- broker.stop();
- broker.waitUntilStopped();
- broker = null;
-
- final CountDownLatch done = new CountDownLatch(1);
- final CountDownLatch started = new CountDownLatch(1);
- ExecutorService executor = Executors.newSingleThreadExecutor();
-
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- started.countDown();
- LOG.info("Entering into a timed Sync receive call");
- consumer.receive(10);
- }
- catch (JMSException e) {
- }
- done.countDown();
- }
- });
-
- assertTrue(started.await(10, TimeUnit.SECONDS));
- assertTrue(done.await(20, TimeUnit.SECONDS));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
deleted file mode 100644
index 3287085..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class AMQ3934Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3934Test.class);
- private static BrokerService brokerService;
- private static String TEST_QUEUE = "testQueue";
- private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private ActiveMQConnectionFactory connectionFactory;
- private String connectionUri;
- private String messageID;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- sendMessage();
- }
-
- public void sendMessage() throws Exception {
- final Connection conn = connectionFactory.createConnection();
- try {
- conn.start();
- final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Destination queue = session.createQueue(TEST_QUEUE);
- final Message toSend = session.createMessage();
- final MessageProducer producer = session.createProducer(queue);
- producer.send(queue, toSend);
- }
- finally {
- conn.close();
- }
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void getMessage() throws Exception {
- final QueueViewMBean queueView = getProxyToQueueViewMBean();
- final CompositeData messages[] = queueView.browse();
- messageID = (String) messages[0].get("JMSMessageID");
- assertNotNull(messageID);
- assertNotNull(queueView.getMessage(messageID));
- LOG.debug("Attempting to remove message ID: " + messageID);
- queueView.removeMessage(messageID);
- assertNull(queueView.getMessage(messageID));
- }
-
- private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, NullPointerException, JMSException {
- final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
- final QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java
deleted file mode 100644
index c39cabf..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3961Test.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.jms.ConnectionConsumer;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.ServerSession;
-import javax.jms.ServerSessionPool;
-import javax.jms.Session;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3961Test {
-
- private static BrokerService brokerService;
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private ActiveMQConnectionFactory connectionFactory;
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- brokerService.setDeleteAllMessagesOnStartup(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
-
- connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- public class TestServerSessionPool implements ServerSessionPool {
-
- private final TopicConnection connection;
-
- public TestServerSessionPool(final TopicConnection connection) {
- this.connection = connection;
- }
-
- @Override
- public ServerSession getServerSession() throws JMSException {
- final TopicSession topicSession = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
- return new TestServerSession(topicSession);
- }
- }
-
- public class TestServerSession implements ServerSession, MessageListener {
-
- private final TopicSession session;
-
- public TestServerSession(final TopicSession session) throws JMSException {
- this.session = session;
- session.setMessageListener(this);
- }
-
- @Override
- public Session getSession() throws JMSException {
- return session;
- }
-
- @Override
- public void start() throws JMSException {
- session.run();
- }
-
- @Override
- public void onMessage(final Message message) {
- synchronized (processedSessions) {
- processedSessions.add(this);
- }
- }
- }
-
- public static final int MESSAGE_COUNT = 16;
- private final List<TestServerSession> processedSessions = new LinkedList<>();
- private final List<TestServerSession> committedSessions = new LinkedList<>();
-
- @Test
- public void testPrefetchInDurableSubscription() throws Exception {
- final ActiveMQTopic topic = new ActiveMQTopic("TestTopic");
-
- final TopicConnection initialSubConnection = connectionFactory.createTopicConnection();
- initialSubConnection.setClientID("TestClient");
- initialSubConnection.start();
- final TopicSession initialSubSession = initialSubConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
- final TopicSubscriber initialSubscriber = initialSubSession.createDurableSubscriber(topic, "TestSubscriber");
-
- initialSubscriber.close();
- initialSubSession.close();
- initialSubConnection.close();
-
- final TopicConnection publisherConnection = connectionFactory.createTopicConnection();
- publisherConnection.start();
- final TopicSession publisherSession = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- final TopicPublisher publisher = publisherSession.createPublisher(topic);
- for (int i = 1; i <= MESSAGE_COUNT; i++) {
- final Message msg = publisherSession.createTextMessage("Message #" + i);
- publisher.publish(msg);
- }
- publisher.close();
- publisherSession.close();
- publisherConnection.close();
-
- final TopicConnection connection = connectionFactory.createTopicConnection();
- connection.setClientID("TestClient");
- connection.start();
- final TestServerSessionPool pool = new TestServerSessionPool(connection);
- final ConnectionConsumer connectionConsumer = connection.createDurableConnectionConsumer(topic, "TestSubscriber", null, pool, 1);
- while (true) {
- int lastMsgCount = 0;
- int msgCount = 0;
- do {
- lastMsgCount = msgCount;
- Thread.sleep(200L);
- synchronized (processedSessions) {
- msgCount = processedSessions.size();
- }
- } while (lastMsgCount < msgCount);
-
- if (lastMsgCount == 0) {
- break;
- }
-
- final LinkedList<TestServerSession> collected;
- synchronized (processedSessions) {
- collected = new LinkedList<>(processedSessions);
- processedSessions.clear();
- }
-
- final Iterator<TestServerSession> sessions = collected.iterator();
- while (sessions.hasNext()) {
- final TestServerSession session = sessions.next();
- committedSessions.add(session);
- session.getSession().commit();
- session.getSession().close();
- }
- }
-
- connectionConsumer.close();
- final TopicSession finalSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- finalSession.unsubscribe("TestSubscriber");
- finalSession.close();
- connection.close();
- assertEquals(MESSAGE_COUNT, committedSessions.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java
deleted file mode 100644
index 4fe8ba1..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3992Test.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3992Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3992Test.class);
- private static BrokerService brokerService;
- private static String BROKER_ADDRESS = "tcp://localhost:0";
-
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.setUseJmx(true);
- brokerService.setDeleteAllMessagesOnStartup(true);
- connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- connectionFactory.getPrefetchPolicy().setAll(0);
-
- Connection connection = connectionFactory.createConnection();
- connection.setClientID(getClass().getName());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("DurableTopic");
-
- MessageConsumer consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub");
-
- BrokerView view = brokerService.getAdminView();
- view.getDurableTopicSubscribers();
-
- ObjectName subName = view.getDurableTopicSubscribers()[0];
-
- DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
-
- assertEquals(0, sub.getEnqueueCounter());
-
- LOG.info("Enqueue counter for sub before pull requests: " + sub.getEnqueueCounter());
-
- // Trigger some pull Timeouts.
- consumer.receive(500);
- consumer.receive(500);
- consumer.receive(500);
- consumer.receive(500);
- consumer.receive(500);
-
- // Let them all timeout.
- Thread.sleep(600);
-
- LOG.info("Enqueue counter for sub after pull requests: " + sub.getEnqueueCounter());
- assertEquals(0, sub.getEnqueueCounter());
-
- consumer.close();
- session.close();
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
deleted file mode 100644
index 8272aef..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.TopicRegion;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.SubscriptionKey;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ4062Test {
-
- private BrokerService service;
- private PolicyEntry policy;
- private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions;
-
- private static final int PREFETCH_SIZE_5 = 5;
- private String connectionUri;
-
- @Before
- public void startBroker() throws IOException, Exception {
- service = new BrokerService();
- service.setPersistent(true);
- service.setDeleteAllMessagesOnStartup(true);
- service.setUseJmx(false);
-
- KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
- File dataFile = new File("createData");
- pa.setDirectory(dataFile);
- pa.setJournalMaxFileLength(1024 * 1024 * 32);
-
- service.setPersistenceAdapter(pa);
-
- policy = new PolicyEntry();
- policy.setTopic(">");
- policy.setDurableTopicPrefetch(PREFETCH_SIZE_5);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
-
- service.setDestinationPolicy(pMap);
-
- service.addConnector("tcp://localhost:0");
-
- service.start();
- service.waitUntilStarted();
-
- connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- public void restartBroker() throws IOException, Exception {
- service = new BrokerService();
- service.setPersistent(true);
- service.setUseJmx(false);
- service.setKeepDurableSubsActive(false);
-
- KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter();
- File dataFile = new File("createData");
- pa.setDirectory(dataFile);
- pa.setJournalMaxFileLength(1024 * 1024 * 32);
-
- service.setPersistenceAdapter(pa);
-
- policy = new PolicyEntry();
- policy.setTopic(">");
- policy.setDurableTopicPrefetch(PREFETCH_SIZE_5);
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
-
- service.setDestinationPolicy(pMap);
- service.addConnector("tcp://localhost:0");
- service.start();
- service.waitUntilStarted();
-
- connectionUri = service.getTransportConnectors().get(0).getPublishableConnectString();
- }
-
- @After
- public void stopBroker() throws Exception {
- service.stop();
- service.waitUntilStopped();
- service = null;
- }
-
- @Test
- public void testDirableSubPrefetchRecovered() throws Exception {
-
- PrefetchConsumer consumer = new PrefetchConsumer(true, connectionUri);
- consumer.receive();
- durableSubscriptions = getDurableSubscriptions();
- ConsumerInfo info = getConsumerInfo(durableSubscriptions);
-
- //check if the prefetchSize equals to the size we set in the PolicyEntry
- assertEquals(PREFETCH_SIZE_5, info.getPrefetchSize());
-
- consumer.a.countDown();
- Producer p = new Producer(connectionUri);
- p.send();
- p = null;
-
- service.stop();
- service.waitUntilStopped();
- durableSubscriptions = null;
-
- consumer = null;
- stopBroker();
-
- restartBroker();
-
- getDurableSubscriptions();
- info = null;
- info = getConsumerInfo(durableSubscriptions);
-
- //check if the prefetchSize equals to 0 after persistent storage recovered
- //assertEquals(0, info.getPrefetchSize());
-
- consumer = new PrefetchConsumer(false, connectionUri);
- consumer.receive();
- consumer.a.countDown();
-
- info = null;
- info = getConsumerInfo(durableSubscriptions);
-
- //check if the prefetchSize is the default size for durable consumer and the PolicyEntry
- //we set earlier take no effect
- //assertEquals(100, info.getPrefetchSize());
- //info.getPrefetchSize() is 100,it should be 5,because I set the PolicyEntry as follows,
- //policy.setDurableTopicPrefetch(PREFETCH_SIZE_5);
- assertEquals(5, info.getPrefetchSize());
- }
-
- @SuppressWarnings("unchecked")
- private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException {
- if (durableSubscriptions != null)
- return durableSubscriptions;
- RegionBroker regionBroker = (RegionBroker) service.getRegionBroker();
- TopicRegion region = (TopicRegion) regionBroker.getTopicRegion();
- Field field = TopicRegion.class.getDeclaredField("durableSubscriptions");
- field.setAccessible(true);
- durableSubscriptions = (ConcurrentMap<SubscriptionKey, DurableTopicSubscription>) field.get(region);
- return durableSubscriptions;
- }
-
- private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) {
- ConsumerInfo info = null;
- for (Iterator<DurableTopicSubscription> it = durableSubscriptions.values().iterator(); it.hasNext(); ) {
- Subscription sub = it.next();
- info = sub.getConsumerInfo();
- if (info.getSubscriptionName().equals(PrefetchConsumer.SUBSCRIPTION_NAME)) {
- return info;
- }
- }
- return null;
- }
-
- public class PrefetchConsumer implements MessageListener {
-
- public static final String SUBSCRIPTION_NAME = "A_NAME_ABC_DEF";
- private final String user = ActiveMQConnection.DEFAULT_USER;
- private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
- private final String uri;
- private boolean transacted;
- ActiveMQConnection connection;
- Session session;
- MessageConsumer consumer;
- private boolean needAck = false;
- CountDownLatch a = new CountDownLatch(1);
-
- public PrefetchConsumer(boolean needAck, String uri) {
- this.needAck = needAck;
- this.uri = uri;
- }
-
- public void receive() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri);
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setClientID("3");
- connection.start();
-
- session = connection.createSession(transacted, Session.CLIENT_ACKNOWLEDGE);
- Destination destination = session.createTopic("topic2");
- consumer = session.createDurableSubscriber((Topic) destination, SUBSCRIPTION_NAME);
- consumer.setMessageListener(this);
- }
-
- @Override
- public void onMessage(Message message) {
- try {
- a.await();
- }
- catch (InterruptedException e1) {
- }
- if (needAck) {
- try {
- message.acknowledge();
- consumer.close();
- session.close();
- connection.close();
- }
- catch (JMSException e) {
- }
- }
- }
- }
-
- public class Producer {
-
- protected final String user = ActiveMQConnection.DEFAULT_USER;
-
- private final String password = ActiveMQConnection.DEFAULT_PASSWORD;
- private final String uri;
- private boolean transacted;
-
- public Producer(String uri) {
- this.uri = uri;
- }
-
- public void send() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, uri);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
-
- ActiveMQSession session = (ActiveMQSession) connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic("topic2");
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < 100; i++) {
- TextMessage om = session.createTextMessage("hello from producer");
- producer.send(om);
- }
- producer.close();
- session.close();
- connection.close();
- }
- }
-}