You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:27 UTC
[18/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
deleted file mode 100644
index 657d7a2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4887Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4887Test.class);
- private static final Integer ITERATIONS = 10;
-
- @Rule
- public TestName name = new TestName();
-
- @Test
- public void testBytesMessageSetPropertyBeforeCopy() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- doTestBytesMessageSetPropertyBeforeCopy(connection);
- }
-
- @Test
- public void testBytesMessageSetPropertyBeforeCopyCompressed() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- connectionFactory.setUseCompression(true);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- doTestBytesMessageSetPropertyBeforeCopy(connection);
- }
-
- public void doTestBytesMessageSetPropertyBeforeCopy(Connection connection) throws Exception {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(name.toString());
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- BytesMessage message = session.createBytesMessage();
-
- for (int i = 0; i < ITERATIONS; i++) {
-
- long sendTime = System.currentTimeMillis();
- message.setLongProperty("sendTime", sendTime);
- producer.send(message);
-
- LOG.debug("Receiving message " + i);
- Message receivedMessage = consumer.receive(5000);
- assertNotNull("On message " + i, receivedMessage);
- assertTrue("On message " + i, receivedMessage instanceof BytesMessage);
-
- BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
-
- int numElements = 0;
- try {
- while (true) {
- receivedBytesMessage.readBoolean();
- numElements++;
- }
- }
- catch (Exception ex) {
- }
-
- LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
- assertEquals(i, numElements);
-
- long receivedSendTime = receivedBytesMessage.getLongProperty("sendTime");
- assertEquals("On message " + i, receivedSendTime, sendTime);
-
- // Add a new bool value on each iteration.
- message.writeBoolean(true);
- }
- }
-
- @Test
- public void testStreamMessageSetPropertyBeforeCopy() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- doTestStreamMessageSetPropertyBeforeCopy(connection);
- }
-
- @Test
- public void testStreamMessageSetPropertyBeforeCopyCompressed() throws Exception {
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- connectionFactory.setUseCompression(true);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.start();
- doTestStreamMessageSetPropertyBeforeCopy(connection);
- }
-
- public void doTestStreamMessageSetPropertyBeforeCopy(Connection connection) throws Exception {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue(name.toString());
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer = session.createProducer(destination);
-
- StreamMessage message = session.createStreamMessage();
-
- for (int i = 0; i < ITERATIONS; i++) {
-
- long sendTime = System.currentTimeMillis();
- message.setLongProperty("sendTime", sendTime);
- producer.send(message);
-
- LOG.debug("Receiving message " + i);
- Message receivedMessage = consumer.receive(5000);
- assertNotNull("On message " + i, receivedMessage);
- assertTrue("On message " + i, receivedMessage instanceof StreamMessage);
-
- StreamMessage receivedStreamMessage = (StreamMessage) receivedMessage;
-
- int numElements = 0;
- try {
- while (true) {
- receivedStreamMessage.readBoolean();
- numElements++;
- }
- }
- catch (Exception ex) {
- }
-
- LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
- assertEquals(i, numElements);
-
- long receivedSendTime = receivedStreamMessage.getLongProperty("sendTime");
- assertEquals("On message " + i, receivedSendTime, sendTime);
-
- // Add a new bool value on each iteration.
- message.writeBoolean(true);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
deleted file mode 100644
index ba65ab7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.util.Map;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.ByteSequence;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4893Test {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class);
-
- @Test
- public void testPropertiesInt() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setIntProperty("TestProp", 333);
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesString() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setStringProperty("TestProp", "Value");
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesObject() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setObjectProperty("TestProp", "Value");
- fakeUnmarshal(message);
- roundTripProperties(message);
- }
-
- @Test
- public void testPropertiesObjectNoMarshalling() throws Exception {
- ActiveMQObjectMessage message = new ActiveMQObjectMessage();
- message.setObjectProperty("TestProp", "Value");
- roundTripProperties(message);
- }
-
- private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException {
- ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
- for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) {
- LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass());
- copy.setObjectProperty(prop.getKey(), prop.getValue());
- }
- }
-
- private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException {
- // we need to force the unmarshalled property field to be set so it
- // gives us a hawtbuffer for the string
- OpenWireFormat format = new OpenWireFormat();
- message.beforeMarshall(format);
- message.afterMarshall(format);
-
- ByteSequence seq = message.getMarshalledProperties();
- message.clearProperties();
- message.setMarshalledProperties(seq);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
deleted file mode 100644
index fe336eb..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
-import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-
-public class AMQ4899Test {
-
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class);
- private static final String QUEUE_NAME = "AMQ4899TestQueue";
- private static final String CONSUMER_QUEUE = "Consumer.Orders.VirtualOrders." + QUEUE_NAME;
- private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME;
-
- private static final Integer MESSAGE_LIMIT = 20;
- public static final String CONSUMER_A_SELECTOR = "Order < " + 10;
- public static String CONSUMER_B_SELECTOR = "Order >= " + 10;
- private CountDownLatch consumersStarted = new CountDownLatch(2);
- private CountDownLatch consumerAtoConsumeCount = new CountDownLatch(10);
- private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10);
-
- private BrokerService broker;
-
- @Before
- public void setUp() {
- setupBroker("broker://()/localhost?");
- }
-
- @After
- public void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- @Test(timeout = 60 * 1000)
- public void testVirtualTopicMultipleSelectors() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue consumerQueue = session.createQueue(CONSUMER_QUEUE);
-
- MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount);
- MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
- consumerA.setMessageListener(listenerA);
-
- MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount);
- MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
- consumerB.setMessageListener(listenerB);
-
- consumersStarted.await(10, TimeUnit.SECONDS);
- assertEquals("Not all consumers started in time", 0, consumersStarted.getCount());
-
- Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME);
- MessageProducer producer = session.createProducer(producerDestination);
- int messageIndex = 0;
- for (int i = 0; i < MESSAGE_LIMIT; i++) {
- if (i == 3) {
- LOG.debug("Stopping consumerA");
- consumerA.close();
- }
-
- if (i == 14) {
- LOG.debug("Stopping consumer B");
- consumerB.close();
- }
- String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString();
- TextMessage message = session.createTextMessage(messageText);
- message.setIntProperty("Order", i);
- LOG.debug("Sending message [{}]", messageText);
- producer.send(message);
- Thread.sleep(100);
- }
- Thread.sleep(1 * 1000);
-
- // restart consumerA
- LOG.debug("Restarting consumerA");
- consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
- consumerA.setMessageListener(listenerA);
-
- // restart consumerB
- LOG.debug("restarting consumerB");
- consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
- consumerB.setMessageListener(listenerB);
-
- consumerAtoConsumeCount.await(5, TimeUnit.SECONDS);
- consumerBtoConsumeCount.await(5, TimeUnit.SECONDS);
-
- LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount());
-
- assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount());
- assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount());
-
- connection.close();
- }
-
- /**
- * Setup broker with VirtualTopic configured
- */
- private void setupBroker(String uri) {
- try {
- broker = BrokerFactory.createBroker(uri);
-
- VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
- VirtualTopic virtualTopic = new VirtualTopic();
- virtualTopic.setName("VirtualOrders.>");
- virtualTopic.setSelectorAware(true);
- VirtualDestination[] virtualDestinations = {virtualTopic};
- interceptor.setVirtualDestinations(virtualDestinations);
- broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
-
- SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
- BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin};
- broker.setPlugins(updatedPlugins);
-
- broker.start();
- broker.waitUntilStarted();
- }
- catch (Exception e) {
- LOG.error("Failed creating broker", e);
- }
- }
-}
-
-class AMQ4899Listener implements MessageListener {
-
- Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class);
- CountDownLatch toConsume;
- String id;
-
- public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) {
- this.id = id;
- this.toConsume = toConsume;
- started.countDown();
- }
-
- @Override
- public void onMessage(Message message) {
- toConsume.countDown();
- try {
- if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- LOG.debug("Listener {} received [{}]", id, textMessage.getText());
- }
- else {
- LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName());
- }
- }
- catch (JMSException e) {
- LOG.error("Unexpected JMSException in Listener " + id, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
deleted file mode 100644
index 4805873..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ4930Test extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
- final int messageCount = 150;
- final int messageSize = 1024 * 1024;
- final int maxBrowsePageSize = 50;
- final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
- BrokerService broker;
- ActiveMQConnectionFactory factory;
-
- protected void configureBroker() throws Exception {
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setAdvisorySupport(false);
- broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024);
-
- PolicyMap pMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
- // disable expriy processing as this will call browse in parallel
- policy.setExpireMessagesPeriod(0);
- policy.setMaxPageSize(maxBrowsePageSize);
- policy.setMaxBrowsePageSize(maxBrowsePageSize);
- pMap.setDefaultEntry(policy);
-
- broker.setDestinationPolicy(pMap);
- }
-
- public void testBrowsePendingNonPersistent() throws Exception {
- doTestBrowsePending(DeliveryMode.NON_PERSISTENT);
- }
-
- public void testBrowsePendingPersistent() throws Exception {
- doTestBrowsePending(DeliveryMode.PERSISTENT);
- }
-
- public void testWithStatsDisabled() throws Exception {
- ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
- doTestBrowsePending(DeliveryMode.PERSISTENT);
- }
-
- public void doTestBrowsePending(int deliveryMode) throws Exception {
-
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(bigQueue);
- producer.setDeliveryMode(deliveryMode);
- BytesMessage bytesMessage = session.createBytesMessage();
- bytesMessage.writeBytes(new byte[messageSize]);
-
- for (int i = 0; i < messageCount; i++) {
- producer.send(bigQueue, bytesMessage);
- }
-
- final QueueViewMBean queueViewMBean = (QueueViewMBean) broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
-
- LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
-
- connection.close();
-
- assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled());
-
- // ensure repeated browse does now blow mem
-
- final Queue underTest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
-
- // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
- Message[] browsed = underTest.browse();
- LOG.info("Browsed: " + browsed.length);
- assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
- browsed = underTest.browse();
- LOG.info("Browsed: " + browsed.length);
- assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
- Runtime.getRuntime().gc();
- long free = Runtime.getRuntime().freeMemory() / 1024;
- LOG.info("free at start of check: " + free);
- // check for memory growth
- for (int i = 0; i < 10; i++) {
- LOG.info("free: " + Runtime.getRuntime().freeMemory() / 1024);
- browsed = underTest.browse();
- LOG.info("Browsed: " + browsed.length);
- assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
- Runtime.getRuntime().gc();
- Runtime.getRuntime().gc();
- assertTrue("No growth: " + Runtime.getRuntime().freeMemory() / 1024 + " >= " + (free - (free * 0.2)), Runtime.getRuntime().freeMemory() / 1024 >= (free - (free * 0.2)));
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- broker = new BrokerService();
- broker.setBrokerName("thisOne");
- configureBroker();
- broker.start();
- factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
- factory.setWatchTopicAdvisories(false);
-
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (broker != null) {
- broker.stop();
- broker = null;
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
deleted file mode 100644
index 74d0817..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.XASession;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.ActiveMQXAConnection;
-import org.apache.activemq.ActiveMQXAConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerRestartTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.TransactionBroker;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.transport.failover.FailoverTransport;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test for AMQ-4950.
- * Simulates an error during XA prepare call.
- */
-public class AMQ4950Test extends BrokerRestartTestSupport {
-
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class);
- protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare().";
- public boolean prioritySupport = false;
- protected String connectionUri = null;
-
- @Override
- protected void configureBroker(BrokerService broker) throws Exception {
- broker.setDestinationPolicy(policyMap);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setUseJmx(false);
- connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
- @Override
- public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
- getNext().prepareTransaction(context, xid);
- LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception.");
- throw new XAException(simulatedExceptionMessage);
- }
-
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- LOG.debug("BrokerPlugin.commitTransaction().");
- super.commitTransaction(context, xid, onePhase);
- }
- }});
- }
-
- /**
- * Creates XA transaction and invokes XA prepare().
- * Due to registered BrokerFilter prepare will be handled by broker
- * but then throw an exception.
- * Prior to fixing AMQ-4950, this resulted in a ClassCastException
- * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse()
- * causing the failover transport to reconnect and replay the XA prepare().
- */
- public void testXAPrepareFailure() throws Exception {
-
- assertNotNull(connectionUri);
- ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")");
- ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) cf.createConnection();
- xaConnection.start();
- XASession session = xaConnection.createXASession();
- XAResource resource = session.getXAResource();
- Xid tid = createXid();
- resource.start(tid, XAResource.TMNOFLAGS);
-
- MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName()));
- Message message = session.createTextMessage("Sample Message");
- producer.send(message);
- resource.end(tid, XAResource.TMSUCCESS);
- try {
- LOG.debug("Calling XA prepare(), expecting an exception");
- int ret = resource.prepare(tid);
- if (XAResource.XA_OK == ret)
- resource.commit(tid, false);
- }
- catch (XAException xae) {
- LOG.info("Received excpected XAException: {}", xae.getMessage());
- LOG.info("Rolling back transaction {}", tid);
-
- // with bug AMQ-4950 the thrown error reads "Cannot call prepare now"
- // we check that we receive the original exception message as
- // thrown by the BrokerPlugin
- assertEquals(simulatedExceptionMessage, xae.getMessage());
- resource.rollback(tid);
- }
- // couple of assertions
- assertTransactionGoneFromBroker(tid);
- assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
- assertTransactionGoneFromFailoverState(xaConnection, tid);
-
- //cleanup
- producer.close();
- session.close();
- xaConnection.close();
- LOG.debug("testXAPrepareFailure() finished.");
- }
-
- public Xid createXid() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream os = new DataOutputStream(baos);
- os.writeLong(++txGenerator);
- os.close();
- final byte[] bs = baos.toByteArray();
-
- return new Xid() {
- @Override
- public int getFormatId() {
- return 86;
- }
-
- @Override
- public byte[] getGlobalTransactionId() {
- return bs;
- }
-
- @Override
- public byte[] getBranchQualifier() {
- return bs;
- }
- };
- }
-
- private void assertTransactionGoneFromFailoverState(ActiveMQXAConnection connection1, Xid tid) throws Exception {
-
- FailoverTransport transport = connection1.getTransport().narrow(FailoverTransport.class);
- TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
- assertNull("transaction should not exist in the state tracker", transport.getStateTracker().processCommitTransactionOnePhase(info));
- }
-
- private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
- BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
- TransactionBroker transactionBroker = (TransactionBroker) broker.getBroker().getAdaptor(TransactionBroker.class);
- try {
- transactionBroker.getTransaction(null, new XATransactionId(tid), false);
- fail("expected exception on tx not found");
- }
- catch (XAException expectedOnNotFound) {
- }
- }
-
- private void assertTransactionGoneFromConnection(String brokerName,
- String clientId,
- ConnectionId connectionId,
- Xid tid) throws Exception {
- BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
- CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
- for (TransportConnection connection : connections) {
- if (connection.getConnectionId().equals(clientId)) {
- try {
- connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
- fail("did not get expected excepton on missing transaction, it must be still there in error!");
- }
- catch (IllegalStateException expectedOnNoTransaction) {
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
deleted file mode 100644
index 0b74979..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
+++ /dev/null
@@ -1,511 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.net.URI;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.sql.DataSource;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.Broker;
-import org.apache.activemq.broker.BrokerFilter;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.Wait;
-import org.apache.derby.jdbc.EmbeddedDataSource;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-/**
- * Test creates a broker network with two brokers - producerBroker (with a
- * message producer attached) and consumerBroker (with consumer attached)
- * <br>
- * Simulates network duplicate message by stopping and restarting the
- * consumerBroker after message (with message ID ending in 120) is persisted to
- * consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the
- * network connection. When the network connection is reestablished the
- * producerBroker resends message (with messageID ending in 120).
- * <br>
- * Expectation:
- * <br>
- * With the following policy entries set, would expect the duplicate message to
- * be read from the store and dispatched to the consumer - where the duplicate
- * could be detected by consumer.
- * <br>
- * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">");
- * policy.setEnableAudit(false); policy.setUseCache(false);
- * policy.setExpireMessagesPeriod(0);
- * <br>
- * <br>
- * Note 1: Network needs to use replaywhenNoConsumers so enabling the
- * networkAudit to avoid this scenario is not feasible.
- * <br>
- * NOTE 2: Added a custom plugin to the consumerBroker so that the
- * consumerBroker shutdown will occur after a message has been persisted to
- * consumerBroker store but before an ACK is sent back to ProducerBroker. This
- * is just a hack to ensure producerBroker will resend the message after
- * shutdown.
- */
-
-@RunWith(value = Parameterized.class)
-public class AMQ4952Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
-
- protected static final int MESSAGE_COUNT = 1;
-
- protected BrokerService consumerBroker;
- protected BrokerService producerBroker;
-
- protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store");
-
- private final CountDownLatch stopConsumerBroker = new CountDownLatch(1);
- private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1);
- private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1);
-
- private EmbeddedDataSource localDataSource;
-
- @Parameterized.Parameter(0)
- public boolean enableCursorAudit;
-
- @Parameterized.Parameters(name = "enableAudit={0}")
- public static Iterable<Object[]> getTestParameters() {
- return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
- }
-
- @Test
- public void testConsumerBrokerRestart() throws Exception {
-
- Callable consumeMessageTask = new Callable() {
- @Override
- public Object call() throws Exception {
-
- int receivedMessageCount = 0;
-
- ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false");
- Connection consumerConnection = consumerFactory.createConnection();
-
- try {
-
- consumerConnection.setClientID("consumer");
- consumerConnection.start();
-
- Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
-
- while (true) {
- TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
-
- if (textMsg == null) {
- return receivedMessageCount;
- }
-
- receivedMessageCount++;
- LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID());
-
- // on first delivery ensure the message is pending an
- // ack when it is resent from the producer broker
- if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) {
- LOG.info("Waiting for restart...");
- consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS);
- }
-
- textMsg.acknowledge();
- }
- }
- finally {
- consumerConnection.close();
- }
- }
- };
-
- Runnable consumerBrokerResetTask = new Runnable() {
- @Override
- public void run() {
-
- try {
- // wait for signal
- stopConsumerBroker.await();
-
- LOG.info("********* STOPPING CONSUMER BROKER");
-
- consumerBroker.stop();
- consumerBroker.waitUntilStopped();
-
- LOG.info("***** STARTING CONSUMER BROKER");
- // do not delete messages on startup
- consumerBroker = createConsumerBroker(false);
-
- LOG.info("***** CONSUMER BROKER STARTED!!");
- consumerBrokerRestarted.countDown();
-
- assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount());
- return producerBroker.getAdminView().getTotalMessageCount() == 0;
- }
- }));
- consumerRestartedAndMessageForwarded.countDown();
-
- }
- catch (Exception e) {
- LOG.error("Exception when stopping/starting the consumerBroker ", e);
- }
-
- }
- };
-
- ExecutorService executor = Executors.newFixedThreadPool(2);
-
- // start consumerBroker start/stop task
- executor.execute(consumerBrokerResetTask);
-
- // start consuming messages
- Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
-
- produceMessages();
-
- // Wait for consumer to finish
- int totalMessagesConsumed = numberOfConsumedMessage.get();
-
- StringBuffer contents = new StringBuffer();
- boolean messageInStore = isMessageInJDBCStore(localDataSource, contents);
- LOG.debug("****number of messages received " + totalMessagesConsumed);
-
- assertEquals("number of messages received", 2, totalMessagesConsumed);
- assertEquals("messages left in store", true, messageInStore);
- assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
- }
-
- private void produceMessages() throws JMSException {
-
- ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false");
- Connection producerConnection = producerFactory.createConnection();
-
- try {
- producerConnection.setClientID("producer");
- producerConnection.start();
-
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- final MessageProducer remoteProducer = producerSession.createProducer(QUEUE_NAME);
-
- int i = 0;
- while (MESSAGE_COUNT > i) {
- String payload = "test msg " + i;
- TextMessage msg = producerSession.createTextMessage(payload);
- remoteProducer.send(msg);
- i++;
- }
-
- }
- finally {
- producerConnection.close();
- }
- }
-
- @Before
- public void setUp() throws Exception {
- LOG.debug("Running with enableCursorAudit set to {}", this.enableCursorAudit);
- doSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- doTearDown();
- }
-
- protected void doTearDown() throws Exception {
-
- try {
- producerBroker.stop();
- }
- catch (Exception ex) {
- }
- try {
- consumerBroker.stop();
- }
- catch (Exception ex) {
- }
- }
-
- protected void doSetUp() throws Exception {
- producerBroker = createProducerBroker();
- consumerBroker = createConsumerBroker(true);
- }
-
- /**
- * Producer broker listens on localhost:2003 networks to consumerBroker -
- * localhost:2006
- *
- * @return
- * @throws Exception
- */
- protected BrokerService createProducerBroker() throws Exception {
-
- String networkToPorts[] = new String[]{"2006"};
- HashMap<String, String> networkProps = new HashMap<>();
-
- networkProps.put("networkTTL", "10");
- networkProps.put("conduitSubscriptions", "true");
- networkProps.put("decreaseNetworkConsumerPriority", "true");
- networkProps.put("dynamicOnly", "true");
-
- BrokerService broker = new BrokerService();
- broker.getManagementContext().setCreateConnector(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setBrokerName("BP");
- broker.setAdvisorySupport(false);
-
- // lazy init listener on broker start
- TransportConnector transportConnector = new TransportConnector();
- transportConnector.setUri(new URI("tcp://localhost:2003"));
- List<TransportConnector> transportConnectors = new ArrayList<>();
- transportConnectors.add(transportConnector);
- broker.setTransportConnectors(transportConnectors);
-
- // network to consumerBroker
-
- if (networkToPorts.length > 0) {
- StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
- NetworkConnector nc = broker.addNetworkConnector(builder.toString());
- IntrospectionSupport.setProperties(nc, networkProps);
- nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue[]{QUEUE_NAME}));
- }
-
- // Persistence adapter
-
- JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
- EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
- remoteDataSource.setDatabaseName("target/derbyDBRemoteBroker");
- remoteDataSource.setCreateDatabase("create");
- jdbc.setDataSource(remoteDataSource);
- broker.setPersistenceAdapter(jdbc);
-
- // set Policy entries
- PolicyEntry policy = new PolicyEntry();
-
- policy.setQueue(">");
- policy.setEnableAudit(false);
- policy.setUseCache(false);
- policy.setExpireMessagesPeriod(0);
-
- // set replay with no consumers
- ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
- conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
- policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
-
- PolicyMap pMap = new PolicyMap();
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
-
- broker.start();
- broker.waitUntilStarted();
-
- return broker;
- }
-
- /**
- * consumerBroker - listens on localhost:2006
- *
- * @param deleteMessages - drop messages when broker instance is created
- * @return
- * @throws Exception
- */
- protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception {
-
- String scheme = "tcp";
- String listenPort = "2006";
-
- BrokerService broker = new BrokerService();
- broker.getManagementContext().setCreateConnector(false);
- broker.setDeleteAllMessagesOnStartup(deleteMessages);
- broker.setBrokerName("BC");
- // lazy init listener on broker start
- TransportConnector transportConnector = new TransportConnector();
- transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
- List<TransportConnector> transportConnectors = new ArrayList<>();
- transportConnectors.add(transportConnector);
- broker.setTransportConnectors(transportConnectors);
-
- // policy entries
-
- PolicyEntry policy = new PolicyEntry();
-
- policy.setQueue(">");
- policy.setEnableAudit(enableCursorAudit);
- policy.setExpireMessagesPeriod(0);
-
- // set replay with no consumers
- ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
- conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
- policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
-
- PolicyMap pMap = new PolicyMap();
-
- pMap.setDefaultEntry(policy);
- broker.setDestinationPolicy(pMap);
-
- // Persistence adapter
- JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
- EmbeddedDataSource localDataSource = new EmbeddedDataSource();
- localDataSource.setDatabaseName("target/derbyDBLocalBroker");
- localDataSource.setCreateDatabase("create");
- localJDBCPersistentAdapter.setDataSource(localDataSource);
- broker.setPersistenceAdapter(localJDBCPersistentAdapter);
-
- if (deleteMessages) {
- // no plugin on restart
- broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
- }
-
- this.localDataSource = localDataSource;
-
- broker.start();
- broker.waitUntilStarted();
-
- return broker;
- }
-
- /**
- * Query JDBC Store to see if messages are left
- *
- * @param dataSource
- * @return
- * @throws SQLException
- */
- private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException {
-
- boolean tableHasData = false;
- String query = "select * from ACTIVEMQ_MSGS";
-
- java.sql.Connection conn = dataSource.getConnection();
- PreparedStatement s = conn.prepareStatement(query);
-
- ResultSet set = null;
-
- try {
- StringBuffer headers = new StringBuffer();
- set = s.executeQuery();
- ResultSetMetaData metaData = set.getMetaData();
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
-
- if (i == 1) {
- headers.append("||");
- }
- headers.append(metaData.getColumnName(i) + "||");
- }
- LOG.error(headers.toString());
-
- while (set.next()) {
- tableHasData = true;
-
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- if (i == 1) {
- stringBuffer.append("|");
- }
- stringBuffer.append(set.getString(i) + "|");
- }
- LOG.error(stringBuffer.toString());
- }
- }
- finally {
- try {
- set.close();
- }
- catch (Throwable ignore) {
- }
- try {
- s.close();
- }
- catch (Throwable ignore) {
- }
-
- conn.close();
- }
-
- return tableHasData;
- }
-
- /**
- * plugin used to ensure consumerbroker is restared before the network
- * message from producerBroker is acked
- */
- class MyTestPlugin implements BrokerPlugin {
-
- @Override
- public Broker installPlugin(Broker broker) throws Exception {
- return new MyTestBroker(broker);
- }
- }
-
- class MyTestBroker extends BrokerFilter {
-
- public MyTestBroker(Broker next) {
- super(next);
- }
-
- @Override
- public void send(ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
-
- super.send(producerExchange, messageSend);
- LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId());
- stopConsumerBroker.countDown();
- producerExchange.getConnectionContext().setDontSendReponse(true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
deleted file mode 100644
index beab4c3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.BrokerViewMBean;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ5035Test {
-
- private static final String CLIENT_ID = "amq-test-client-id";
- private static final String DURABLE_SUB_NAME = "testDurable";
-
- private final String xbean = "xbean:";
- private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035";
-
- private static BrokerService brokerService;
- private String connectionUri;
-
- @Before
- public void setUp() throws Exception {
- brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
- connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
- brokerService.setDeleteAllMessagesOnStartup(true);
- brokerService.start();
- brokerService.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- brokerService.waitUntilStopped();
- }
-
- @Test
- public void testFoo() throws Exception {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
- Connection connection = factory.createConnection();
- connection.setClientID(CLIENT_ID);
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("Test.Topic");
- MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME);
- consumer.close();
-
- BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME);
- brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME);
- }
-
- private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException {
- ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
- BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
- assertNotNull(view);
- return view;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
deleted file mode 100644
index 8596683..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ5136Test {
-
- BrokerService brokerService;
-
- @Before
- public void startBroker() throws Exception {
- brokerService = new BrokerService();
- brokerService.setPersistent(false);
- brokerService.start();
- }
-
- @After
- public void stopBroker() throws Exception {
- brokerService.stop();
- }
-
- @Test
- public void memoryUsageOnCommit() throws Exception {
- sendMessagesAndAssertMemoryUsage(new TransactionHandler() {
- @Override
- public void finishTransaction(Session session) throws JMSException {
- session.commit();
- }
- });
- }
-
- @Test
- public void memoryUsageOnRollback() throws Exception {
- sendMessagesAndAssertMemoryUsage(new TransactionHandler() {
- @Override
- public void finishTransaction(Session session) throws JMSException {
- session.rollback();
- }
- });
- }
-
- private void sendMessagesAndAssertMemoryUsage(TransactionHandler transactionHandler) throws Exception {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic destination = session.createTopic("ActiveMQBug");
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < 100; i++) {
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(generateBytes());
- producer.send(message);
- transactionHandler.finishTransaction(session);
- }
- connection.close();
- org.junit.Assert.assertEquals(0, BrokerRegistry.getInstance().findFirst().getSystemUsage().getMemoryUsage().getPercentUsage());
- }
-
- private byte[] generateBytes() {
- byte[] bytes = new byte[100000];
- for (int i = 0; i < 100000; i++) {
- bytes[i] = (byte) i;
- }
- return bytes;
- }
-
- private static interface TransactionHandler {
-
- void finishTransaction(Session session) throws JMSException;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
deleted file mode 100644
index dc37c79..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQMessageProducer;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.Wait;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-@RunWith(value = Parameterized.class)
-public class AMQ5212Test {
-
- BrokerService brokerService;
-
- @Parameterized.Parameter(0)
- public boolean concurrentStoreAndDispatchQ = true;
-
- @Parameterized.Parameters(name = "concurrentStoreAndDispatch={0}")
- public static Iterable<Object[]> getTestParameters() {
- return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
- }
-
- @Before
- public void setUp() throws Exception {
- start(true);
- }
-
- public void start(boolean deleteAllMessages) throws Exception {
- brokerService = new BrokerService();
- if (deleteAllMessages) {
- brokerService.deleteAllMessages();
- }
- ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQ);
- brokerService.addConnector("tcp://localhost:0");
- brokerService.setAdvisorySupport(false);
- brokerService.start();
- }
-
- @After
- public void tearDown() throws Exception {
- brokerService.stop();
- }
-
- @Test
- public void verifyDuplicateSuppressionWithConsumer() throws Exception {
- doVerifyDuplicateSuppression(100, 100, true);
- }
-
- @Test
- public void verifyDuplicateSuppression() throws Exception {
- doVerifyDuplicateSuppression(100, 100, false);
- }
-
- public void doVerifyDuplicateSuppression(final int numToSend,
- final int expectedTotalEnqueue,
- final boolean demand) throws Exception {
- final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setCopyMessageOnSend(false);
- connectionFactory.setWatchTopicAdvisories(false);
-
- final int concurrency = 40;
- final AtomicInteger workCount = new AtomicInteger(numToSend);
- ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
- for (int i = 0; i < concurrency; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- int i;
- while ((i = workCount.getAndDecrement()) > 0) {
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
- activeMQConnection.start();
- ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + AMQ5212Test.class.getSimpleName());
- ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
- if (demand) {
- // create demand so page in will happen
- activeMQSession.createConsumer(dest);
- }
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setDestination(dest);
- activeMQMessageProducer.send(message, null);
-
- // send a duplicate
- activeMQConnection.syncSendPacket(message);
- activeMQConnection.close();
-
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- TimeUnit.SECONDS.sleep(1);
- executorService.shutdown();
- executorService.awaitTermination(5, TimeUnit.MINUTES);
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return expectedTotalEnqueue == brokerService.getAdminView().getTotalEnqueueCount();
- }
- });
- assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount());
- }
-
- @Test
- public void verifyConsumptionOnDuplicate() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setCopyMessageOnSend(false);
- connectionFactory.setWatchTopicAdvisories(false);
-
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
- activeMQConnection.start();
- ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ActiveMQQueue dest = new ActiveMQQueue("Q");
- ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setDestination(dest);
- activeMQMessageProducer.send(message, null);
-
- // send a duplicate
- activeMQConnection.syncSendPacket(message);
-
- activeMQConnection.close();
-
- // verify original can be consumed after restart
- brokerService.stop();
- brokerService.start(false);
-
- connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setCopyMessageOnSend(false);
- connectionFactory.setWatchTopicAdvisories(false);
-
- activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
- activeMQConnection.start();
- activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
- Message received = messageConsumer.receive(4000);
- assertNotNull("Got message", received);
- assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
-
- activeMQConnection.close();
- }
-
- @Test
- public void verifyClientAckConsumptionOnDuplicate() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
- connectionFactory.setCopyMessageOnSend(false);
- connectionFactory.setWatchTopicAdvisories(false);
-
- ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
- activeMQConnection.start();
- ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- ActiveMQQueue dest = new ActiveMQQueue("Q");
-
- MessageConsumer messageConsumer = activeMQSession.createConsumer(dest);
-
- ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest);
- ActiveMQTextMessage message = new ActiveMQTextMessage();
- message.setDestination(dest);
- activeMQMessageProducer.send(message, null);
-
- // send a duplicate
- activeMQConnection.syncSendPacket(message);
-
- Message received = messageConsumer.receive(4000);
- assertNotNull("Got message", received);
- assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
- messageConsumer.close();
-
- messageConsumer = activeMQSession.createConsumer(dest);
- received = messageConsumer.receive(4000);
- assertNotNull("Got message", received);
- assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID());
- received.acknowledge();
-
- activeMQConnection.close();
- }
-}