You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/15 21:22:05 UTC
[22/59] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
deleted file mode 100644
index 362fa5c..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsMultipleClientsTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.BlockJUnit4ClassRunner;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-
-@RunWith(BlockJUnit4ClassRunner.class)
-public class AMQ2910Test extends JmsMultipleClientsTestSupport {
-
- final int maxConcurrency = 60;
- final int msgCount = 200;
- final Vector<Throwable> exceptions = new Vector<>();
-
- @Override
- protected BrokerService createBroker() throws Exception {
- //persistent = true;
- BrokerService broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector("tcp://localhost:0");
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
- defaultEntry.setCursorMemoryHighWaterMark(50);
- defaultEntry.setMemoryLimit(500 * 1024);
- defaultEntry.setProducerFlowControl(false);
- policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
-
- broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
-
- return broker;
- }
-
- @Test(timeout = 30 * 1000)
- public void testConcurrentSendToPendingCursor() throws Exception {
- final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
- factory.setCloseTimeout(30000);
- ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < maxConcurrency; i++) {
- final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages(factory.createConnection(), dest, msgCount);
- }
- catch (Throwable t) {
- exceptions.add(t);
- }
- }
- });
- }
-
- executor.shutdown();
-
- assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
- assertNoExceptions();
-
- executor = Executors.newCachedThreadPool();
- for (int i = 0; i < maxConcurrency; i++) {
- final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- startConsumers(factory, dest);
- }
- catch (Throwable t) {
- exceptions.add(t);
- }
- }
- });
- }
-
- executor.shutdown();
- assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS));
-
- allMessagesList.setMaximumDuration(120 * 1000);
- final int numExpected = maxConcurrency * msgCount;
- allMessagesList.waitForMessagesToArrive(numExpected);
-
- if (allMessagesList.getMessageCount() != numExpected) {
- dumpAllThreads(getName());
-
- }
- allMessagesList.assertMessagesReceivedNoWait(numExpected);
-
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-
- }
-
- private void assertNoExceptions() {
- if (!exceptions.isEmpty()) {
- for (Throwable t : exceptions) {
- t.printStackTrace();
- }
- }
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
deleted file mode 100644
index 573cdd3..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.RedeliveryPolicy;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2982Test {
-
- private static final int MAX_MESSAGES = 500;
-
- private static final String QUEUE_NAME = "test.queue";
-
- private BrokerService broker;
-
- private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
- private CleanableKahaDBStore kahaDB;
-
- private static class CleanableKahaDBStore extends KahaDBStore {
-
- // make checkpoint cleanup accessible
- public void forceCleanup() throws IOException {
- checkpointCleanup(true);
- }
-
- public int getFileMapSize() throws IOException {
- // ensure save memory publishing, use the right lock
- indexLock.readLock().lock();
- try {
- return getJournal().getFileMap().size();
- }
- finally {
- indexLock.readLock().unlock();
- }
- }
- }
-
- @Before
- public void setup() throws Exception {
-
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
-
- kahaDB = new CleanableKahaDBStore();
- kahaDB.setJournalMaxFileLength(256 * 1024);
- broker.setPersistenceAdapter(kahaDB);
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- private Connection registerDLQMessageListener() throws Exception {
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(session.createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- messageCountDown.countDown();
- }
- });
-
- return connection;
- }
-
- class ConsumerThread extends Thread {
-
- @Override
- public void run() {
- try {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
-
- RedeliveryPolicy policy = new RedeliveryPolicy();
- policy.setMaximumRedeliveries(0);
- policy.setInitialRedeliveryDelay(100);
- policy.setUseExponentialBackOff(false);
-
- factory.setRedeliveryPolicy(policy);
-
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
- do {
- Message message = consumer.receive(300);
- if (message != null) {
- session.rollback();
- }
- } while (messageCountDown.getCount() != 0);
- consumer.close();
- session.close();
- connection.close();
- }
- catch (Exception e) {
- Assert.fail(e.getMessage());
- }
- }
- }
-
- private void sendMessages() throws Exception {
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < MAX_MESSAGES; i++) {
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[1000]);
- producer.send(message);
- }
- producer.close();
- session.close();
- connection.close();
- }
-
- @Test
- public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws Exception {
-
- Connection dlqConnection = registerDLQMessageListener();
-
- ConsumerThread thread = new ConsumerThread();
- thread.start();
-
- sendMessages();
-
- thread.join(60 * 1000);
- assertFalse(thread.isAlive());
-
- dlqConnection.close();
-
- kahaDB.forceCleanup();
-
- assertEquals("only one active KahaDB log file after cleanup is expected", 1, kahaDB.getFileMapSize());
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
deleted file mode 100644
index 8714477..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ2983Test {
-
- private static final int MAX_CONSUMER = 10;
-
- private static final int MAX_MESSAGES = 2000;
-
- private static final String QUEUE_NAME = "test.queue";
-
- private BrokerService broker;
-
- private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
- private CleanableKahaDBStore kahaDB;
-
- private static class CleanableKahaDBStore extends KahaDBStore {
-
- // make checkpoint cleanup accessible
- public void forceCleanup() throws IOException {
- checkpointCleanup(true);
- }
-
- public int getFileMapSize() throws IOException {
- // ensure save memory publishing, use the right lock
- indexLock.readLock().lock();
- try {
- return getJournal().getFileMap().size();
- }
- finally {
- indexLock.readLock().unlock();
- }
- }
- }
-
- private class ConsumerThread extends Thread {
-
- @Override
- public void run() {
- try {
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
- do {
- Message message = consumer.receive(200);
- if (message != null) {
- session.commit();
- messageCountDown.countDown();
- }
- } while (messageCountDown.getCount() != 0);
- consumer.close();
- session.close();
- connection.close();
- }
- catch (Exception e) {
- Assert.fail(e.getMessage());
- }
- }
- }
-
- @Before
- public void setup() throws Exception {
-
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
-
- kahaDB = new CleanableKahaDBStore();
- kahaDB.setJournalMaxFileLength(256 * 1024);
- broker.setPersistenceAdapter(kahaDB);
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- @Test
- public void testNoStickyKahaDbLogFilesOnConcurrentTransactionalConsumer() throws Exception {
-
- List<Thread> consumerThreads = new ArrayList<>();
- for (int i = 0; i < MAX_CONSUMER; i++) {
- ConsumerThread thread = new ConsumerThread();
- thread.start();
- consumerThreads.add(thread);
- }
- sendMessages();
-
- boolean allMessagesReceived = messageCountDown.await(60, TimeUnit.SECONDS);
- assertTrue(allMessagesReceived);
-
- for (Thread thread : consumerThreads) {
- thread.join(TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS));
- assertFalse(thread.isAlive());
- }
- kahaDB.forceCleanup();
- assertEquals("Expect only one active KahaDB log file after cleanup", 1, kahaDB.getFileMapSize());
- }
-
- private void sendMessages() throws Exception {
- ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0; i < MAX_MESSAGES; i++) {
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[200]);
- producer.send(message);
- }
- producer.close();
- session.close();
- connection.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
deleted file mode 100644
index 1e3c737..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.Connection;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.thread.Task;
-import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.*;
-import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * This test involves the creation of a local and remote broker, both of which
- * communicate over VM and TCP. The local broker establishes a bridge to the
- * remote broker for the purposes of verifying that broker info is only
- * transferred once the local broker's ID is known to the bridge support.
- */
-public class AMQ3014Test {
-
- // Change this URL to be an unused port.
- private static final String BROKER_URL = "tcp://localhost:0";
-
- private List<BrokerInfo> remoteBrokerInfos = Collections.synchronizedList(new ArrayList<BrokerInfo>());
-
- private BrokerService localBroker = new BrokerService();
-
- // Override the "remote" broker so that it records all (remote) BrokerInfos
- // that it receives.
- private BrokerService remoteBroker = new BrokerService() {
- @Override
- protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
- TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
- return new TransportConnector(transport) {
- @Override
- protected Connection createConnection(Transport transport) throws IOException {
- Connection connection = super.createConnection(transport);
- final TransportListener proxiedListener = transport.getTransportListener();
- transport.setTransportListener(new TransportListener() {
-
- @Override
- public void onCommand(Object command) {
- if (command instanceof BrokerInfo) {
- remoteBrokerInfos.add((BrokerInfo) command);
- }
- proxiedListener.onCommand(command);
- }
-
- @Override
- public void onException(IOException error) {
- proxiedListener.onException(error);
- }
-
- @Override
- public void transportInterupted() {
- proxiedListener.transportInterupted();
- }
-
- @Override
- public void transportResumed() {
- proxiedListener.transportResumed();
- }
- });
- return connection;
- }
-
- };
- }
- };
-
- @Before
- public void init() throws Exception {
- localBroker.setBrokerName("localBroker");
- localBroker.setPersistent(false);
- localBroker.setUseJmx(false);
- localBroker.setSchedulerSupport(false);
-
- remoteBroker.setBrokerName("remoteBroker");
- remoteBroker.setPersistent(false);
- remoteBroker.setUseJmx(false);
- remoteBroker.addConnector(BROKER_URL);
- remoteBroker.setSchedulerSupport(false);
- }
-
- @After
- public void cleanup() throws Exception {
- try {
- localBroker.stop();
- }
- finally {
- remoteBroker.stop();
- }
- }
-
- /**
- * This test verifies that the local broker's ID is typically known by the
- * bridge support before the local broker's BrokerInfo is sent to the remote
- * broker.
- */
- @Test
- public void NormalCaseTest() throws Exception {
- runTest(0, 3000);
- }
-
- /**
- * This test verifies that timing can arise under which the local broker's
- * ID is not known by the bridge support before the local broker's
- * BrokerInfo is sent to the remote broker.
- */
- @Test
- public void DelayedCaseTest() throws Exception {
- runTest(500, 3000);
- }
-
- private void runTest(final long taskRunnerDelay, long timeout) throws Exception {
- // Add a network connector to the local broker that will create a bridge
- // to the remote broker.
- DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
- SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
- da.setServices(remoteBroker.getTransportConnectors().get(0).getPublishableConnectString());
- dnc.setDiscoveryAgent(da);
- localBroker.addNetworkConnector(dnc);
-
- // Before starting the local broker, intercept the task runner factory
- // so that the
- // local VMTransport dispatcher is artificially delayed.
- final TaskRunnerFactory realTaskRunnerFactory = localBroker.getTaskRunnerFactory();
- localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
- @Override
- public TaskRunner createTaskRunner(Task task, String name) {
- final TaskRunner realTaskRunner = realTaskRunnerFactory.createTaskRunner(task, name);
- if (name.startsWith("ActiveMQ Connection Dispatcher: ")) {
- return new TaskRunner() {
- @Override
- public void shutdown() throws InterruptedException {
- realTaskRunner.shutdown();
- }
-
- @Override
- public void shutdown(long timeout) throws InterruptedException {
- realTaskRunner.shutdown(timeout);
- }
-
- @Override
- public void wakeup() throws InterruptedException {
- Thread.sleep(taskRunnerDelay);
- realTaskRunner.wakeup();
- }
- };
- }
- else {
- return realTaskRunnerFactory.createTaskRunner(task, name);
- }
- }
- });
-
- // Start the brokers and wait for the bridge to be created; the remote
- // broker is started first to ensure it is available for the local
- // broker to connect to.
- remoteBroker.start();
- localBroker.start();
-
- // Wait for the remote broker to receive the local broker's BrokerInfo
- // and then verify the local broker's ID is known.
- long startTimeMillis = System.currentTimeMillis();
- while (remoteBrokerInfos.isEmpty() && (System.currentTimeMillis() - startTimeMillis) < timeout) {
- Thread.sleep(100);
- }
-
- Assert.assertFalse("Timed out waiting for bridge to form.", remoteBrokerInfos.isEmpty());
- Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(0).getBrokerId());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
deleted file mode 100644
index 88a0db8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.apache.activemq.util.ConsumerThread;
-import org.apache.activemq.util.ProducerThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
-
-import javax.jms.*;
-
-import java.io.File;
-
-import static org.junit.Assert.assertEquals;
-
-public class AMQ3120Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ3120Test.class);
-
- BrokerService broker = null;
- File kahaDbDir = null;
- private final Destination destination = new ActiveMQQueue("AMQ3120Test");
- final String payload = new String(new byte[1024]);
-
- protected void startBroker(boolean delete) throws Exception {
- broker = new BrokerService();
-
- //Start with a clean directory
- kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
- deleteDir(kahaDbDir);
-
- broker.setSchedulerSupport(false);
- broker.setDeleteAllMessagesOnStartup(delete);
- broker.setPersistent(true);
- broker.setUseJmx(false);
- broker.addConnector("tcp://localhost:0");
-
- PolicyMap map = new PolicyMap();
- PolicyEntry entry = new PolicyEntry();
- entry.setUseCache(false);
- map.setDefaultEntry(entry);
- broker.setDestinationPolicy(map);
-
- configurePersistence(broker, delete);
-
- broker.start();
- LOG.info("Starting broker..");
- }
-
- protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
- KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
-
- // ensure there are a bunch of data files but multiple entries in each
- adapter.setJournalMaxFileLength(1024 * 20);
-
- // speed up the test case, checkpoint and cleanup early and often
- adapter.setCheckpointInterval(500);
- adapter.setCleanupInterval(500);
-
- if (!deleteAllOnStart) {
- adapter.setForceRecoverIndex(true);
- }
-
- }
-
- private boolean deleteDir(File dir) {
- if (dir.isDirectory()) {
- String[] children = dir.list();
- for (int i = 0; i < children.length; i++) {
- boolean success = deleteDir(new File(dir, children[i]));
- if (!success) {
- return false;
- }
- }
- }
-
- return dir.delete();
- }
-
- private int getFileCount(File dir) {
- if (dir.isDirectory()) {
- String[] children = dir.list();
- return children.length;
- }
-
- return 0;
- }
-
- @Test
- public void testCleanupOfFiles() throws Exception {
- final int messageCount = 500;
- startBroker(true);
- int fileCount = getFileCount(kahaDbDir);
- assertEquals(4, fileCount);
-
- Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
- connection.start();
- Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- ProducerThread producer = new ProducerThread(producerSess, destination) {
- @Override
- protected Message createMessage(int i) throws Exception {
- return sess.createTextMessage(payload + "::" + i);
- }
- };
- producer.setSleep(650);
- producer.setMessageCount(messageCount);
- ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
- consumer.setBreakOnNull(false);
- consumer.setMessageCount(messageCount);
-
- producer.start();
- consumer.start();
-
- producer.join();
- consumer.join();
-
- assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
-
- broker.stop();
- broker.waitUntilStopped();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
deleted file mode 100644
index 621b421..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3140Test {
-
- private static final int MESSAGES_PER_THREAD = 100;
-
- private static final int THREAD_COUNT = 10;
-
- private BrokerService broker;
-
- private static final String QUEUE_NAME = "test";
-
- private static class Sender extends Thread {
-
- private static final int DELAY = 3000;
-
- @Override
- public void run() {
- try {
- ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
- Message message = session.createTextMessage("test");
- for (int i = 0; i < MESSAGES_PER_THREAD; i++) {
- message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY);
- producer.send(message);
- }
- session.close();
- connection.close();
- }
- catch (JMSException e) {
- fail(e.getMessage());
- }
- }
- }
-
- @Before
- public void setup() throws Exception {
- File schedulerDirectory = new File("target/test/ScheduledDB");
-
- IOHelper.mkdirs(schedulerDirectory);
- IOHelper.deleteChildren(schedulerDirectory);
-
- broker = new BrokerService();
- broker.setSchedulerSupport(true);
- broker.setPersistent(true);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setDataDirectory("target");
- broker.setSchedulerDirectoryFile(schedulerDirectory);
- broker.setUseJmx(false);
- broker.addConnector("vm://localhost");
-
- broker.start();
- broker.waitUntilStarted();
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
- @Test
- public void noMessageLostOnConcurrentScheduling() throws JMSException, InterruptedException {
-
- final AtomicLong receiveCounter = new AtomicLong();
-
- ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message message) {
- receiveCounter.incrementAndGet();
- }
- });
-
- List<Sender> senderThreads = new ArrayList<>();
- for (int i = 0; i < THREAD_COUNT; i++) {
- Sender sender = new Sender();
- senderThreads.add(sender);
- }
- for (Sender sender : senderThreads) {
- sender.start();
- }
- for (Sender sender : senderThreads) {
- sender.join();
- }
-
- // wait until all scheduled messages has been received
- TimeUnit.MINUTES.sleep(2);
-
- session.close();
- connection.close();
-
- assertEquals(MESSAGES_PER_THREAD * THREAD_COUNT, receiveCounter.get());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
deleted file mode 100644
index 49db143..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ScheduledMessage;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class AMQ3141Test {
-
- private static final int MAX_MESSAGES = 100;
-
- private static final long DELAY_IN_MS = 100;
-
- private static final String QUEUE_NAME = "target.queue";
-
- private BrokerService broker;
-
- private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES);
-
- private ConnectionFactory factory;
-
- @Before
- public void setup() throws Exception {
-
- broker = new BrokerService();
- broker.setPersistent(true);
- broker.setSchedulerSupport(true);
- broker.setDataDirectory("target");
- broker.setUseJmx(false);
- broker.addConnector("vm://localhost");
-
- File schedulerDirectory = new File("target/test/ScheduledDB");
- IOHelper.mkdirs(schedulerDirectory);
- IOHelper.deleteChildren(schedulerDirectory);
- broker.setSchedulerDirectoryFile(schedulerDirectory);
-
- broker.start();
- broker.waitUntilStarted();
-
- factory = new ActiveMQConnectionFactory("vm://localhost");
- }
-
- private void sendMessages() throws Exception {
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
- for (int i = 0; i < MAX_MESSAGES; i++) {
- Message message = session.createTextMessage();
- message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY_IN_MS);
- producer.send(message);
- }
- connection.close();
- }
-
- @Test
- public void testNoMissingMessagesOnShortScheduleDelay() throws Exception {
-
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
-
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- messageCountDown.countDown();
- }
- });
- sendMessages();
-
- boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS);
-
- connection.close();
-
- assertTrue("expect all messages received but " + messageCountDown.getCount() + " are missing", receiveComplete);
- }
-
- @After
- public void tearDown() throws Exception {
- broker.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
deleted file mode 100644
index 7e7c959..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3145Test {
-
- private static final Logger LOG = LoggerFactory.getLogger(AMQ3145Test.class);
- private final String MESSAGE_TEXT = new String(new byte[1024]);
- BrokerService broker;
- ConnectionFactory factory;
- Connection connection;
- Session session;
- Queue queue;
- MessageConsumer consumer;
-
- @Before
- public void createBroker() throws Exception {
- createBroker(true);
- }
-
- public void createBroker(boolean deleteAll) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteAll);
- broker.setDataDirectory("target/AMQ3145Test");
- broker.setUseJmx(true);
- broker.getManagementContext().setCreateConnector(false);
- broker.addConnector("tcp://localhost:0");
- broker.start();
- broker.waitUntilStarted();
- factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
- connection = factory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- }
-
- @After
- public void tearDown() throws Exception {
- if (consumer != null) {
- consumer.close();
- }
- session.close();
- connection.stop();
- connection.close();
- broker.stop();
- }
-
- @Test
- public void testCacheDisableReEnable() throws Exception {
- createProducerAndSendMessages(1);
- QueueViewMBean proxy = getProxyToQueueViewMBean();
- assertTrue("cache is enabled", proxy.isCacheEnabled());
- tearDown();
- createBroker(false);
- proxy = getProxyToQueueViewMBean();
- assertEquals("one pending message", 1, proxy.getQueueSize());
- assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled());
-
- createConsumer(1);
- createProducerAndSendMessages(1);
- assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled());
- }
-
- private QueueViewMBean getProxyToQueueViewMBean() throws MalformedObjectNameException, JMSException {
- ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + ":destinationType=Queue,destinationName=" + queue.getQueueName() + ",type=Broker,brokerName=localhost");
- QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
- return proxy;
- }
-
- private void createProducerAndSendMessages(int numToSend) throws Exception {
- queue = session.createQueue("test1");
- MessageProducer producer = session.createProducer(queue);
- for (int i = 0; i < numToSend; i++) {
- TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
- if (i != 0 && i % 50000 == 0) {
- LOG.info("sent: " + i);
- }
- producer.send(message);
- }
- producer.close();
- }
-
- private void createConsumer(int numToConsume) throws Exception {
- consumer = session.createConsumer(queue);
- // wait for buffer fill out
- for (int i = 0; i < numToConsume; ++i) {
- Message message = consumer.receive(2000);
- message.acknowledge();
- }
- consumer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
deleted file mode 100644
index 34b1909..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.bugs;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-
-import org.apache.activemq.EmbeddedBrokerTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.spring.ConsumerBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AMQ3157Test extends EmbeddedBrokerTestSupport {
-
- private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3157Test.class);
- private Connection connection;
-
- public void testInactiveMirroredQueueIsCleanedUp() throws Exception {
-
- if (connection == null) {
- connection = createConnection();
- }
- connection.start();
-
- ConsumerBean messageList = new ConsumerBean();
- messageList.setVerbose(true);
-
- ActiveMQDestination consumeDestination = createConsumeDestination();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- LOG.info("Consuming from: " + consumeDestination);
-
- MessageConsumer c1 = session.createConsumer(consumeDestination);
- c1.setMessageListener(messageList);
-
- // create topic producer
- ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName());
- LOG.info("Sending to: " + sendDestination);
-
- MessageProducer producer = session.createProducer(sendDestination);
- assertNotNull(producer);
-
- final int total = 10;
- for (int i = 0; i < total; i++) {
- producer.send(session.createTextMessage("message: " + i));
- }
-
- messageList.assertMessagesArrived(total);
- LOG.info("Received: " + messageList);
- messageList.flushMessages();
-
- MessageConsumer c2 = session.createConsumer(sendDestination);
- c2.setMessageListener(messageList);
- messageList.assertMessagesArrived(total);
- LOG.info("Q Received: " + messageList);
-
- connection.close();
-
- List<ObjectName> topics = Arrays.asList(broker.getAdminView().getTopics());
- assertTrue(topics.contains(createObjectName(consumeDestination)));
- List<ObjectName> queues = Arrays.asList(broker.getAdminView().getQueues());
- assertTrue(queues.contains(createObjectName(sendDestination)));
-
- Thread.sleep(TimeUnit.SECONDS.toMillis(10));
-
- topics = Arrays.asList(broker.getAdminView().getTopics());
- if (topics != null) {
- assertFalse("Virtual Topic Desination did not get cleaned up.", topics.contains(createObjectName(consumeDestination)));
- }
- queues = Arrays.asList(broker.getAdminView().getQueues());
- if (queues != null) {
- assertFalse("Mirrored Queue Desination did not get cleaned up.", queues.contains(createObjectName(sendDestination)));
- }
- }
-
- protected ActiveMQDestination createConsumeDestination() {
- return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());
- }
-
- protected String getQueueName() {
- return "My.Queue";
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService answer = new BrokerService();
- answer.setUseMirroredQueues(true);
- answer.setPersistent(isPersistent());
- answer.setSchedulePeriodForDestinationPurge(1000);
-
- PolicyEntry entry = new PolicyEntry();
- entry.setGcInactiveDestinations(true);
- entry.setInactiveTimeoutBeforeGC(5000);
- entry.setProducerFlowControl(true);
- PolicyMap map = new PolicyMap();
- map.setDefaultEntry(entry);
-
- MirroredQueue mirrorQ = new MirroredQueue();
- mirrorQ.setCopyMessage(true);
- DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ};
- answer.setDestinationInterceptors(destinationInterceptors);
-
- answer.setDestinationPolicy(map);
- answer.addConnector(bindAddress);
-
- return answer;
- }
-
- protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
- String domain = "org.apache.activemq";
- ObjectName name;
- if (destination.isQueue()) {
- name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName());
- }
- else {
- name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName());
- }
- return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
- }
-
- protected ObjectName createObjectName(ActiveMQDestination destination) throws Exception {
- String domain = "org.apache.activemq";
- ObjectName name;
- if (destination.isQueue()) {
- name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
- "destinationType=Queue,destinationName=" + destination.getPhysicalName());
- }
- else {
- name = new ObjectName(domain + ":type=Broker,brokerName=localhost," +
- "destinationType=Topic,destinationName=" + destination.getPhysicalName());
- }
-
- return name;
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (connection != null) {
- connection.close();
- }
- super.tearDown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
deleted file mode 100644
index 6fd81b2..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.bugs;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2.
- * <br>
- * Symptoms: - 1 record is lost "early" in the stream. - no more records lost.
- * <br>
- * Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and
- * a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur.
- * - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages.
- * <br>
- * Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem
- * (probably because memory limits not reached). - Producers sending a number of messages before consumers come online
- * increases rate of occurrence.
- */
-
-public class AMQ3167Test {
-
- protected BrokerService embeddedBroker;
-
- protected static final int MEMORY_LIMIT = 16 * 1024;
-
- protected static boolean Debug_f = false;
-
- protected long Producer_stop_time = 0;
- protected long Consumer_stop_time = 0;
- protected long Consumer_startup_delay_ms = 2000;
- protected boolean Stop_after_error = true;
-
- protected Connection JMS_conn;
- protected long Num_error = 0;
-
- // // ////
- // // UTILITIES ////
- // // ////
-
- /**
- * Create a new, unsecured, client connection to the test broker using the given username and password. This
- * connection bypasses all security.
- * <br>
- * Don't forget to start the connection or no messages will be received by consumers even though producers will work
- * fine.
- *
- * @username name of the JMS user for the connection; may be null.
- * @password Password for the JMS user; may be null.
- */
-
- protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException {
- ActiveMQConnectionFactory conn_fact;
-
- conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
-
- return conn_fact.createConnection(username, password);
- }
-
- // // ////
- // // TEST FUNCTIONALITY ////
- // // ////
-
- @Before
- public void testPrep() throws Exception {
- embeddedBroker = new BrokerService();
- configureBroker(embeddedBroker);
- embeddedBroker.start();
- embeddedBroker.waitUntilStarted();
-
- // Prepare the connection
- JMS_conn = createUnsecuredConnection(null, null);
- JMS_conn.start();
- }
-
- @After
- public void testCleanup() throws java.lang.Exception {
- JMS_conn.stop();
- embeddedBroker.stop();
- }
-
- protected void configureBroker(BrokerService broker_svc) throws Exception {
-
- broker_svc.setBrokerName("testbroker1");
-
- broker_svc.setUseJmx(false);
- broker_svc.setPersistent(true);
- broker_svc.setDataDirectory("target/AMQ3167Test");
- configureDestinationPolicy(broker_svc);
- }
-
- /**
- * NOTE: overrides any prior policy map defined for the broker service.
- */
-
- protected void configureDestinationPolicy(BrokerService broker_svc) {
- PolicyMap pol_map;
- PolicyEntry pol_ent;
- ArrayList<PolicyEntry> ent_list;
-
- ent_list = new ArrayList<>();
-
- //
- // QUEUES
- //
-
- pol_ent = new PolicyEntry();
- pol_ent.setQueue(">");
- pol_ent.setMemoryLimit(MEMORY_LIMIT);
- pol_ent.setProducerFlowControl(false);
- ent_list.add(pol_ent);
-
- //
- // COMPLETE POLICY MAP
- //
-
- pol_map = new PolicyMap();
- pol_map.setPolicyEntries(ent_list);
-
- broker_svc.setDestinationPolicy(pol_map);
- }
-
- // // ////
- // // TEST ////
- // // ////
-
- @Test
- public void testQueueLostMessage() throws Exception {
- Destination dest;
-
- dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE);
-
- // 10 seconds from now
- Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L);
-
- // 15 seconds from now
- Consumer_stop_time = Producer_stop_time + (5L * 1000000000L);
-
- runLostMsgTest(dest, 1000000, 1, 1, false);
-
- // Make sure failures in the threads are thoroughly reported in the JUnit framework.
- assertTrue(Num_error == 0);
- }
-
- /**
- *
- */
-
- protected static void log(String msg) {
- if (Debug_f)
- java.lang.System.err.println(msg);
- }
-
- /**
- * Main body of the lost-message test.
- */
-
- protected void runLostMsgTest(Destination dest,
- int num_msg,
- int num_send_per_sess,
- int num_recv_per_sess,
- boolean topic_f) throws Exception {
- Thread prod_thread;
- Thread cons_thread;
- String tag;
- Session sess;
- MessageProducer prod;
- MessageConsumer cons;
- int ack_mode;
-
- //
- // Start the producer
- //
-
- tag = "prod";
- log(">> Starting producer " + tag);
-
- sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE);
- prod = sess.createProducer(dest);
-
- prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess);
- prod_thread.start();
- log("Started producer " + tag);
-
- //
- // Delay before starting consumers
- //
-
- log("Waiting before starting consumers");
- java.lang.Thread.sleep(Consumer_startup_delay_ms);
-
- //
- // Now create and start the consumer
- //
-
- tag = "cons";
- log(">> Starting consumer");
-
- if (num_recv_per_sess > 1)
- ack_mode = Session.CLIENT_ACKNOWLEDGE;
- else
- ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- sess = JMS_conn.createSession(false, ack_mode);
- cons = sess.createConsumer(dest);
-
- cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess);
- cons_thread.start();
- log("Started consumer " + tag);
-
- //
- // Wait for the producer and consumer to finish.
- //
-
- log("< waiting for producer.");
- prod_thread.join();
-
- log("< waiting for consumer.");
- cons_thread.join();
-
- log("Shutting down");
- }
-
- // // ////
- // // INTERNAL CLASSES ////
- // // ////
-
- /**
- * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is
- * reached, or a test error is detected.
- */
-
- protected class producerThread extends Thread {
-
- protected Session msgSess;
- protected MessageProducer msgProd;
- protected String producerTag;
- protected int numMsg;
- protected int numPerSess;
- protected long producer_stop_time;
-
- producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) {
- super();
-
- producer_stop_time = 0;
- msgSess = sess;
- msgProd = prod;
- producerTag = tag;
- numMsg = num_msg;
- numPerSess = sess_size;
- }
-
- public void execTest() throws Exception {
- Message msg;
- int sess_start;
- int cur;
-
- sess_start = 0;
- cur = 0;
- while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
- msg = msgSess.createTextMessage("test message from " + producerTag);
- msg.setStringProperty("testprodtag", producerTag);
- msg.setIntProperty("seq", cur);
-
- if (msg instanceof ActiveMQMessage) {
- ((ActiveMQMessage) msg).setResponseRequired(true);
- }
-
- //
- // Send the message.
- //
-
- msgProd.send(msg);
- cur++;
-
- //
- // Commit if the number of messages per session has been reached, and
- // transactions are being used (only when > 1 msg per sess).
- //
-
- if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
- msgSess.commit();
- sess_start = cur;
- }
- }
-
- // Make sure to send the final commit, if there were sends since the last commit.
- if ((numPerSess > 1) && ((cur - sess_start) > 0))
- msgSess.commit();
-
- if (cur < numMsg)
- log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")");
- }
-
- /**
- * Check whether it is time for the producer to terminate.
- */
-
- protected boolean didTimeOut() {
- if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time))
- return true;
-
- return false;
- }
-
- /**
- * Run the producer.
- */
-
- @Override
- public void run() {
- try {
- log("- running producer " + producerTag);
- execTest();
- log("- finished running producer " + producerTag);
- }
- catch (Throwable thrown) {
- Num_error++;
- fail("producer " + producerTag + " failed: " + thrown.getMessage());
- throw new Error("producer " + producerTag + " failed", thrown);
- }
- }
-
- @Override
- public String toString() {
- return producerTag;
- }
- }
-
- /**
- * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time
- * is reached, or a test error is detected.
- */
-
- protected class consumerThread extends Thread {
-
- protected Session msgSess;
- protected MessageConsumer msgCons;
- protected String consumerTag;
- protected int numMsg;
- protected int numPerSess;
-
- consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) {
- super();
-
- msgSess = sess;
- msgCons = cons;
- consumerTag = tag;
- numMsg = num_msg;
- numPerSess = sess_size;
- }
-
- public void execTest() throws Exception {
- Message msg;
- int sess_start;
- int cur;
-
- msg = null;
- sess_start = 0;
- cur = 0;
-
- while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) {
- //
- // Use a timeout of 1 second to periodically check the consumer timeout.
- //
- msg = msgCons.receive(1000);
- if (msg != null) {
- checkMessage(msg, cur);
- cur++;
-
- if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) {
- msg.acknowledge();
- sess_start = cur;
- }
- }
- }
-
- // Acknowledge the last messages, if they were not yet acknowledged.
- if ((numPerSess > 1) && ((cur - sess_start) > 0))
- msg.acknowledge();
-
- if (cur < numMsg)
- log("* Consumer " + consumerTag + " timed out");
- }
-
- /**
- * Check whether it is time for the consumer to terminate.
- */
-
- protected boolean didTimeOut() {
- if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time))
- return true;
-
- return false;
- }
-
- /**
- * Verify the message received. Sequence numbers are checked and are expected to exactly match the message
- * number (starting at 0).
- */
-
- protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException {
- int seq;
-
- seq = msg.getIntProperty("seq");
-
- if (exp_seq != seq) {
- Num_error++;
- fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq);
- }
- }
-
- /**
- * Run the consumer.
- */
-
- @Override
- public void run() {
- try {
- log("- running consumer " + consumerTag);
- execTest();
- log("- running consumer " + consumerTag);
- }
- catch (Throwable thrown) {
- Num_error++;
- fail("consumer " + consumerTag + " failed: " + thrown.getMessage());
- throw new Error("consumer " + consumerTag + " failed", thrown);
- }
- }
-
- @Override
- public String toString() {
- return consumerTag;
- }
- }
-}
\ No newline at end of file