You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/04 23:42:50 UTC

[06/58] [abbrv] activemq-artemis git commit: open wire changes equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
deleted file mode 100644
index 2575afd..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/TempKahaStoreQueueTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.perf;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.perf.SimpleQueueTest;
-import org.apache.activemq.store.kahadb.TempKahaDBStore;
-
-/**
- *
- */
-public class TempKahaStoreQueueTest extends SimpleQueueTest {
-
-   @Override
-   protected void configureBroker(BrokerService answer, String uri) throws Exception {
-      File dataFileDir = new File("target/test-amq-data/perfTest/temp-amqdb");
-      dataFileDir.mkdirs();
-      answer.setDeleteAllMessagesOnStartup(true);
-
-      TempKahaDBStore adaptor = new TempKahaDBStore();
-      adaptor.setDirectory(dataFileDir);
-
-      answer.setDataDirectoryFile(dataFileDir);
-      answer.setPersistenceAdapter(adaptor);
-      answer.addConnector(uri);
-   }
-
-}
-

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
deleted file mode 100644
index 6626603..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.plist;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.*;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursorTestSupport;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.usage.SystemUsage;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursorTestSupport {
-
-   @Test
-   public void testAddRemoveAddIndexSize() throws Exception {
-      brokerService = new BrokerService();
-      brokerService.setUseJmx(false);
-      SystemUsage usage = brokerService.getSystemUsage();
-      usage.getMemoryUsage().setLimit(1024 * 150);
-      String body = new String(new byte[1024]);
-      Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
-
-      underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
-      underTest.setSystemUsage(usage);
-
-      LOG.info("start");
-      final PageFile pageFile = ((PListImpl) underTest.getDiskList()).getPageFile();
-      LOG.info("page count: " + pageFile.getPageCount());
-      LOG.info("free count: " + pageFile.getFreePageCount());
-      LOG.info("content size: " + pageFile.getPageContentSize());
-
-      final long initialPageCount = pageFile.getPageCount();
-
-      final int numMessages = 1000;
-
-      for (int j = 0; j < 10; j++) {
-         // ensure free pages are reused
-         for (int i = 0; i < numMessages; i++) {
-            ActiveMQMessage mqMessage = new ActiveMQMessage();
-            mqMessage.setStringProperty("body", body);
-            mqMessage.setMessageId(new MessageId("1:2:3:" + i));
-            mqMessage.setMemoryUsage(usage.getMemoryUsage());
-            mqMessage.setRegionDestination(destination);
-            underTest.addMessageLast(new IndirectMessageReference(mqMessage));
-         }
-         assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
-
-         underTest.reset();
-         long receivedCount = 0;
-         while (underTest.hasNext()) {
-            MessageReference ref = underTest.next();
-            underTest.remove();
-            ref.decrementReferenceCount();
-            assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
-         }
-         assertEquals("got all messages back", receivedCount, numMessages);
-         LOG.info("page count: " + pageFile.getPageCount());
-         LOG.info("free count: " + pageFile.getFreePageCount());
-         LOG.info("content size: " + pageFile.getPageContentSize());
-      }
-
-      assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount());
-
-      LOG.info("Destroy");
-      underTest.destroy();
-      LOG.info("page count: " + pageFile.getPageCount());
-      LOG.info("free count: " + pageFile.getFreePageCount());
-      LOG.info("content size: " + pageFile.getPageContentSize());
-      assertEquals("expected page usage", initialPageCount - 1, pageFile.getPageCount() - pageFile.getFreePageCount());
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
deleted file mode 100644
index 73adb52..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.plist;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.store.PList;
-import org.apache.activemq.store.PListEntry;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PListTest {
-
-   static final Logger LOG = LoggerFactory.getLogger(PListTest.class);
-   private PListStoreImpl store;
-   private PListImpl plist;
-   final ByteSequence payload = new ByteSequence(new byte[400]);
-   final String idSeed = new String("Seed" + Arrays.toString(new byte[1024]));
-   final Vector<Throwable> exceptions = new Vector<>();
-   ExecutorService executor;
-
-   private PListEntry getFirst(PList plist) throws IOException {
-      PList.PListIterator iterator = plist.iterator();
-      try {
-         if (iterator.hasNext()) {
-            return iterator.next();
-         }
-         else {
-            return null;
-         }
-      }
-      finally {
-         iterator.release();
-      }
-   }
-
-   @Test
-   public void testAddLast() throws Exception {
-      final int COUNT = 1000;
-      Map<String, ByteSequence> map = new LinkedHashMap<>();
-      for (int i = 0; i < COUNT; i++) {
-         String test = new String("test" + i);
-         ByteSequence bs = new ByteSequence(test.getBytes());
-         map.put(test, bs);
-         plist.addLast(test, bs);
-      }
-      assertEquals(plist.size(), COUNT);
-      int count = 0;
-      for (ByteSequence bs : map.values()) {
-         String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
-         PListEntry entry = plist.get(count);
-         String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
-         assertEquals(origStr, plistString);
-         count++;
-      }
-   }
-
-   @Test
-   public void testAddFirst() throws Exception {
-      final int COUNT = 1000;
-      Map<String, ByteSequence> map = new LinkedHashMap<>();
-      for (int i = 0; i < COUNT; i++) {
-         String test = new String("test" + i);
-         ByteSequence bs = new ByteSequence(test.getBytes());
-         map.put(test, bs);
-         plist.addFirst(test, bs);
-      }
-      assertEquals(plist.size(), COUNT);
-      long count = plist.size() - 1;
-      for (ByteSequence bs : map.values()) {
-         String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
-         PListEntry entry = plist.get(count);
-         String plistString = new String(entry.getByteSequence().getData(), entry.getByteSequence().getOffset(), entry.getByteSequence().getLength());
-         assertEquals(origStr, plistString);
-         count--;
-      }
-   }
-
-   @Test
-   public void testRemove() throws IOException {
-      doTestRemove(2000);
-   }
-
-   protected void doTestRemove(final int COUNT) throws IOException {
-      Map<String, ByteSequence> map = new LinkedHashMap<>();
-      for (int i = 0; i < COUNT; i++) {
-         String test = new String("test" + i);
-         ByteSequence bs = new ByteSequence(test.getBytes());
-         map.put(test, bs);
-         plist.addLast(test, bs);
-      }
-      assertEquals(plist.size(), COUNT);
-      PListEntry entry = plist.getFirst();
-      while (entry != null) {
-         plist.remove(entry.getId());
-         entry = plist.getFirst();
-      }
-      assertEquals(0, plist.size());
-   }
-
-   @Test
-   public void testDestroy() throws Exception {
-      doTestRemove(1);
-      plist.destroy();
-      assertEquals(0, plist.size());
-   }
-
-   @Test
-   public void testDestroyNonEmpty() throws Exception {
-      final int COUNT = 1000;
-      Map<String, ByteSequence> map = new LinkedHashMap<>();
-      for (int i = 0; i < COUNT; i++) {
-         String test = new String("test" + i);
-         ByteSequence bs = new ByteSequence(test.getBytes());
-         map.put(test, bs);
-         plist.addLast(test, bs);
-      }
-      plist.destroy();
-      assertEquals(0, plist.size());
-   }
-
-   @Test
-   public void testRemoveSecond() throws Exception {
-      plist.addLast("First", new ByteSequence("A".getBytes()));
-      plist.addLast("Second", new ByteSequence("B".getBytes()));
-
-      assertTrue(plist.remove("Second"));
-      assertTrue(plist.remove("First"));
-      assertFalse(plist.remove("doesNotExist"));
-   }
-
-   @Test
-   public void testRemoveSingleEntry() throws Exception {
-      plist.addLast("First", new ByteSequence("A".getBytes()));
-
-      Iterator<PListEntry> iterator = plist.iterator();
-      while (iterator.hasNext()) {
-         iterator.next();
-         iterator.remove();
-      }
-   }
-
-   @Test
-   public void testRemoveSecondPosition() throws Exception {
-      plist.addLast("First", new ByteSequence("A".getBytes()));
-      plist.addLast("Second", new ByteSequence("B".getBytes()));
-
-      assertTrue(plist.remove(1));
-      assertTrue(plist.remove(0));
-      assertFalse(plist.remove(0));
-   }
-
-   @Test
-   public void testConcurrentAddRemove() throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setCleanupInterval(400);
-      store.setDirectory(directory);
-      store.setJournalMaxFileLength(1024 * 5);
-      store.setLazyInit(false);
-      store.start();
-
-      final ByteSequence payload = new ByteSequence(new byte[1024 * 2]);
-
-      final Vector<Throwable> exceptions = new Vector<>();
-      final int iterations = 1000;
-      final int numLists = 10;
-
-      final PList[] lists = new PList[numLists];
-      String threadName = Thread.currentThread().getName();
-      for (int i = 0; i < numLists; i++) {
-         Thread.currentThread().setName("C:" + String.valueOf(i));
-         lists[i] = store.getPList(String.valueOf(i));
-      }
-      Thread.currentThread().setName(threadName);
-
-      executor = Executors.newFixedThreadPool(100);
-      class A implements Runnable {
-
-         @Override
-         public void run() {
-            final String threadName = Thread.currentThread().getName();
-            try {
-               for (int i = 0; i < iterations; i++) {
-                  PList candidate = lists[i % numLists];
-                  Thread.currentThread().setName("ALRF:" + candidate.getName());
-                  synchronized (plistLocks(candidate)) {
-                     Object locator = candidate.addLast(String.valueOf(i), payload);
-                     getFirst(candidate);
-                     assertTrue(candidate.remove(locator));
-                  }
-               }
-            }
-            catch (Exception error) {
-               LOG.error("Unexpcted ex", error);
-               error.printStackTrace();
-               exceptions.add(error);
-            }
-            finally {
-               Thread.currentThread().setName(threadName);
-            }
-         }
-      }
-
-      class B implements Runnable {
-
-         @Override
-         public void run() {
-            final String threadName = Thread.currentThread().getName();
-            try {
-               for (int i = 0; i < iterations; i++) {
-                  PList candidate = lists[i % numLists];
-                  Thread.currentThread().setName("ALRF:" + candidate.getName());
-                  synchronized (plistLocks(candidate)) {
-                     Object locator = candidate.addLast(String.valueOf(i), payload);
-                     getFirst(candidate);
-                     assertTrue(candidate.remove(locator));
-                  }
-               }
-            }
-            catch (Exception error) {
-               error.printStackTrace();
-               exceptions.add(error);
-            }
-            finally {
-               Thread.currentThread().setName(threadName);
-            }
-         }
-      }
-
-      executor.execute(new A());
-      executor.execute(new A());
-      executor.execute(new A());
-      executor.execute(new B());
-      executor.execute(new B());
-      executor.execute(new B());
-
-      executor.shutdown();
-      boolean finishedInTime = executor.awaitTermination(30, TimeUnit.SECONDS);
-
-      assertTrue("no exceptions", exceptions.isEmpty());
-      assertTrue("finished ok", finishedInTime);
-   }
-
-   @Test
-   public void testConcurrentAddLast() throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setDirectory(directory);
-      store.start();
-
-      final int numThreads = 20;
-      final int iterations = 1000;
-      executor = Executors.newFixedThreadPool(100);
-      for (int i = 0; i < numThreads; i++) {
-         new Job(i, PListTest.TaskType.ADD, iterations).run();
-      }
-
-      for (int i = 0; i < numThreads; i++) {
-         executor.execute(new Job(i, PListTest.TaskType.ITERATE, iterations));
-      }
-
-      for (int i = 0; i < 100; i++) {
-         executor.execute(new Job(i + 20, PListTest.TaskType.ADD, 100));
-      }
-
-      executor.shutdown();
-      boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
-      assertTrue("finished ok", finishedInTime);
-   }
-
-   @Test
-   public void testOverFlow() throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setDirectory(directory);
-      store.start();
-
-      for (int i = 0; i < 2000; i++) {
-         new Job(i, PListTest.TaskType.ADD, 5).run();
-
-      }
-      LOG.info("After Load index file: " + store.pageFile.getFile().length());
-      LOG.info("After remove index file: " + store.pageFile.getFile().length());
-   }
-
-   @Test
-   public void testConcurrentAddRemoveWithPreload() throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setDirectory(directory);
-      store.setJournalMaxFileLength(1024 * 5);
-      store.setCleanupInterval(5000);
-      store.setIndexWriteBatchSize(500);
-      store.start();
-
-      final int iterations = 500;
-      final int numLists = 10;
-
-      // prime the store
-
-      // create/delete
-      LOG.info("create");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.CREATE, iterations).run();
-      }
-
-      LOG.info("delete");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.DELETE, iterations).run();
-      }
-
-      LOG.info("fill");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.ADD, iterations).run();
-      }
-      LOG.info("remove");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.REMOVE, iterations).run();
-      }
-
-      LOG.info("check empty");
-      for (int i = 0; i < numLists; i++) {
-         assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
-      }
-
-      LOG.info("delete again");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.DELETE, iterations).run();
-      }
-
-      LOG.info("fill again");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.ADD, iterations).run();
-      }
-
-      LOG.info("parallel add and remove");
-      executor = Executors.newFixedThreadPool(numLists * 2);
-      for (int i = 0; i < numLists * 2; i++) {
-         executor.execute(new Job(i, i >= numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
-      }
-
-      executor.shutdown();
-      LOG.info("wait for parallel work to complete");
-      boolean finishedInTime = executor.awaitTermination(60 * 5, TimeUnit.SECONDS);
-      assertTrue("no exceptions", exceptions.isEmpty());
-      assertTrue("finished ok", finishedInTime);
-   }
-
-   // for non determinant issues, increasing this may help diagnose
-   final int numRepeats = 1;
-
-   @Test
-   public void testRepeatStressWithCache() throws Exception {
-      for (int i = 0; i < numRepeats; i++) {
-         do_testConcurrentAddIterateRemove(true);
-      }
-   }
-
-   @Test
-   public void testRepeatStressWithOutCache() throws Exception {
-      for (int i = 0; i < numRepeats; i++) {
-         do_testConcurrentAddIterateRemove(false);
-      }
-   }
-
-   public void do_testConcurrentAddIterateRemove(boolean enablePageCache) throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setIndexEnablePageCaching(enablePageCache);
-      store.setIndexPageSize(2 * 1024);
-      store.setDirectory(directory);
-      store.start();
-
-      final int iterations = 500;
-      final int numLists = 10;
-
-      LOG.info("create");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.CREATE, iterations).run();
-      }
-
-      LOG.info("fill");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.ADD, iterations).run();
-      }
-
-      LOG.info("parallel add and remove");
-      executor = Executors.newFixedThreadPool(400);
-      final int numProducer = 5;
-      final int numConsumer = 10;
-      for (int i = 0; i < numLists; i++) {
-         for (int j = 0; j < numProducer; j++) {
-            executor.execute(new Job(i, PListTest.TaskType.ADD, iterations * 2));
-         }
-         for (int k = 0; k < numConsumer; k++) {
-            executor.execute(new Job(i, TaskType.ITERATE_REMOVE, iterations / 4));
-         }
-      }
-
-      for (int i = numLists; i < numLists * 10; i++) {
-         executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
-      }
-
-      executor.shutdown();
-      LOG.info("wait for parallel work to complete");
-      boolean shutdown = executor.awaitTermination(60 * 60, TimeUnit.SECONDS);
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-      assertTrue("test did not  timeout ", shutdown);
-   }
-
-   @Test
-   public void testConcurrentAddIterate() throws Exception {
-      File directory = store.getDirectory();
-      store.stop();
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      store = new PListStoreImpl();
-      store.setIndexPageSize(2 * 1024);
-      store.setJournalMaxFileLength(1024 * 1024);
-      store.setDirectory(directory);
-      store.setCleanupInterval(-1);
-      store.setIndexEnablePageCaching(false);
-      store.setIndexWriteBatchSize(100);
-      store.start();
-
-      final int iterations = 250;
-      final int numLists = 10;
-
-      LOG.info("create");
-      for (int i = 0; i < numLists; i++) {
-         new Job(i, PListTest.TaskType.CREATE, iterations).run();
-      }
-
-      LOG.info("parallel add and iterate");
-      // We want a lot of adds occurring so that new free pages get created
-      // along
-      // with overlapping seeks from the iterators so that we are likely to
-      // seek into
-      // some bad area in the page file.
-      executor = Executors.newFixedThreadPool(100);
-      final int numProducer = 30;
-      final int numConsumer = 10;
-      for (int i = 0; i < numLists; i++) {
-         for (int j = 0; j < numProducer; j++) {
-            executor.execute(new Job(i, PListTest.TaskType.ADD, iterations));
-         }
-         for (int k = 0; k < numConsumer; k++) {
-            executor.execute(new Job(i, TaskType.ITERATE, iterations * 2));
-         }
-      }
-
-      executor.shutdown();
-      LOG.info("wait for parallel work to complete");
-      boolean shutdown = executor.awaitTermination(5 * 60, TimeUnit.SECONDS);
-      assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
-      assertTrue("test did not  timeout ", shutdown);
-      LOG.info("Num dataFiles:" + store.getJournal().getFiles().size());
-   }
-
-   enum TaskType {
-      CREATE, DELETE, ADD, REMOVE, ITERATE, ITERATE_REMOVE
-   }
-
-   class Job implements Runnable {
-
-      int id;
-      TaskType task;
-      int iterations;
-
-      public Job(int id, TaskType t, int iterations) {
-         this.id = id;
-         this.task = t;
-         this.iterations = iterations;
-      }
-
-      @Override
-      public void run() {
-         final String threadName = Thread.currentThread().getName();
-         try {
-            PListImpl plist = null;
-            switch (task) {
-               case CREATE:
-                  Thread.currentThread().setName("C:" + id);
-                  plist = store.getPList(String.valueOf(id));
-                  LOG.info("Job-" + id + ", CREATE");
-                  break;
-               case DELETE:
-                  Thread.currentThread().setName("D:" + id);
-                  store.removePList(String.valueOf(id));
-                  break;
-               case ADD:
-                  Thread.currentThread().setName("A:" + id);
-                  plist = store.getPList(String.valueOf(id));
-
-                  for (int j = 0; j < iterations; j++) {
-                     synchronized (plistLocks(plist)) {
-                        if (exceptions.isEmpty()) {
-                           plist.addLast("PL>" + id + idSeed + "-" + j, payload);
-                        }
-                        else {
-                           break;
-                        }
-                     }
-                  }
-
-                  if (exceptions.isEmpty()) {
-                     LOG.info("Job-" + id + ", Add, done: " + iterations);
-                  }
-                  break;
-               case REMOVE:
-                  Thread.currentThread().setName("R:" + id);
-                  plist = store.getPList(String.valueOf(id));
-                  synchronized (plistLocks(plist)) {
-
-                     for (int j = iterations - 1; j >= 0; j--) {
-                        plist.remove("PL>" + id + idSeed + "-" + j);
-                        if (j > 0 && j % (iterations / 2) == 0) {
-                           LOG.info("Job-" + id + " Done remove: " + j);
-                        }
-                     }
-                  }
-                  break;
-               case ITERATE:
-                  Thread.currentThread().setName("I:" + id);
-                  plist = store.getPList(String.valueOf(id));
-                  int iterateCount = 0;
-                  synchronized (plistLocks(plist)) {
-                     if (exceptions.isEmpty()) {
-                        Iterator<PListEntry> iterator = plist.iterator();
-                        while (iterator.hasNext() && exceptions.isEmpty()) {
-                           iterator.next();
-                           iterateCount++;
-                        }
-
-                        // LOG.info("Job-" + id + " Done iterate: it=" +
-                        // iterator + ", count:" + iterateCount +
-                        // ", size:" + plist.size());
-                        if (plist.size() != iterateCount) {
-                           System.err.println("Count Wrong: " + iterator);
-                        }
-                        assertEquals("iterate got all " + id + " iterator:" + iterator, plist.size(), iterateCount);
-                     }
-                  }
-                  break;
-
-               case ITERATE_REMOVE:
-                  Thread.currentThread().setName("IRM:" + id);
-                  plist = store.getPList(String.valueOf(id));
-
-                  int removeCount = 0;
-                  synchronized (plistLocks(plist)) {
-
-                     Iterator<PListEntry> removeIterator = plist.iterator();
-
-                     while (removeIterator.hasNext()) {
-                        removeIterator.next();
-                        removeIterator.remove();
-                        if (removeCount++ > iterations) {
-                           break;
-                        }
-                     }
-                  }
-                  LOG.info("Job-" + id + " Done remove: " + removeCount);
-                  break;
-
-               default:
-            }
-
-         }
-         catch (Exception e) {
-            LOG.warn("Job[" + id + "] caught exception: " + e.getMessage());
-            e.printStackTrace();
-            exceptions.add(e);
-            if (executor != null) {
-               executor.shutdownNow();
-            }
-         }
-         finally {
-            Thread.currentThread().setName(threadName);
-         }
-      }
-   }
-
-   final Map<PList, Object> locks = new HashMap<>();
-
-   private Object plistLocks(PList plist) {
-      Object lock = null;
-      synchronized (locks) {
-         if (locks.containsKey(plist)) {
-            lock = locks.get(plist);
-         }
-         else {
-            lock = new Object();
-            locks.put(plist, lock);
-         }
-      }
-      return lock;
-   }
-
-   @Before
-   public void setUp() throws Exception {
-      File directory = new File("target/test/PlistDB");
-      IOHelper.mkdirs(directory);
-      IOHelper.deleteChildren(directory);
-      startStore(directory);
-
-   }
-
-   protected void startStore(File directory) throws Exception {
-      store = new PListStoreImpl();
-      store.setDirectory(directory);
-      store.start();
-      plist = store.getPList("main");
-   }
-
-   @After
-   public void tearDown() throws Exception {
-      if (executor != null) {
-         executor.shutdownNow();
-      }
-      store.stop();
-      exceptions.clear();
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
deleted file mode 100644
index 5042df8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/shared.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<beans
-        xmlns="http://www.springframework.org/schema/beans"
-        xmlns:amq="http://activemq.apache.org/schema/core"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-    <!-- normal ActiveMQ XML config which is less verbose & can be validated -->
-    <amq:broker brokerName="brokerConfigTest" populateJMSXUserID="false"
-                useLoggingForShutdownErrors="true" useJmx="true"
-                persistent="true" vmConnectorURI="vm://javacoola"
-                useShutdownHook="false" deleteAllMessagesOnStartup="true">
-
-        <amq:persistenceAdapter>
-            <amq:kahaDB directory = "target/activemq-data">
-                <amq:locker>
-                    <amq:shared-file-locker lockAcquireSleepInterval="5000"/>
-                </amq:locker>
-            </amq:kahaDB>
-        </amq:persistenceAdapter>
-
-        <amq:systemUsage>
-            <amq:systemUsage>
-                <amq:memoryUsage>
-                    <amq:memoryUsage limit="10 mb" percentUsageMinDelta="20"/>
-                </amq:memoryUsage>
-                <amq:storeUsage>
-                    <amq:storeUsage limit="1 gb" name="foo"/>
-                </amq:storeUsage>
-                <amq:tempUsage>
-                    <amq:tempUsage limit="100 mb"/>
-                </amq:tempUsage>
-            </amq:systemUsage>
-        </amq:systemUsage>
-
-        <amq:transportConnectors>
-            <amq:transportConnector uri="tcp://localhost:61635"/>
-        </amq:transportConnectors>
-
-    </amq:broker>
-
-</beans>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
deleted file mode 100644
index 3c28b3d..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBNegativeQueueTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.leveldb;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.cursors.NegativeQueueTest;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.util.IOHelper;
-
-import java.io.File;
-
-public class LevelDBNegativeQueueTest extends NegativeQueueTest {
-
-   @Override
-   protected void configureBroker(BrokerService answer) throws Exception {
-      super.configureBroker(answer);
-      LevelDBStore levelDBStore = new LevelDBStore();
-      File directory = new File("target/activemq-data/leveldb");
-      IOHelper.deleteChildren(directory);
-      levelDBStore.setDirectory(directory);
-      levelDBStore.deleteAllMessages();
-      answer.setPersistenceAdapter(levelDBStore);
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
deleted file mode 100644
index 99583d5..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDBStoreBrokerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.leveldb;
-
-import java.io.File;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTest;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.leveldb.LevelDBStore;
-
-/**
- * Once the wire format is completed we can test against real persistence storage.
- */
-public class LevelDBStoreBrokerTest extends BrokerTest {
-
-   @Override
-   protected void setUp() throws Exception {
-      this.setAutoFail(true);
-      super.setUp();
-   }
-
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = new BrokerService();
-      LevelDBStore levelDBStore = new LevelDBStore();
-      File directory = new File("target/activemq-data/leveldb");
-      IOHelper.deleteChildren(directory);
-      levelDBStore.setDirectory(directory);
-      levelDBStore.deleteAllMessages();
-      broker.setPersistenceAdapter(levelDBStore);
-      return broker;
-   }
-
-   protected BrokerService createRestartedBroker() throws Exception {
-      BrokerService broker = new BrokerService();
-      KahaDBStore kaha = new KahaDBStore();
-      kaha.setDirectory(new File("target/activemq-data/leveldb"));
-      broker.setPersistenceAdapter(kaha);
-      return broker;
-   }
-
-   public static Test suite() {
-      return suite(LevelDBStoreBrokerTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/db-1.log
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.data
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/schedulerDB/legacy/scheduleDB.redo
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
deleted file mode 100644
index 38f0213..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.streams;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQInputStream;
-import org.apache.activemq.ActiveMQOutputStream;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-
-/**
- * JMSInputStreamTest
- */
-@Deprecated
-public class JMSInputStreamTest extends JmsTestSupport {
-
-   public Destination destination;
-   protected DataOutputStream out;
-   protected DataInputStream in;
-   private ActiveMQConnection connection2;
-
-   private ActiveMQInputStream amqIn;
-   private ActiveMQOutputStream amqOut;
-
-   public static Test suite() {
-      return suite(JMSInputStreamTest.class);
-   }
-
-   public static void main(String[] args) {
-      junit.textui.TestRunner.run(suite());
-   }
-
-   public void initCombos() {
-      addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST.QUEUE"), new ActiveMQTopic("TEST.TOPIC")});
-   }
-
-   @Override
-   protected void setUp() throws Exception {
-      super.setAutoFail(true);
-      super.setUp();
-   }
-
-   private void setUpConnection(Map<String, Object> props, long timeout) throws JMSException {
-      connection2 = (ActiveMQConnection) factory.createConnection(userName, password);
-      connections.add(connection2);
-      if (props != null) {
-         amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination, props, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
-      }
-      else {
-         amqOut = (ActiveMQOutputStream) connection.createOutputStream(destination);
-      }
-
-      out = new DataOutputStream(amqOut);
-      if (timeout == -1) {
-         amqIn = (ActiveMQInputStream) connection2.createInputStream(destination);
-      }
-      else {
-         amqIn = (ActiveMQInputStream) connection2.createInputStream(destination, null, false, timeout);
-      }
-      in = new DataInputStream(amqIn);
-   }
-
-   /*
-    * @see TestCase#tearDown()
-    */
-   @Override
-   protected void tearDown() throws Exception {
-      super.tearDown();
-   }
-
-   /**
-    * Test for AMQ-3010
-    */
-   public void testInputStreamTimeout() throws Exception {
-      long timeout = 500;
-
-      setUpConnection(null, timeout);
-      try {
-         in.read();
-         fail();
-      }
-      catch (ActiveMQInputStream.ReadTimeoutException e) {
-         // timeout reached, everything ok
-      }
-      in.close();
-   }
-
-   // Test for AMQ-2988
-   public void testStreamsWithProperties() throws Exception {
-      String name1 = "PROPERTY_1";
-      String name2 = "PROPERTY_2";
-      String value1 = "VALUE_1";
-      String value2 = "VALUE_2";
-      Map<String, Object> jmsProperties = new HashMap<>();
-      jmsProperties.put(name1, value1);
-      jmsProperties.put(name2, value2);
-      setUpConnection(jmsProperties, -1);
-
-      out.writeInt(4);
-      out.flush();
-      assertTrue(in.readInt() == 4);
-      out.writeFloat(2.3f);
-      out.flush();
-      assertTrue(in.readFloat() == 2.3f);
-      String str = "this is a test string";
-      out.writeUTF(str);
-      out.flush();
-      assertTrue(in.readUTF().equals(str));
-      for (int i = 0; i < 100; i++) {
-         out.writeLong(i);
-      }
-      out.flush();
-
-      // check properties before we try to read the stream
-      checkProperties(jmsProperties);
-
-      for (int i = 0; i < 100; i++) {
-         assertTrue(in.readLong() == i);
-      }
-
-      // check again after read was done
-      checkProperties(jmsProperties);
-   }
-
-   public void testStreamsWithPropertiesOnlyOnFirstMessage() throws Exception {
-      String name1 = "PROPERTY_1";
-      String name2 = "PROPERTY_2";
-      String value1 = "VALUE_1";
-      String value2 = "VALUE_2";
-      Map<String, Object> jmsProperties = new HashMap<>();
-      jmsProperties.put(name1, value1);
-      jmsProperties.put(name2, value2);
-
-      ActiveMQDestination dest = (ActiveMQDestination) destination;
-
-      if (dest.isQueue()) {
-         destination = new ActiveMQQueue(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
-      }
-      else {
-         destination = new ActiveMQTopic(dest.getPhysicalName() + "?producer.addPropertiesOnFirstMsgOnly=true");
-      }
-
-      setUpConnection(jmsProperties, -1);
-
-      assertTrue(amqOut.isAddPropertiesOnFirstMsgOnly());
-
-      out.writeInt(4);
-      out.flush();
-      assertTrue(in.readInt() == 4);
-      out.writeFloat(2.3f);
-      out.flush();
-      assertTrue(in.readFloat() == 2.3f);
-      String str = "this is a test string";
-      out.writeUTF(str);
-      out.flush();
-      assertTrue(in.readUTF().equals(str));
-      for (int i = 0; i < 100; i++) {
-         out.writeLong(i);
-      }
-      out.flush();
-
-      // check properties before we try to read the stream
-      checkProperties(jmsProperties);
-
-      for (int i = 0; i < 100; i++) {
-         assertTrue(in.readLong() == i);
-      }
-
-      // check again after read was done
-      checkProperties(jmsProperties);
-   }
-
-   // check if the received stream has the properties set
-   // Test for AMQ-2988
-   private void checkProperties(Map<String, Object> jmsProperties) throws IOException {
-      Map<String, Object> receivedJmsProps = amqIn.getJMSProperties();
-
-      // we should at least have the same amount or more properties
-      assertTrue(jmsProperties.size() <= receivedJmsProps.size());
-
-      // check the properties to see if we have everything in there
-      Iterator<String> propsIt = jmsProperties.keySet().iterator();
-      while (propsIt.hasNext()) {
-         String key = propsIt.next();
-         assertTrue(receivedJmsProps.containsKey(key));
-         assertEquals(jmsProperties.get(key), receivedJmsProps.get(key));
-      }
-   }
-
-   public void testLarge() throws Exception {
-      setUpConnection(null, -1);
-
-      final int testData = 23;
-      final int dataLength = 4096;
-      final int count = 1024;
-      byte[] data = new byte[dataLength];
-      for (int i = 0; i < data.length; i++) {
-         data[i] = testData;
-      }
-      final AtomicBoolean complete = new AtomicBoolean(false);
-      Thread runner = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            try {
-               for (int x = 0; x < count; x++) {
-                  byte[] b = new byte[2048];
-                  in.readFully(b);
-                  for (int i = 0; i < b.length; i++) {
-                     assertTrue(b[i] == testData);
-                  }
-               }
-               complete.set(true);
-               synchronized (complete) {
-                  complete.notify();
-               }
-            }
-            catch (Exception ex) {
-               ex.printStackTrace();
-            }
-         }
-      });
-      runner.start();
-      for (int i = 0; i < count; i++) {
-         out.write(data);
-      }
-      out.flush();
-      synchronized (complete) {
-         while (!complete.get()) {
-            complete.wait(30000);
-         }
-      }
-      assertTrue(complete.get());
-   }
-
-   public void testStreams() throws Exception {
-      setUpConnection(null, -1);
-      out.writeInt(4);
-      out.flush();
-      assertTrue(in.readInt() == 4);
-      out.writeFloat(2.3f);
-      out.flush();
-      assertTrue(in.readFloat() == 2.3f);
-      String str = "this is a test string";
-      out.writeUTF(str);
-      out.flush();
-      assertTrue(in.readUTF().equals(str));
-      for (int i = 0; i < 100; i++) {
-         out.writeLong(i);
-      }
-      out.flush();
-
-      for (int i = 0; i < 100; i++) {
-         assertTrue(in.readLong() == i);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
new file mode 100755
index 0000000..45be088
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsResourceProvider.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ *
+ */
+public class JmsResourceProvider {
+
+    private String serverUri = "vm://localhost?broker.persistent=false";
+    private boolean transacted;
+    private int ackMode = Session.AUTO_ACKNOWLEDGE;
+    private boolean isTopic;
+    private int deliveryMode = DeliveryMode.PERSISTENT;
+    private String durableName = "DummyName";
+    private String clientID = getClass().getName();
+
+    /**
+     * Creates a connection factory.
+     *
+     * @see org.apache.activemq.test.JmsResourceProvider#createConnectionFactory()
+     */
+    public ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(serverUri);
+    }
+
+    /**
+     * Creates a connection.
+     *
+     * @see org.apache.activemq.test.JmsResourceProvider#createConnection(javax.jms.ConnectionFactory)
+     */
+    public Connection createConnection(ConnectionFactory cf) throws JMSException {
+        Connection connection = cf.createConnection();
+        if (getClientID() != null) {
+            connection.setClientID(getClientID());
+        }
+        return connection;
+    }
+
+    /**
+     * @see org.apache.activemq.test.JmsResourceProvider#createSession(javax.jms.Connection)
+     */
+    public Session createSession(Connection conn) throws JMSException {
+        return conn.createSession(transacted, ackMode);
+    }
+
+    /**
+     * @see org.apache.activemq.test.JmsResourceProvider#createConsumer(javax.jms.Session,
+     *      javax.jms.Destination)
+     */
+    public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
+        if (isDurableSubscriber()) {
+            return session.createDurableSubscriber((Topic)destination, durableName);
+        }
+        return session.createConsumer(destination);
+    }
+
+    /**
+     * Creates a connection for a consumer.
+     *
+     * @param ssp - ServerSessionPool
+     * @return ConnectionConsumer
+     */
+    public ConnectionConsumer createConnectionConsumer(Connection connection, Destination destination, ServerSessionPool ssp) throws JMSException {
+        return connection.createConnectionConsumer(destination, null, ssp, 1);
+    }
+
+    /**
+     * Creates a producer.
+     *
+     * @see org.apache.activemq.test.JmsResourceProvider#createProducer(javax.jms.Session,
+     *      javax.jms.Destination)
+     */
+    public MessageProducer createProducer(Session session, Destination destination) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+        return producer;
+    }
+
+    /**
+     * Creates a destination, which can either a topic or a queue.
+     *
+     * @see org.apache.activemq.test.JmsResourceProvider#createDestination(javax.jms.Session,
+     *      String)
+     */
+    public Destination createDestination(Session session, String name) throws JMSException {
+        if (isTopic) {
+            return session.createTopic("TOPIC." + name);
+        } else {
+            return session.createQueue("QUEUE." + name);
+        }
+    }
+
+    /**
+     * Returns true if the subscriber is durable.
+     *
+     * @return isDurableSubscriber
+     */
+    public boolean isDurableSubscriber() {
+        return isTopic && durableName != null;
+    }
+
+    /**
+     * Returns the acknowledgement mode.
+     *
+     * @return Returns the ackMode.
+     */
+    public int getAckMode() {
+        return ackMode;
+    }
+
+    /**
+     * Sets the acnknowledgement mode.
+     *
+     * @param ackMode The ackMode to set.
+     */
+    public void setAckMode(int ackMode) {
+        this.ackMode = ackMode;
+    }
+
+    /**
+     * Returns true if the destination is a topic, false if the destination is a
+     * queue.
+     *
+     * @return Returns the isTopic.
+     */
+    public boolean isTopic() {
+        return isTopic;
+    }
+
+    /**
+     * @param isTopic The isTopic to set.
+     */
+    public void setTopic(boolean isTopic) {
+        this.isTopic = isTopic;
+    }
+
+    /**
+     * Returns the server URI.
+     *
+     * @return Returns the serverUri.
+     */
+    public String getServerUri() {
+        return serverUri;
+    }
+
+    /**
+     * Sets the server URI.
+     *
+     * @param serverUri - the server URI to set.
+     */
+    public void setServerUri(String serverUri) {
+        this.serverUri = serverUri;
+    }
+
+    /**
+     * Return true if the session is transacted.
+     *
+     * @return Returns the transacted.
+     */
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    /**
+     * Sets the session to be transacted.
+     *
+     * @param transacted
+     */
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+        if (transacted) {
+            setAckMode(Session.SESSION_TRANSACTED);
+        }
+    }
+
+    /**
+     * Returns the delivery mode.
+     *
+     * @return deliveryMode
+     */
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    /**
+     * Sets the delivery mode.
+     *
+     * @param deliveryMode
+     */
+    public void setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+    }
+
+    /**
+     * Returns the client id.
+     *
+     * @return clientID
+     */
+    public String getClientID() {
+        return clientID;
+    }
+
+    /**
+     * Sets the client id.
+     *
+     * @param clientID
+     */
+    public void setClientID(String clientID) {
+        this.clientID = clientID;
+    }
+
+    /**
+     * Returns the durable name of the provider.
+     *
+     * @return durableName
+     */
+    public String getDurableName() {
+        return durableName;
+    }
+
+    /**
+     * Sets the durable name of the provider.
+     *
+     * @param durableName
+     */
+    public void setDurableName(String durableName) {
+        this.durableName = durableName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
new file mode 100755
index 0000000..be32877
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/TestSupport.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.reflect.Array;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Useful base class for unit test cases
+ *
+ *
+ */
+public abstract class TestSupport extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class);
+
+    protected ActiveMQConnectionFactory connectionFactory;
+    protected boolean topic = true;
+
+    public TestSupport() {
+        super();
+    }
+
+    public TestSupport(String name) {
+        super(name);
+    }
+
+    /**
+     * Creates an ActiveMQMessage.
+     *
+     * @return ActiveMQMessage
+     */
+    protected ActiveMQMessage createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    /**
+     * Creates a destination.
+     *
+     * @param subject - topic or queue name.
+     * @return Destination - either an ActiveMQTopic or ActiveMQQUeue.
+     */
+    protected Destination createDestination(String subject) {
+        if (topic) {
+            return new ActiveMQTopic(subject);
+        } else {
+            return new ActiveMQQueue(subject);
+        }
+    }
+
+    /**
+     * Tests if firstSet and secondSet are equal.
+     *
+     * @param messsage - string to be displayed when the assertion fails.
+     * @param firstSet[] - set of messages to be compared with its counterpart
+     *                in the secondset.
+     * @param secondSet[] - set of messages to be compared with its counterpart
+     *                in the firstset.
+     * @throws javax.jms.JMSException
+     */
+    protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException {
+        assertTextMessagesEqual("", firstSet, secondSet);
+    }
+
+    /**
+     * Tests if firstSet and secondSet are equal.
+     *
+     * @param messsage - string to be displayed when the assertion fails.
+     * @param firstSet[] - set of messages to be compared with its counterpart
+     *                in the secondset.
+     * @param secondSet[] - set of messages to be compared with its counterpart
+     *                in the firstset.
+     */
+    protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) throws JMSException {
+        assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);
+
+        for (int i = 0; i < secondSet.length; i++) {
+            TextMessage m1 = (TextMessage)firstSet[i];
+            TextMessage m2 = (TextMessage)secondSet[i];
+            assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2);
+        }
+    }
+
+    /**
+     * Tests if m1 and m2 are equal.
+     *
+     * @param m1 - message to be compared with m2.
+     * @param m2 - message to be compared with m1.
+     * @throws javax.jms.JMSException
+     */
+    protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException {
+        assertEquals("", m1, m2);
+    }
+
+    /**
+     * Tests if m1 and m2 are equal.
+     *
+     * @param message - string to be displayed when the assertion fails.
+     * @param m1 - message to be compared with m2.
+     * @param m2 - message to be compared with m1.
+     */
+    protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException {
+        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
+
+        if (m1 == null) {
+            return;
+        }
+
+        assertEquals(message, m1.getText(), m2.getText());
+    }
+
+    /**
+     * Tests if m1 and m2 are equal.
+     *
+     * @param m1 - message to be compared with m2.
+     * @param m2 - message to be compared with m1.
+     * @throws javax.jms.JMSException
+     */
+    protected void assertEquals(Message m1, Message m2) throws JMSException {
+        assertEquals("", m1, m2);
+    }
+
+    /**
+     * Tests if m1 and m2 are equal.
+     *
+     * @param message - error message.
+     * @param m1 - message to be compared with m2.
+     * @param m2 -- message to be compared with m1.
+     */
+    protected void assertEquals(String message, Message m1, Message m2) throws JMSException {
+        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);
+
+        if (m1 == null) {
+            return;
+        }
+
+        assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass());
+
+        if (m1 instanceof TextMessage) {
+            assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2);
+        } else {
+            assertEquals(message, m1, m2);
+        }
+    }
+
+    /**
+     * Test if base directory contains spaces
+     */
+    protected void assertBaseDirectoryContainsSpaces() {
+    	assertFalse("Base directory cannot contain spaces.", new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" "));
+    }
+
+    /**
+     * Creates an ActiveMQConnectionFactory.
+     *
+     * @return ActiveMQConnectionFactory
+     * @throws Exception
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    }
+
+    /**
+     * Factory method to create a new connection.
+     *
+     * @return connection
+     * @throws Exception
+     */
+    protected Connection createConnection() throws Exception {
+        return getConnectionFactory().createConnection();
+    }
+
+    /**
+     * Creates an ActiveMQ connection factory.
+     *
+     * @return connectionFactory
+     * @throws Exception
+     */
+    public ActiveMQConnectionFactory getConnectionFactory() throws Exception {
+        if (connectionFactory == null) {
+            connectionFactory = createConnectionFactory();
+            assertTrue("Should have created a connection factory!", connectionFactory != null);
+        }
+
+        return connectionFactory;
+    }
+
+    /**
+     * Returns the consumer subject.
+     *
+     * @return String
+     */
+    protected String getConsumerSubject() {
+        return getSubject();
+    }
+
+    /**
+     * Returns the producer subject.
+     *
+     * @return String
+     */
+    protected String getProducerSubject() {
+        return getSubject();
+    }
+
+    /**
+     * Returns the subject.
+     *
+     * @return String
+     */
+    protected String getSubject() {
+        return getClass().getName() + "." + getName();
+    }
+
+    protected void assertArrayEqual(String message, Object[] expected, Object[] actual) {
+        assertEquals(message + ". Array length", expected.length, actual.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(message + ". element: " + i, expected[i], actual[i]);
+        }
+    }
+
+    protected void assertPrimitiveArrayEqual(String message, Object expected, Object actual) {
+        int length = Array.getLength(expected);
+        assertEquals(message + ". Array length", length, Array.getLength(actual));
+        for (int i = 0; i < length; i++) {
+            assertEquals(message + ". element: " + i, Array.get(expected, i), Array.get(actual, i));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
index 9902bd2..56d1546 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/QueueClusterTest.java
@@ -16,13 +16,16 @@
  */
 package org.apache.activemq.transport;
 
+import org.junit.Before;
+
 /**
  *
  */
 public class QueueClusterTest extends TopicClusterTest {
 
    @Override
-   protected void setUp() throws Exception {
+   @Before
+   public void setUp() throws Exception {
       topic = false;
       super.setUp();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 1b95006..3506ff0 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -21,42 +21,50 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.SocketProxy;
-import org.apache.activemq.util.URISupport;
-import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SoWriteTimeoutClientTest extends JmsTestSupport {
+public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
 
    private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);
+   private String messageTextPrefix = "";
+   private EmbeddedJMS server;
+
+   @Before
+   public void setUp() throws Exception {
+      Configuration config = this.createConfig(0);
+      server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+      server.start();
+   }
 
-   @Override
-   protected BrokerService createBroker() throws Exception {
-      BrokerService broker = new BrokerService();
-      broker.setDeleteAllMessagesOnStartup(true);
-      KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-      adapter.setConcurrentStoreAndDispatchQueues(false);
-      broker.setPersistenceAdapter(adapter);
-      broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
-      return broker;
+   @After
+   public void tearDown() throws Exception {
+      server.stop();
    }
 
+   @Test
    public void testSendWithClientWriteTimeout() throws Exception {
       final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
       messageTextPrefix = initMessagePrefix(80 * 1024);
 
-      URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+      URI tcpBrokerUri = new URI(newURI(0));
       LOG.info("consuming using uri: " + tcpBrokerUri);
 
       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
@@ -92,25 +100,34 @@ public class SoWriteTimeoutClientTest extends JmsTestSupport {
       TimeUnit.SECONDS.sleep(8);
       proxy.goOn();
       for (int i = 0; i < messageCount; i++) {
-         assertNotNull("Got message " + i + " after reconnect", consumer.receive(5000));
+         Assert.assertNotNull("Got message " + i + " after reconnect", consumer.receive(10000));
       }
 
-      assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() {
-         @Override
-         public boolean isSatisified() throws Exception {
+      Assert.assertNull(consumer.receive(5000));
 
-            LOG.info("current total message count: " + broker.getAdminView().getTotalMessageCount());
-            return broker.getAdminView().getTotalMessageCount() == 0;
-         }
-      }));
+   }
+
+   protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      sendMessages(session, destination, count);
+      session.close();
+   }
+
+   protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
+      MessageProducer producer = session.createProducer(destination);
+      sendMessages(session, producer, count);
+      producer.close();
+   }
+
+   protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException {
+      for (int i = 0; i < count; i++) {
+         producer.send(session.createTextMessage(messageTextPrefix + i));
+      }
    }
 
    private String initMessagePrefix(int i) {
       byte[] content = new byte[i];
       return new String(content);
    }
+}
 
-   public static Test suite() {
-      return suite(SoWriteTimeoutClientTest.class);
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9307f771/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
index 5157c33..c2a7d24 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.activemq.transport;
 
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
@@ -33,22 +30,23 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import junit.framework.TestCase;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.ServiceStopper;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  *
  */
-public class TopicClusterTest extends TestCase implements MessageListener {
+public class TopicClusterTest extends OpenwireArtemisBaseTest implements MessageListener {
 
    protected static final int MESSAGE_COUNT = 50;
    protected static final int NUMBER_IN_CLUSTER = 3;
@@ -60,11 +58,11 @@ public class TopicClusterTest extends TestCase implements MessageListener {
    protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
    protected MessageProducer[] producers;
    protected Connection[] connections;
-   protected List<BrokerService> services = new ArrayList<>();
+   protected EmbeddedJMS[] servers = new EmbeddedJMS[NUMBER_IN_CLUSTER];
    protected String groupId;
 
-   @Override
-   protected void setUp() throws Exception {
+   @Before
+   public void setUp() throws Exception {
       groupId = "topic-cluster-test-" + System.currentTimeMillis();
       connections = new Connection[NUMBER_IN_CLUSTER];
       producers = new MessageProducer[NUMBER_IN_CLUSTER];
@@ -73,11 +71,13 @@ public class TopicClusterTest extends TestCase implements MessageListener {
       if (root == null) {
          root = "target/store";
       }
+
+      this.setUpClusterServers(servers);
       try {
          for (int i = 0; i < NUMBER_IN_CLUSTER; i++) {
 
             System.setProperty("activemq.store.dir", root + "_broker_" + i);
-            connections[i] = createConnection("broker-" + i);
+            connections[i] = createConnection(i);
             connections[i].setClientID("ClusterTest" + i);
             connections[i].start();
             Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -95,42 +95,35 @@ public class TopicClusterTest extends TestCase implements MessageListener {
       }
    }
 
-   @Override
-   protected void tearDown() throws Exception {
+   @After
+   public void tearDown() throws Exception {
       if (connections != null) {
          for (int i = 0; i < connections.length; i++) {
-            connections[i].close();
+            try {
+               connections[i].close();
+            } catch (Exception e) {
+               //ignore.
+            }
          }
       }
-      ServiceStopper stopper = new ServiceStopper();
-      stopper.stopServices(services);
+      this.shutDownClusterServers(servers);
    }
 
    protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException {
       return session.createConsumer(destination);
    }
 
-   protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws Exception {
-      BrokerService container = new BrokerService();
-      container.setBrokerName(brokerName);
-
-      String url = "tcp://localhost:0";
-      TransportConnector connector = container.addConnector(url);
-      connector.setDiscoveryUri(new URI("multicast://default?group=" + groupId));
-      container.addNetworkConnector("multicast://default?group=" + groupId);
-      container.start();
-
-      services.add(container);
-
-      return new ActiveMQConnectionFactory("vm://" + brokerName);
+   protected ActiveMQConnectionFactory createGenericClusterFactory(int serverID) throws Exception {
+      String url = newURI(serverID);
+      return new ActiveMQConnectionFactory(url);
    }
 
    protected int expectedReceiveCount() {
       return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
    }
 
-   protected Connection createConnection(String name) throws Exception {
-      return createGenericClusterFactory(name).createConnection();
+   protected Connection createConnection(int serverID) throws Exception {
+      return createGenericClusterFactory(serverID).createConnection();
    }
 
    protected Destination createDestination() {
@@ -146,10 +139,6 @@ public class TopicClusterTest extends TestCase implements MessageListener {
       }
    }
 
-   /**
-    * @param msg
-    */
-   @Override
    public void onMessage(Message msg) {
       // log.info("GOT: " + msg);
       receivedMessageCount.incrementAndGet();
@@ -160,9 +149,7 @@ public class TopicClusterTest extends TestCase implements MessageListener {
       }
    }
 
-   /**
-    * @throws Exception
-    */
+   @Test
    public void testSendReceive() throws Exception {
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          TextMessage textMessage = new ActiveMQTextMessage();
@@ -178,8 +165,8 @@ public class TopicClusterTest extends TestCase implements MessageListener {
       }
       // sleep a little - to check we don't get too many messages
       Thread.sleep(2000);
-      LOG.info("GOT: " + receivedMessageCount.get());
-      assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
+      LOG.info("GOT: " + receivedMessageCount.get() + " Expected: " + expectedReceiveCount());
+      Assert.assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
    }
 
 }