You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:18 UTC
[09/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
deleted file mode 100644
index 2575afd..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.perf;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.perf.SimpleQueueTest;
-import org.apache.activemq.store.kahadb.TempKahaDBStore;
-
-/**
- *
- */
-public class TempKahaStoreQueueTest extends SimpleQueueTest {
-
- @Override
- protected void configureBroker(BrokerService answer, String uri) throws Exception {
- File dataFileDir = new File("target/test-amq-data/perfTest/temp-amqdb");
- dataFileDir.mkdirs();
- answer.setDeleteAllMessagesOnStartup(true);
-
- TempKahaDBStore adaptor = new TempKahaDBStore();
- adaptor.setDirectory(dataFileDir);
-
- answer.setDataDirectoryFile(dataFileDir);
- answer.setPersistenceAdapter(adaptor);
- answer.addConnector(uri);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
deleted file mode 100644
index 6626603..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.plist;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.*;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.usage.SystemUsage;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport {
-
- @Test
- public void testAddRemoveAddIndexSize() throws Exception {
- brokerService = new BrokerService();
- brokerService.setUseJmx(false);
- SystemUsage usage = brokerService.getSystemUsage();
- usage.getMemoryUsage().setLimit(1024 * 150);
- String body = new String(new byte[1024]);
- Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
-
- underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
- underTest.setSystemUsage(usage);
-
- LOG.info("start");
- final PageFile pageFile = ((PListImpl) underTest.getDiskList()).getPageFile();
- LOG.info("page count: " + pageFile.getPageCount());
- LOG.info("free count: " + pageFile.getFreePageCount());
- LOG.info("content size: " + pageFile.getPageContentSize());
-
- final long initialPageCount = pageFile.getPageCount();
-
- final int numMessages = 1000;
-
- for (int j = 0; j < 10; j++) {
- // ensure free pages are reused
- for (int i = 0; i < numMessages; i++) {
- ActiveMQMessage mqMessage = new ActiveMQMessage();
- mqMessage.setStringProperty("body", body);
- mqMessage.setMessageId(new MessageId("1:2:3:" + i));
- mqMessage.setMemoryUsage(usage.getMemoryUsage());
- mqMessage.setRegionDestination(destination);
- underTest.addMessageLast(new IndirectMessageReference(mqMessage));
- }
- assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
-
- underTest.reset();
- long receivedCount = 0;
- while (underTest.hasNext()) {
- MessageReference ref = underTest.next();
- underTest.remove();
- ref.decrementReferenceCount();
- assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
- }
- assertEquals("got all messages back", receivedCount, numMessages);
- LOG.info("page count: " + pageFile.getPageCount());
- LOG.info("free count: " + pageFile.getFreePageCount());
- LOG.info("content size: " + pageFile.getPageContentSize());
- }
-
- assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount());
-
- LOG.info("Destroy");
- underTest.destroy();
- LOG.info("page count: " + pageFile.getPageCount());
- LOG.info("free count: " + pageFile.getFreePageCount());
- LOG.info("content size: " + pageFile.getPageContentSize());
- assertEquals("expected page usage", initialPageCount - 1, pageFile.getPageCount() - pageFile.getFreePageCount());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
deleted file mode 100644
index 73adb52..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.plist;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.store.PList;
-import org.apache.activemq.store.PListEntry;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PListTest {
-
- static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
- private PListStoreImpl store;
- private PListImpl plist;
- final ByteSequence payload = new ByteSequence(new byte[400]);
- final String idSeed = new String("Seed" + Arrays.toString(new byte[1024]));
- final Vector<Throwable> exceptions = new Vector<>();
- ExecutorService executor;
-
- private PListEntry getFirst(PList plist) throws IOException {
- PList.PListIterator iterator = plist.iterator();
- try {
- if (iterator.hasNext()) {
- return iterator.next();
- }
- else {
- return null;
- }
- }
- finally {
- iterator.release();
- }
- }
-
- @Test
- public void testAddLast() throws Exception {
- final int COUNT = 1000;
- Map<String, ByteSequence> map = new LinkedHashMap<>();
- for (int i = 0; i < COUNT; i++) {
- String test = new String("test" + i);
- ByteSequence bs = new ByteSequence(test.getBytes());
- map.put(test, bs);
- plist.addLast(test, bs);
- }
- assertEquals(plist.size(), COUNT);
- int count = 0;
- for (ByteSequence bs : map.values()) {
- String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
- PListEntry entry = plist.get(count);
- String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
- assertEquals(origStr, plistString);
- count++;
- }
- }
-
- @Test
- public void testAddFirst() throws Exception {
- final int COUNT = 1000;
- Map<String, ByteSequence> map = new LinkedHashMap<>();
- for (int i = 0; i < COUNT; i++) {
- String test = new String("test" + i);
- ByteSequence bs = new ByteSequence(test.getBytes());
- map.put(test, bs);
- plist.addFirst(test, bs);
- }
- assertEquals(plist.size(), COUNT);
- long count = plist.size() - 1;
- for (ByteSequence bs : map.values()) {
- String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
- PListEntry entry = plist.get(count);
- String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
- assertEquals(origStr, plistString);
- count--;
- }
- }
-
- @Test
- public void testRemove() throws IOException {
- doTestRemove(2000);
- }
-
- protected void doTestRemove(final int COUNT) throws IOException {
- Map<String, ByteSequence> map = new LinkedHashMap<>();
- for (int i = 0; i < COUNT; i++) {
- String test = new String("test" + i);
- ByteSequence bs = new ByteSequence(test.getBytes());
- map.put(test, bs);
- plist.addLast(test, bs);
- }
- assertEquals(plist.size(), COUNT);
- PListEntry entry = plist.getFirst();
- while (entry != null) {
- plist.remove(entry.getId());
- entry = plist.getFirst();
- }
- assertEquals(0, plist.size());
- }
-
- @Test
- public void testDestroy() throws Exception {
- doTestRemove(1);
- plist.destroy();
- assertEquals(0, plist.size());
- }
-
- @Test
- public void testDestroyNonEmpty() throws Exception {
- final int COUNT = 1000;
- Map<String, ByteSequence> map = new LinkedHashMap<>();
- for (int i = 0; i < COUNT; i++) {
- String test = new String("test" + i);
- ByteSequence bs = new ByteSequence(test.getBytes());
- map.put(test, bs);
- plist.addLast(test, bs);
- }
- plist.destroy();
- assertEquals(0, plist.size());
- }
-
- @Test
- public void testRemoveSecond() throws Exception {
- plist.addLast("First", new ByteSequence("A".getBytes()));
- plist.addLast("Second", new ByteSequence("B".getBytes()));
-
- assertTrue(plist.remove("Second"));
- assertTrue(plist.remove("First"));
- assertFalse(plist.remove("doesNotExist"));
- }
-
- @Test
- public void testRemoveSingleEntry() throws Exception {
- plist.addLast("First", new ByteSequence("A".getBytes()));
-
- Iterator<PListEntry> iterator = plist.iterator();
- while (iterator.hasNext()) {
- iterator.next();
- iterator.remove();
- }
- }
-
- @Test
- public void testRemoveSecondPosition() throws Exception {
- plist.addLast("First", new ByteSequence("A".getBytes()));
- plist.addLast("Second", new ByteSequence("B".getBytes()));
-
- assertTrue(plist.remove(1));
- assertTrue(plist.remove(0));
- assertFalse(plist.remove(0));
- }
-
- @Test
- public void testConcurrentAddRemove() throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setCleanupInterval(400);
- store.setDirectory(directory);
- store.setJournalMaxFileLength(1024 * 5);
- store.setLazyInit(false);
- store.start();
-
- final ByteSequence payload = new ByteSequence(new byte[1024 * 2]);
-
- final Vector<Throwable> exceptions = new Vector<>();
- final int iterations = 1000;
- final int numLists = 10;
-
- final PList[] lists = new PList[numLists];
- String threadName = Thread.currentThread().getName();
- for (int i = 0; i < numLists; i++) {
- Thread.currentThread().setName("C:" + String.valueOf(i));
- lists[i] = store.getPList(String.valueOf(i));
- }
- Thread.currentThread().setName(threadName);
-
- executor = Executors.newFixedThreadPool(100);
- class A implements Runnable {
-
- @Override
- public void run() {
- final String threadName = Thread.currentThread().getName();
- try {
- for (int i = 0; i < iterations; i++) {
- PList candidate = lists[i % numLists];
- Thread.currentThread().setName("ALRF:" + candidate.getName());
- synchronized (plistLocks(candidate)) {
- Object locator = candidate.addLast(String.valueOf(i), payload);
- getFirst(candidate);
- assertTrue(candidate.remove(locator));
- }
- }
- }
- catch (Exception error) {
- LOG.error("Unexpcted ex", error);
- error.printStackTrace();
- exceptions.add(error);
- }
- finally {
- Thread.currentThread().setName(threadName);
- }
- }
- }
-
- class B implements Runnable {
-
- @Override
- public void run() {
- final String threadName = Thread.currentThread().getName();
- try {
- for (int i = 0; i < iterations; i++) {
- PList candidate = lists[i % numLists];
- Thread.currentThread().setName("ALRF:" + candidate.getName());
- synchronized (plistLocks(candidate)) {
- Object locator = candidate.addLast(String.valueOf(i), payload);
- getFirst(candidate);
- assertTrue(candidate.remove(locator));
- }
- }
- }
- catch (Exception error) {
- error.printStackTrace();
- exceptions.add(error);
- }
- finally {
- Thread.currentThread().setName(threadName);
- }
- }
- }
-
- executor.execute(new A());
- executor.execute(new A());
- executor.execute(new A());
- executor.execute(new B());
- executor.execute(new B());
- executor.execute(new B());
-
- executor.shutdown();
- boolean finishedInTime = executor.awaitTermination(30, TimeUnit.SECONDS);
-
- assertTrue("no exceptions", exceptions.isEmpty());
- assertTrue("finished ok", finishedInTime);
- }
-
- @Test
- public void testConcurrentAddLast() throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setDirectory(directory);
- store.start();
-
- final int numThreads = 20;
- final int iterations = 1000;
- executor = Executors.newFixedThreadPool(100);
- for (int i = 0; i < numThreads; i++) {
- new Job(i, PListTest.TaskType.ADD, iterations).run();
- }
-
- for (int i = 0; i < numThreads; i++) {
- executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
- }
-
- for (int i = 0; i < 100; i++) {
- executor.execute(new Job(i + 20, PListTest.TaskType.ADD, 100));
- }
-
- executor.shutdown();
- boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
- assertTrue("finished ok", finishedInTime);
- }
-
- @Test
- public void testOverFlow() throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setDirectory(directory);
- store.start();
-
- for (int i = 0; i < 2000; i++) {
- new Job(i, PListTest.TaskType.ADD, 5).run();
-
- }
- LOG.info("After Load index file: " + store.pageFile.getFile().length());
- LOG.info("After remove index file: " + store.pageFile.getFile().length());
- }
-
- @Test
- public void testConcurrentAddRemoveWithPreload() throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setDirectory(directory);
- store.setJournalMaxFileLength(1024 * 5);
- store.setCleanupInterval(5000);
- store.setIndexWriteBatchSize(500);
- store.start();
-
- final int iterations = 500;
- final int numLists = 10;
-
- // prime the store
-
- // create/delete
- LOG.info("create");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.CREATE, iterations).run();
- }
-
- LOG.info("delete");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.DELETE, iterations).run();
- }
-
- LOG.info("fill");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.ADD, iterations).run();
- }
- LOG.info("remove");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.REMOVE, iterations).run();
- }
-
- LOG.info("check empty");
- for (int i = 0; i < numLists; i++) {
- assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
- }
-
- LOG.info("delete again");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.DELETE, iterations).run();
- }
-
- LOG.info("fill again");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.ADD, iterations).run();
- }
-
- LOG.info("parallel add and remove");
- executor = Executors.newFixedThreadPool(numLists * 2);
- for (int i = 0; i < numLists * 2; i++) {
- executor.execute(new Job(i, i >= numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
- }
-
- executor.shutdown();
- LOG.info("wait for parallel work to complete");
- boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
- assertTrue("no exceptions", exceptions.isEmpty());
- assertTrue("finished ok", finishedInTime);
- }
-
- // for non determinant issues, increasing this may help diagnose
- final int numRepeats = 1;
-
- @Test
- public void testRepeatStressWithCache() throws Exception {
- for (int i = 0; i < numRepeats; i++) {
- do_testConcurrentAddIterateRemove(true);
- }
- }
-
- @Test
- public void testRepeatStressWithOutCache() throws Exception {
- for (int i = 0; i < numRepeats; i++) {
- do_testConcurrentAddIterateRemove(false);
- }
- }
-
- public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setIndexEnablePageCaching(enablePageCache);
- store.setIndexPageSize(2 * 1024);
- store.setDirectory(directory);
- store.start();
-
- final int iterations = 500;
- final int numLists = 10;
-
- LOG.info("create");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.CREATE, iterations).run();
- }
-
- LOG.info("fill");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.ADD, iterations).run();
- }
-
- LOG.info("parallel add and remove");
- executor = Executors.newFixedThreadPool(400);
- final int numProducer = 5;
- final int numConsumer = 10;
- for (int i = 0; i < numLists; i++) {
- for (int j = 0; j < numProducer; j++) {
- executor.execute(new Job(i, PListTest.TaskType.ADD, iterations * 2));
- }
- for (int k = 0; k < numConsumer; k++) {
- executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations / 4));
- }
- }
-
- for (int i = numLists; i < numLists * 10; i++) {
- executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
- }
-
- executor.shutdown();
- LOG.info("wait for parallel work to complete");
- boolean shutdown = executor.awaitTermination(60 * 60, TimeUnit.SECONDS);
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- assertTrue("test did not timeout ", shutdown);
- }
-
- @Test
- public void testConcurrentAddIterate() throws Exception {
- File directory = store.getDirectory();
- store.stop();
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- store = new PListStoreImpl();
- store.setIndexPageSize(2 * 1024);
- store.setJournalMaxFileLength(1024 * 1024);
- store.setDirectory(directory);
- store.setCleanupInterval(-1);
- store.setIndexEnablePageCaching(false);
- store.setIndexWriteBatchSize(100);
- store.start();
-
- final int iterations = 250;
- final int numLists = 10;
-
- LOG.info("create");
- for (int i = 0; i < numLists; i++) {
- new Job(i, PListTest.TaskType.CREATE, iterations).run();
- }
-
- LOG.info("parallel add and iterate");
- // We want a lot of adds occurring so that new free pages get created
- // along
- // with overlapping seeks from the iterators so that we are likely to
- // seek into
- // some bad area in the page file.
- executor = Executors.newFixedThreadPool(100);
- final int numProducer = 30;
- final int numConsumer = 10;
- for (int i = 0; i < numLists; i++) {
- for (int j = 0; j < numProducer; j++) {
- executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
- }
- for (int k = 0; k < numConsumer; k++) {
- executor.execute(new Job(i, TaskType.ITERATE, iterations * 2));
- }
- }
-
- executor.shutdown();
- LOG.info("wait for parallel work to complete");
- boolean shutdown = executor.awaitTermination(5 * 60, TimeUnit.SECONDS);
- assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
- assertTrue("test did not timeout ", shutdown);
- LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
- }
-
- enum TaskType {
- CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE
- }
-
- class Job implements Runnable {
-
- int id;
- TaskType task;
- int iterations;
-
- public Job(int id, TaskType t, int iterations) {
- this.id = id;
- this.task = t;
- this.iterations = iterations;
- }
-
- @Override
- public void run() {
- final String threadName = Thread.currentThread().getName();
- try {
- PListImpl plist = null;
- switch (task) {
- case CREATE:
- Thread.currentThread().setName("C:" + id);
- plist = store.getPList(String.valueOf(id));
- LOG.info("Job-" + id + ", CREATE");
- break;
- case DELETE:
- Thread.currentThread().setName("D:" + id);
- store.removePList(String.valueOf(id));
- break;
- case ADD:
- Thread.currentThread().setName("A:" + id);
- plist = store.getPList(String.valueOf(id));
-
- for (int j = 0; j < iterations; j++) {
- synchronized (plistLocks(plist)) {
- if (exceptions.isEmpty()) {
- plist.addLast("PL>" + id + idSeed + "-" + j, payload);
- }
- else {
- break;
- }
- }
- }
-
- if (exceptions.isEmpty()) {
- LOG.info("Job-" + id + ", Add, done: " + iterations);
- }
- break;
- case REMOVE:
- Thread.currentThread().setName("R:" + id);
- plist = store.getPList(String.valueOf(id));
- synchronized (plistLocks(plist)) {
-
- for (int j = iterations - 1; j >= 0; j--) {
- plist.remove("PL>" + id + idSeed + "-" + j);
- if (j > 0 && j % (iterations / 2) == 0) {
- LOG.info("Job-" + id + " Done remove: " + j);
- }
- }
- }
- break;
- case ITERATE:
- Thread.currentThread().setName("I:" + id);
- plist = store.getPList(String.valueOf(id));
- int iterateCount = 0;
- synchronized (plistLocks(plist)) {
- if (exceptions.isEmpty()) {
- Iterator<PListEntry> iterator = plist.iterator();
- while (iterator.hasNext() && exceptions.isEmpty()) {
- iterator.next();
- iterateCount++;
- }
-
- // LOG.info("Job-" + id + " Done iterate: it=" +
- // iterator + ", count:" + iterateCount +
- // ", size:" + plist.size());
- if (plist.size() != iterateCount) {
- System.err.println("Count Wrong: " + iterator);
- }
- assertEquals("iterate got all " + id + " iterator:" + iterator, plist.size(), iterateCount);
- }
- }
- break;
-
- case ITERATE_REMOVE:
- Thread.currentThread().setName("IRM:" + id);
- plist = store.getPList(String.valueOf(id));
-
- int removeCount = 0;
- synchronized (plistLocks(plist)) {
-
- Iterator<PListEntry> removeIterator = plist.iterator();
-
- while (removeIterator.hasNext()) {
- removeIterator.next();
- removeIterator.remove();
- if (removeCount++ > iterations) {
- break;
- }
- }
- }
- LOG.info("Job-" + id + " Done remove: " + removeCount);
- break;
-
- default:
- }
-
- }
- catch (Exception e) {
- LOG.warn("Job[" + id + "] caught exception: " + e.getMessage());
- e.printStackTrace();
- exceptions.add(e);
- if (executor != null) {
- executor.shutdownNow();
- }
- }
- finally {
- Thread.currentThread().setName(threadName);
- }
- }
- }
-
- final Map<PList, Object> locks = new HashMap<>();
-
- private Object plistLocks(PList plist) {
- Object lock = null;
- synchronized (locks) {
- if (locks.containsKey(plist)) {
- lock = locks.get(plist);
- }
- else {
- lock = new Object();
- locks.put(plist, lock);
- }
- }
- return lock;
- }
-
- @Before
- public void setUp() throws Exception {
- File directory = new File("target/test/PlistDB");
- IOHelper.mkdirs(directory);
- IOHelper.deleteChildren(directory);
- startStore(directory);
-
- }
-
- protected void startStore(File directory) throws Exception {
- store = new PListStoreImpl();
- store.setDirectory(directory);
- store.start();
- plist = store.getPList("main");
- }
-
- @After
- public void tearDown() throws Exception {
- if (executor != null) {
- executor.shutdownNow();
- }
- store.stop();
- exceptions.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
deleted file mode 100644
index 5042df8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
- <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
- useLoggingForShutdownErrors="true" useJmx="true"
- persistent="true" vmConnectorURI="vm://javacoola"
- useShutdownHook="false" deleteAllMessagesOnStartup="true">
-
- <amq:persistenceAdapter>
- <amq:kahaDB directory = "target/activemq-data">
- <amq:locker>
- <amq:shared-file-locker lockAcquireSleepInterval="5000"/>
- </amq:locker>
- </amq:kahaDB>
- </amq:persistenceAdapter>
-
- <amq:systemUsage>
- <amq:systemUsage>
- <amq:memoryUsage>
- <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/>
- </amq:memoryUsage>
- <amq:storeUsage>
- <amq:storeUsage limit="1 gb" name="foo"/>
- </amq:storeUsage>
- <amq:tempUsage>
- <amq:tempUsage limit="100 mb"/>
- </amq:tempUsage>
- </amq:systemUsage>
- </amq:systemUsage>
-
- <amq:transportConnectors>
- <amq:transportConnector uri="tcp://localhost:61635"/>
- </amq:transportConnectors>
-
- </amq:broker>
-
-</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
deleted file mode 100644
index 3c28b3d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.leveldb;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.cursors.NegativeQueueTest;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.util.IOHelper;
-
-import java.io.File;
-
-public class LevelDBNegativeQueueTest extends NegativeQueueTest {
-
- @Override
- protected void configureBroker(BrokerService answer) throws Exception {
- super.configureBroker(answer);
- LevelDBStore levelDBStore = new LevelDBStore();
- File directory = new File("target/activemq-data/leveldb");
- IOHelper.deleteChildren(directory);
- levelDBStore.setDirectory(directory);
- levelDBStore.deleteAllMessages();
- answer.setPersistenceAdapter(levelDBStore);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
deleted file mode 100644
index 99583d5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.leveldb;
-
-import java.io.File;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTest;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-/**
- * Once the wire format is completed we can test against real persistence storage.
- */
-public class LevelDBStoreBrokerTest extends BrokerTest {
-
- @Override
- protected void setUp() throws Exception {
- this.setAutoFail(true);
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- LevelDBStore levelDBStore = new LevelDBStore();
- File directory = new File("target/activemq-data/leveldb");
- IOHelper.deleteChildren(directory);
- levelDBStore.setDirectory(directory);
- levelDBStore.deleteAllMessages();
- broker.setPersistenceAdapter(levelDBStore);
- return broker;
- }
-
- protected BrokerService createRestartedBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/leveldb"));
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- public static Test suite() {
- return suite(LevelDBStoreBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
deleted file mode 100644
index 38f0213..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.streams;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQInputStream;
-import org.apache.activemq.ActiveMQOutputStream;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- * JMSInputStreamTest
- */
-@Deprecated
-public class JMSInputStreamTest extends JmsTestSupport {
-
- public Destination destination;
- protected DataOutputStream out;
- protected DataInputStream in;
- private ActiveMQConnection connection2;
-
- private ActiveMQInputStream amqIn;
- private ActiveMQOutputStream amqOut;
-
- public static Test suite() {
- return suite(JMSInputStreamTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void initCombos() {
- addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC")});
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setAutoFail(true);
- super.setUp();
- }
-
- private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
- connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
- connections.add(connection2);
- if (props != null) {
- amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- else {
- amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
- }
-
- out = new DataOutputStream(amqOut);
- if (timeout == -1) {
- amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
- }
- else {
- amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
- }
- in = new DataInputStream(amqIn);
- }
-
- /*
- * @see TestCase#tearDown()
- */
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- }
-
- /**
- * Test for AMQ-3010
- */
- public void testInputStreamTimeout() throws Exception {
- long timeout = 500;
-
- setUpConnection(null, timeout);
- try {
- in.read();
- fail();
- }
- catch (ActiveMQInputStream.ReadTimeoutException e) {
- // timeout reached, everything ok
- }
- in.close();
- }
-
- // Test for AMQ-2988
- public void testStreamsWithProperties() throws Exception {
- String name1 = "PROPERTY_1";
- String name2 = "PROPERTY_2";
- String value1 = "VALUE_1";
- String value2 = "VALUE_2";
- Map<String, Object> jmsProperties = new HashMap<>();
- jmsProperties.put(name1, value1);
- jmsProperties.put(name2, value2);
- setUpConnection(jmsProperties, -1);
-
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- // check properties before we try to read the stream
- checkProperties(jmsProperties);
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
-
- // check again after read was done
- checkProperties(jmsProperties);
- }
-
- public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
- String name1 = "PROPERTY_1";
- String name2 = "PROPERTY_2";
- String value1 = "VALUE_1";
- String value2 = "VALUE_2";
- Map<String, Object> jmsProperties = new HashMap<>();
- jmsProperties.put(name1, value1);
- jmsProperties.put(name2, value2);
-
- ActiveMQDestination dest = (ActiveMQDestination) destination;
-
- if (dest.isQueue()) {
- destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
- }
- else {
- destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
- }
-
- setUpConnection(jmsProperties, -1);
-
- assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
-
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- // check properties before we try to read the stream
- checkProperties(jmsProperties);
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
-
- // check again after read was done
- checkProperties(jmsProperties);
- }
-
- // check if the received stream has the properties set
- // Test for AMQ-2988
- private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
- Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
-
- // we should at least have the same amount or more properties
- assertTrue(jmsProperties.size() <= receivedJmsProps.size());
-
- // check the properties to see if we have everything in there
- Iterator<String> propsIt = jmsProperties.keySet().iterator();
- while (propsIt.hasNext()) {
- String key = propsIt.next();
- assertTrue(receivedJmsProps.containsKey(key));
- assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
- }
- }
-
- public void testLarge() throws Exception {
- setUpConnection(null, -1);
-
- final int testData = 23;
- final int dataLength = 4096;
- final int count = 1024;
- byte[] data = new byte[dataLength];
- for (int i = 0; i < data.length; i++) {
- data[i] = testData;
- }
- final AtomicBoolean complete = new AtomicBoolean(false);
- Thread runner = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- for (int x = 0; x < count; x++) {
- byte[] b = new byte[2048];
- in.readFully(b);
- for (int i = 0; i < b.length; i++) {
- assertTrue(b[i] == testData);
- }
- }
- complete.set(true);
- synchronized (complete) {
- complete.notify();
- }
- }
- catch (Exception ex) {
- ex.printStackTrace();
- }
- }
- });
- runner.start();
- for (int i = 0; i < count; i++) {
- out.write(data);
- }
- out.flush();
- synchronized (complete) {
- while (!complete.get()) {
- complete.wait(30000);
- }
- }
- assertTrue(complete.get());
- }
-
- public void testStreams() throws Exception {
- setUpConnection(null, -1);
- out.writeInt(4);
- out.flush();
- assertTrue(in.readInt() == 4);
- out.writeFloat(2.3f);
- out.flush();
- assertTrue(in.readFloat() == 2.3f);
- String str = "this is a test string";
- out.writeUTF(str);
- out.flush();
- assertTrue(in.readUTF().equals(str));
- for (int i = 0; i < 100; i++) {
- out.writeLong(i);
- }
- out.flush();
-
- for (int i = 0; i < 100; i++) {
- assertTrue(in.readLong() == i);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
new file mode 100755
index 0000000..45be088
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ *
+ */
+public class JmsResourceProvider {
+
+ private String serverUri = "vm://localhost?broker.persistent=false";
+ private boolean transacted;
+ private int ackMode = Session.AUTO_ACKNOWLEDGE;
+ private boolean isTopic;
+ private int deliveryMode = DeliveryMode.PERSISTENT;
+ private String durableName = "DummyName";
+ private String clientID = getClass().getName();
+
+ /**
+ * Creates a connection factory.
+ *
+ * @see org.apache.activemq.test.JmsResourceProvider#createConnectionFactory()
+ */
+ public ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(serverUri);
+ }
+
+ /**
+ * Creates a connection.
+ *
+ * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+ */
+ public Connection createConnection(ConnectionFactory cf) throws JMSException {
+ Connection connection = cf.createConnection();
+ if (getClientID() != null) {
+ connection.setClientID(getClientID());
+ }
+ return connection;
+ }
+
+ /**
+ * @see org.apache.activemq.test.JmsResourceProvider#createSession(javax.jms.Connection)
+ */
+ public Session createSession(Connection conn) throws JMSException {
+ return conn.createSession(transacted, ackMode);
+ }
+
+ /**
+ * @see org.apache.activemq.test.JmsResourceProvider#createConsumer(javax.jms.Session,
+ * javax.jms.Destination)
+ */
+ public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
+ if (isDurableSubscriber()) {
+ return session.createDurableSubscriber((Topic)destination, durableName);
+ }
+ return session.createConsumer(destination);
+ }
+
+ /**
+ * Creates a connection consumer for the given destination.
+ *
+ * @param ssp - ServerSessionPool
+ * @return ConnectionConsumer
+ */
+ public ConnectionConsumer createConnectionConsumer(Connection connection, Destination destination, ServerSessionPool ssp) throws JMSException {
+ return connection.createConnectionConsumer(destination, null, ssp, 1);
+ }
+
+ /**
+ * Creates a producer.
+ *
+ * @see org.apache.activemq.test.JmsResourceProvider#createProducer(javax.jms.Session,
+ * javax.jms.Destination)
+ */
+ public MessageProducer createProducer(Session session, Destination destination) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+ return producer;
+ }
+
+ /**
+ * Creates a destination, which can be either a topic or a queue.
+ *
+ * @see org.apache.activemq.test.JmsResourceProvider#createDestination(javax.jms.Session,
+ * String)
+ */
+ public Destination createDestination(Session session, String name) throws JMSException {
+ if (isTopic) {
+ return session.createTopic("TOPIC." + name);
+ } else {
+ return session.createQueue("QUEUE." + name);
+ }
+ }
+
+ /**
+ * Returns true if the subscriber is durable.
+ *
+ * @return isDurableSubscriber
+ */
+ public boolean isDurableSubscriber() {
+ return isTopic && durableName != null;
+ }
+
+ /**
+ * Returns the acknowledgement mode.
+ *
+ * @return Returns the ackMode.
+ */
+ public int getAckMode() {
+ return ackMode;
+ }
+
+ /**
+ * Sets the acknowledgement mode.
+ *
+ * @param ackMode The ackMode to set.
+ */
+ public void setAckMode(int ackMode) {
+ this.ackMode = ackMode;
+ }
+
+ /**
+ * Returns true if the destination is a topic, false if the destination is a
+ * queue.
+ *
+ * @return Returns the isTopic.
+ */
+ public boolean isTopic() {
+ return isTopic;
+ }
+
+ /**
+ * @param isTopic The isTopic to set.
+ */
+ public void setTopic(boolean isTopic) {
+ this.isTopic = isTopic;
+ }
+
+ /**
+ * Returns the server URI.
+ *
+ * @return Returns the serverUri.
+ */
+ public String getServerUri() {
+ return serverUri;
+ }
+
+ /**
+ * Sets the server URI.
+ *
+ * @param serverUri - the server URI to set.
+ */
+ public void setServerUri(String serverUri) {
+ this.serverUri = serverUri;
+ }
+
+ /**
+ * Return true if the session is transacted.
+ *
+ * @return Returns the transacted.
+ */
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ /**
+ * Sets the session to be transacted.
+ *
+ * @param transacted
+ */
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ if (transacted) {
+ setAckMode(Session.SESSION_TRANSACTED);
+ }
+ }
+
+ /**
+ * Returns the delivery mode.
+ *
+ * @return deliveryMode
+ */
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ /**
+ * Sets the delivery mode.
+ *
+ * @param deliveryMode
+ */
+ public void setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ }
+
+ /**
+ * Returns the client id.
+ *
+ * @return clientID
+ */
+ public String getClientID() {
+ return clientID;
+ }
+
+ /**
+ * Sets the client id.
+ *
+ * @param clientID
+ */
+ public void setClientID(String clientID) {
+ this.clientID = clientID;
+ }
+
+ /**
+ * Returns the durable name of the provider.
+ *
+ * @return durableName
+ */
+ public String getDurableName() {
+ return durableName;
+ }
+
+ /**
+ * Sets the durable name of the provider.
+ *
+ * @param durableName
+ */
+ public void setDurableName(String durableName) {
+ this.durableName = durableName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
index 1cfea7b..ddc6cd8 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
@@ -23,6 +23,8 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +92,8 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
/** TODO we should be able to shut down properly */
session.close();
connection.close();
+ ArtemisBrokerHelper.stopArtemisBroker();
+ TcpTransportFactory.clearService();
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
index 00452d1..c999d9a 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveWithTwoConnectionsTest.java
@@ -23,6 +23,8 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +105,8 @@ public class JmsTopicSendReceiveWithTwoConnectionsTest extends JmsSendReceiveTes
receiveSession.close();
sendConnection.close();
receiveConnection.close();
+ TcpTransportFactory.clearService();
+ ArtemisBrokerHelper.stopArtemisBroker();
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
new file mode 100755
index 0000000..be32877
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.reflect.Array;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Useful base class for unit test cases
+ *
+ *
+ */
+public abstract class TestSupport extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class);
+
+ protected ActiveMQConnectionFactory connectionFactory;
+ protected boolean topic = true;
+
+ public TestSupport() {
+ super();
+ }
+
+ public TestSupport(String name) {
+ super(name);
+ }
+
+ /**
+ * Creates an ActiveMQMessage.
+ *
+ * @return ActiveMQMessage
+ */
+ protected ActiveMQMessage createMessage() {
+ return new ActiveMQMessage();
+ }
+
+ /**
+ * Creates a destination.
+ *
+ * @param subject - topic or queue name.
+ * @return Destination - either an ActiveMQTopic or an ActiveMQQueue.
+ */
+ protected Destination createDestination(String subject) {
+ if (topic) {
+ return new ActiveMQTopic(subject);
+ } else {
+ return new ActiveMQQueue(subject);
+ }
+ }
+
+ /**
+ * Tests if firstSet and secondSet are equal.
+ *
+ * @param messsage - string to be displayed when the assertion fails.
+ * @param firstSet[] - set of messages to be compared with its counterpart
+ * in the secondset.
+ * @param secondSet[] - set of messages to be compared with its counterpart
+ * in the firstset.
+ * @throws javax.jms.JMSException
+ */
+ protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException {
+ assertTextMessagesEqual("", firstSet, secondSet);
+ }
+
+ /**
+ * Tests if firstSet and secondSet are equal.
+ *
+ * @param messsage - string to be displayed when the assertion fails.
+ * @param firstSet[] - set of messages to be compared with its counterpart
+ * in the secondset.
+ * @param secondSet[] - set of messages to be compared with its counterpart
+ * in the firstset.
+ */
+ protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) throws JMSException {
+ assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
+
+ for (int i = 0; i < secondSet.length; i++) {
+ TextMessage m1 = (TextMessage)firstSet[i];
+ TextMessage m2 = (TextMessage)secondSet[i];
+ assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2);
+ }
+ }
+
+ /**
+ * Tests if m1 and m2 are equal.
+ *
+ * @param m1 - message to be compared with m2.
+ * @param m2 - message to be compared with m1.
+ * @throws javax.jms.JMSException
+ */
+ protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException {
+ assertEquals("", m1, m2);
+ }
+
+ /**
+ * Tests if m1 and m2 are equal.
+ *
+ * @param message - string to be displayed when the assertion fails.
+ * @param m1 - message to be compared with m2.
+ * @param m2 - message to be compared with m1.
+ */
+ protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException {
+ assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
+
+ if (m1 == null) {
+ return;
+ }
+
+ assertEquals(message, m1.getText(), m2.getText());
+ }
+
+ /**
+ * Tests if m1 and m2 are equal.
+ *
+ * @param m1 - message to be compared with m2.
+ * @param m2 - message to be compared with m1.
+ * @throws javax.jms.JMSException
+ */
+ protected void assertEquals(Message m1, Message m2) throws JMSException {
+ assertEquals("", m1, m2);
+ }
+
+ /**
+ * Tests if m1 and m2 are equal.
+ *
+ * @param message - error message.
+ * @param m1 - message to be compared with m2.
+ * @param m2 -- message to be compared with m1.
+ */
+ protected void assertEquals(String message, Message m1, Message m2) throws JMSException {
+ assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
+
+ if (m1 == null) {
+ return; // the XOR check above passed, so m2 is null as well: nothing left to compare
+ }
+
+ assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass());
+
+ if (m1 instanceof TextMessage) {
+ assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2);
+ } else {
+ assertEquals(message, (Object)m1, (Object)m2); // Object casts select TestCase's overload; the uncast call re-entered this method forever
+ }
+ }
+
+ /**
+ * Asserts that the base directory path does not contain spaces.
+ */
+ protected void assertBaseDirectoryContainsSpaces() {
+ assertFalse("Base directory cannot contain spaces.", new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" "));
+ }
+
+ /**
+ * Creates an ActiveMQConnectionFactory.
+ *
+ * @return ActiveMQConnectionFactory
+ * @throws Exception
+ */
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ }
+
+ /**
+ * Factory method to create a new connection.
+ *
+ * @return connection
+ * @throws Exception
+ */
+ protected Connection createConnection() throws Exception {
+ return getConnectionFactory().createConnection();
+ }
+
+ /**
+ * Returns the ActiveMQ connection factory, creating it on first use.
+ *
+ * @return connectionFactory
+ * @throws Exception
+ */
+ public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+ if (connectionFactory == null) {
+ connectionFactory = createConnectionFactory();
+ assertTrue("Should have created a connection factory!", connectionFactory != null);
+ }
+
+ return connectionFactory;
+ }
+
+ /**
+ * Returns the consumer subject.
+ *
+ * @return String
+ */
+ protected String getConsumerSubject() {
+ return getSubject();
+ }
+
+ /**
+ * Returns the producer subject.
+ *
+ * @return String
+ */
+ protected String getProducerSubject() {
+ return getSubject();
+ }
+
+ /**
+ * Returns the subject.
+ *
+ * @return String
+ */
+ protected String getSubject() {
+ return getClass().getName() + "." + getName();
+ }
+
+ protected void assertArrayEqual(String message, Object[] expected, Object[] actual) {
+ assertEquals(message + ". Array length", expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals(message + ". element: " + i, expected[i], actual[i]);
+ }
+ }
+
+ protected void assertPrimitiveArrayEqual(String message, Object expected, Object actual) {
+ int length = Array.getLength(expected);
+ assertEquals(message + ". Array length", length, Array.getLength(actual));
+ for (int i = 0; i < length; i++) {
+ assertEquals(message + ". element: " + i, Array.get(expected, i), Array.get(actual, i));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
index 9902bd2..56d1546 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.activemq.transport;
+import org.junit.Before;
+
/**
*
*/
public class QueueClusterTest extends TopicClusterTest {
@Override
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
topic = false;
super.setUp();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 1b95006..0b62b31 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -21,42 +21,65 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
-
-import junit.framework.Test;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SocketProxy;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SoWriteTimeoutClientTest extends JmsTestSupport {
+public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);
+ private String messageTextPrefix = "";
+ private EmbeddedJMS server;
+
+ @BeforeClass
+ public static void beforeTest() throws Exception {
+ //this thread keeps alive in original test too. Exclude it.
+ ThreadLeakCheckRule.addKownThread("WriteTimeoutFilter-Timeout");
+ }
+
+ @AfterClass
+ public static void afterTest() throws Exception {
+ ThreadLeakCheckRule.removeKownThread("WriteTimeoutFilter-Timeout");
+ }
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setConcurrentStoreAndDispatchQueues(false);
- broker.setPersistenceAdapter(adapter);
- broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
- return broker;
+ @Before
+ public void setUp() throws Exception {
+ Configuration config = this.createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
}
+ @After
+ public void tearDown() throws Exception {
+ server.stop();
+ }
+
+ @Test
public void testSendWithClientWriteTimeout() throws Exception {
final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
messageTextPrefix = initMessagePrefix(80 * 1024);
- URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+ URI tcpBrokerUri = new URI(newURI(0));
LOG.info("consuming using uri: " + tcpBrokerUri);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
@@ -66,51 +89,72 @@ public class SoWriteTimeoutClientTest extends JmsTestSupport {
MessageConsumer consumer = session.createConsumer(dest);
SocketProxy proxy = new SocketProxy();
- proxy.setTarget(tcpBrokerUri);
- proxy.open();
-
- ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
- final Connection pc = pFactory.createConnection();
- pc.start();
- proxy.pause();
-
- final int messageCount = 20;
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- sendMessages(pc, dest, messageCount);
- }
- catch (Exception ignored) {
- ignored.printStackTrace();
+ try {
+ proxy.setTarget(tcpBrokerUri);
+ proxy.open();
+
+ ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
+ final Connection pc = pFactory.createConnection();
+ try {
+ pc.start();
+ proxy.pause();
+
+ final int messageCount = 20;
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ sendMessages(pc, dest, messageCount);
+ }
+ catch (Exception ignored) {
+ ignored.printStackTrace();
+ }
+ }
+ });
+
+ // wait for timeout and reconnect
+ TimeUnit.SECONDS.sleep(8);
+ proxy.goOn();
+ for (int i = 0; i < messageCount; i++) {
+ TextMessage m = (TextMessage) consumer.receive(10000);
+ Assert.assertNotNull("Got message " + i + " after reconnect", m);
}
- }
- });
- // wait for timeout and reconnect
- TimeUnit.SECONDS.sleep(8);
- proxy.goOn();
- for (int i = 0; i < messageCount; i++) {
- assertNotNull("Got message " + i + " after reconnect", consumer.receive(5000));
+ Assert.assertNull(consumer.receive(5000));
+ }
+ finally {
+ pc.close();
+ }
+ }
+ finally {
+ proxy.close();
+ c.close();
}
- assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
+ }
- LOG.info("current total message count: " + broker.getAdminView().getTotalMessageCount());
- return broker.getAdminView().getTotalMessageCount() == 0;
- }
- }));
+ protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sendMessages(session, destination, count);
+ session.close();
+ }
+
+ protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
+ sendMessages(session, producer, count);
+ producer.close();
+ }
+
+ protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException {
+ for (int i = 0; i < count; i++) {
+ producer.send(session.createTextMessage(messageTextPrefix + i));
+ }
}
private String initMessagePrefix(int i) {
byte[] content = new byte[i];
return new String(content);
}
+}
- public static Test suite() {
- return suite(SoWriteTimeoutClientTest.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
index 5157c33..c2a7d24 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
@@ -17,9 +17,6 @@
package org.apache.activemq.transport;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
@@ -33,22 +30,23 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ServiceStopper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
-public class TopicClusterTest extends TestCase implements MessageListener {
+public class TopicClusterTest extends OpenwireArtemisBaseTest implements MessageListener {
protected static final int MESSAGE_COUNT = 50;
protected static final int NUMBER_IN_CLUSTER = 3;
@@ -60,11 +58,11 @@ public class TopicClusterTest extends TestCase implements MessageListener {
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
- protected List<BrokerService> services = new ArrayList<>();
+ protected EmbeddedJMS[] servers = new EmbeddedJMS[NUMBER_IN_CLUSTER];
protected String groupId;
- @Override
- protected void setUp() throws Exception {
+ @Before
+ public void setUp() throws Exception {
groupId = "topic-cluster-test-" + System.currentTimeMillis();
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
@@ -73,11 +71,13 @@ public class TopicClusterTest extends TestCase implements MessageListener {
if (root == null) {
root = "target/store";
}
+
+ this.setUpClusterServers(servers);
try {
for (int i = 0; i < NUMBER_IN_CLUSTER; i++) {
System.setProperty("activemq.store.dir", root + "_broker_" + i);
- connections[i] = createConnection("broker-" + i);
+ connections[i] = createConnection(i);
connections[i].setClientID("ClusterTest" + i);
connections[i].start();
Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -95,42 +95,35 @@ public class TopicClusterTest extends TestCase implements MessageListener {
}
}
- @Override
- protected void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
if (connections != null) {
for (int i = 0; i < connections.length; i++) {
- connections[i].close();
+ try {
+ connections[i].close();
+ } catch (Exception e) {
+ //ignore.
+ }
}
}
- ServiceStopper stopper = new ServiceStopper();
- stopper.stopServices(services);
+ this.shutDownClusterServers(servers);
}
protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
return session.createConsumer(destination);
}
- protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws Exception {
- BrokerService container = new BrokerService();
- container.setBrokerName(brokerName);
-
- String url = "tcp://localhost:0";
- TransportConnector connector = container.addConnector(url);
- connector.setDiscoveryUri(new URI("multicast://default?group=" + groupId));
- container.addNetworkConnector("multicast://default?group=" + groupId);
- container.start();
-
- services.add(container);
-
- return new ActiveMQConnectionFactory("vm://" + brokerName);
+ protected ActiveMQConnectionFactory createGenericClusterFactory(int serverID) throws Exception {
+ String url = newURI(serverID);
+ return new ActiveMQConnectionFactory(url);
}
protected int expectedReceiveCount() {
return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
}
- protected Connection createConnection(String name) throws Exception {
- return createGenericClusterFactory(name).createConnection();
+ protected Connection createConnection(int serverID) throws Exception {
+ return createGenericClusterFactory(serverID).createConnection();
}
protected Destination createDestination() {
@@ -146,10 +139,6 @@ public class TopicClusterTest extends TestCase implements MessageListener {
}
}
- /**
- * @param msg
- */
- @Override
public void onMessage(Message msg) {
// log.info("GOT: " + msg);
receivedMessageCount.incrementAndGet();
@@ -160,9 +149,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
}
}
- /**
- * @throws Exception
- */
+ @Test
public void testSendReceive() throws Exception {
for (int i = 0; i < MESSAGE_COUNT; i++) {
TextMessage textMessage = new ActiveMQTextMessage();
@@ -178,8 +165,8 @@ public class TopicClusterTest extends TestCase implements MessageListener {
}
// sleep a little - to check we don't get too many messages
Thread.sleep(2000);
- LOG.info("GOT: " + receivedMessageCount.get());
- assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
+ LOG.info("GOT: " + receivedMessageCount.get() + " Expected: " + expectedReceiveCount());
+ Assert.assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
}
}