You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:19 UTC
[10/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
deleted file mode 100644
index 352d2f0..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionControl;
-import org.apache.activemq.store.kahadb.disk.journal.FileAppender;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KahaDBFastEnqueueTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(KahaDBFastEnqueueTest.class);
- private BrokerService broker;
- private ActiveMQConnectionFactory connectionFactory;
- KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
- private final Destination destination = new ActiveMQQueue("Test");
- private final String payloadString = new String(new byte[6 * 1024]);
- private final boolean useBytesMessage = true;
- private final int parallelProducer = 20;
- private final Vector<Exception> exceptions = new Vector<>();
- long toSend = 10000;
-
- // use with:
- // -Xmx4g -Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000 -Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true
- @Test
- public void testPublishNoConsumer() throws Exception {
-
- startBroker(true, 10);
-
- final AtomicLong sharedCount = new AtomicLong(toSend);
- long start = System.currentTimeMillis();
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < parallelProducer; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- publishMessages(sharedCount, 0);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
- executorService.shutdown();
- executorService.awaitTermination(30, TimeUnit.MINUTES);
- assertTrue("Producers done in time", executorService.isTerminated());
- assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
- long totalSent = toSend * payloadString.length();
-
- double duration = System.currentTimeMillis() - start;
- stopBroker();
- LOG.info("Duration: " + duration + "ms");
- LOG.info("Rate: " + (toSend * 1000 / duration) + "m/s");
- LOG.info("Total send: " + totalSent);
- LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
- LOG.info("Total index size " + kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize());
- LOG.info("Total store size: " + kahaDBPersistenceAdapter.size());
- LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double) totalSent * 100 + "%");
-
- restartBroker(0, 1200000);
- consumeMessages(toSend);
- }
-
- @Test
- public void testPublishNoConsumerNoCheckpoint() throws Exception {
-
- toSend = 100;
- startBroker(true, 0);
-
- final AtomicLong sharedCount = new AtomicLong(toSend);
- long start = System.currentTimeMillis();
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < parallelProducer; i++) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- try {
- publishMessages(sharedCount, 0);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
- executorService.shutdown();
- executorService.awaitTermination(30, TimeUnit.MINUTES);
- assertTrue("Producers done in time", executorService.isTerminated());
- assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());
- long totalSent = toSend * payloadString.length();
-
- broker.getAdminView().gc();
-
- double duration = System.currentTimeMillis() - start;
- stopBroker();
- LOG.info("Duration: " + duration + "ms");
- LOG.info("Rate: " + (toSend * 1000 / duration) + "m/s");
- LOG.info("Total send: " + totalSent);
- LOG.info("Total journal write: " + kahaDBPersistenceAdapter.getStore().getJournal().length());
- LOG.info("Total index size " + kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize());
- LOG.info("Total store size: " + kahaDBPersistenceAdapter.size());
- LOG.info("Journal writes %: " + kahaDBPersistenceAdapter.getStore().getJournal().length() / (double) totalSent * 100 + "%");
-
- restartBroker(0, 0);
- consumeMessages(toSend);
- }
-
- private void consumeMessages(long count) throws Exception {
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setWatchTopicAdvisories(false);
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < count; i++) {
- assertNotNull("got message " + i, consumer.receive(10000));
- }
- assertNull("none left over", consumer.receive(2000));
- }
-
- private void restartBroker(int restartDelay, int checkpoint) throws Exception {
- stopBroker();
- TimeUnit.MILLISECONDS.sleep(restartDelay);
- startBroker(false, checkpoint);
- }
-
- @Before
- public void setProps() {
- System.setProperty(Journal.CALLER_BUFFER_APPENDER, Boolean.toString(true));
- System.setProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW, "10000");
- }
-
- @After
- public void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- System.clearProperty(Journal.CALLER_BUFFER_APPENDER);
- System.clearProperty(FileAppender.PROPERTY_LOG_WRITE_STAT_WINDOW);
- }
-
- final double sampleRate = 100000;
-
- private void publishMessages(AtomicLong count, int expiry) throws Exception {
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setWatchTopicAdvisories(false);
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer producer = session.createProducer(destination);
- Long start = System.currentTimeMillis();
- long i = 0L;
- while ((i = count.getAndDecrement()) > 0) {
- Message message = null;
- if (useBytesMessage) {
- message = session.createBytesMessage();
- ((BytesMessage) message).writeBytes(payloadString.getBytes());
- }
- else {
- message = session.createTextMessage(payloadString);
- }
- producer.send(message, DeliveryMode.PERSISTENT, 5, expiry);
- if (i != toSend && i % sampleRate == 0) {
- long now = System.currentTimeMillis();
- LOG.info("Remainder: " + i + ", rate: " + sampleRate * 1000 / (now - start) + "m/s");
- start = now;
- }
- }
- connection.syncSendPacket(new ConnectionControl());
- connection.close();
- }
-
- public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception {
- broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
- kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
- // defer checkpoints which require a sync
- kahaDBPersistenceAdapter.setCleanupInterval(checkPointPeriod);
- kahaDBPersistenceAdapter.setCheckpointInterval(checkPointPeriod);
-
- // optimise for disk best batch rate
- kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24 * 1024 * 1024); //4mb default
- kahaDBPersistenceAdapter.setJournalMaxFileLength(128 * 1024 * 1024); // 32mb default
- // keep index in memory
- kahaDBPersistenceAdapter.setIndexCacheSize(500000);
- kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
- kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
- kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
-
- broker.addConnector("tcp://0.0.0.0:0");
- broker.start();
-
- String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
- connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
- }
-
- @Test
- public void testRollover() throws Exception {
- byte flip = 0x1;
- for (long i = 0; i < Short.MAX_VALUE; i++) {
- assertEquals("0 @:" + i, 0, flip ^= (byte) 1);
- assertEquals("1 @:" + i, 1, flip ^= (byte) 1);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
deleted file mode 100644
index 24229c9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBIndexLocationTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FilenameFilter;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class KahaDBIndexLocationTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(KahaDBIndexLocationTest.class);
-
- @Rule
- public TestName name = new TestName();
-
- private BrokerService broker;
-
- private final File testDataDir = new File("target/activemq-data/QueuePurgeTest");
- private final File kahaDataDir = new File(testDataDir, "kahadb");
- private final File kahaIndexDir = new File(testDataDir, "kahadb/index");
-
- /**
- * @throws java.lang.Exception
- */
- @Before
- public void setUp() throws Exception {
- startBroker();
- }
-
- @After
- public void tearDown() throws Exception {
- stopBroker();
- }
-
- private void startBroker() throws Exception {
- createBroker();
- broker.start();
- broker.waitUntilStarted();
- }
-
- private void stopBroker() throws Exception {
- if (broker != null) {
- broker.stop();
- broker.waitUntilStopped();
- }
- }
-
- private void restartBroker() throws Exception {
- stopBroker();
- createBroker();
- broker.start();
- broker.waitUntilStarted();
- }
-
- private void createBroker() throws Exception {
- broker = new BrokerService();
-
- KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
- persistenceAdapter.setDirectory(kahaDataDir);
- persistenceAdapter.setIndexDirectory(kahaIndexDir);
-
- broker.setDataDirectoryFile(testDataDir);
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.setSchedulerSupport(false);
- broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistenceAdapter(persistenceAdapter);
- }
-
- @Test
- public void testIndexDirExists() throws Exception {
- LOG.info("Index dir is configured as: {}", kahaIndexDir);
- assertTrue(kahaDataDir.exists());
- assertTrue(kahaIndexDir.exists());
-
- String[] index = kahaIndexDir.list(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- LOG.info("Testing filename: {}", name);
- return name.endsWith("data") || name.endsWith("redo");
- }
- });
-
- String[] journal = kahaDataDir.list(new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- LOG.info("Testing filename: {}", name);
- return name.endsWith("log") || name.equals("lock");
- }
- });
-
- produceMessages();
-
- // Should be db.data and db.redo and nothing else.
- assertNotNull(index);
- assertEquals(2, index.length);
-
- // Should contain the initial log for the journal and the lock.
- assertNotNull(journal);
- assertEquals(2, journal.length);
- }
-
- @Test
- public void testRestartWithDeleteWorksWhenIndexIsSeparate() throws Exception {
- produceMessages();
- restartBroker();
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
- Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
- MessageConsumer consumer = session.createConsumer(queue);
- assertNull(consumer.receive(2000));
- }
-
- private void produceMessages() throws Exception {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false");
- Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
- MessageProducer producer = session.createProducer(queue);
- for (int i = 0; i < 5; ++i) {
- producer.send(session.createTextMessage("test:" + i));
- }
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
deleted file mode 100644
index bb0e954..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessagePriorityTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.kahadb;
-
-import junit.framework.Test;
-
-import org.apache.activemq.store.MessagePriorityTest;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-
-public class KahaDBMessagePriorityTest extends MessagePriorityTest {
-
- @Override
- protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setConcurrentStoreAndDispatchQueues(false);
- adapter.setConcurrentStoreAndDispatchTopics(false);
- adapter.deleteAllMessages();
- return adapter;
- }
-
- public static Test suite() {
- return suite(KahaDBMessagePriorityTest.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
deleted file mode 100644
index cddbd71..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapterTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.PersistenceAdapterTestSupport;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public class KahaDBPersistenceAdapterTest extends PersistenceAdapterTestSupport {
-
- @Override
- protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws IOException {
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
deleted file mode 100644
index b8fef90..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreBrokerTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTest;
-import org.apache.activemq.util.IOHelper;
-
-/**
- * Once the wire format is completed we can test against real persistence storage.
- */
-public class KahaDBStoreBrokerTest extends BrokerTest {
-
- @Override
- protected void setUp() throws Exception {
- this.setAutoFail(true);
- super.setUp();
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- File directory = new File("target/activemq-data/kahadb");
- IOHelper.deleteChildren(directory);
- kaha.setDirectory(directory);
- kaha.deleteAllMessages();
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- protected BrokerService createRestartedBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- public static Test suite() {
- return suite(KahaDBStoreBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
deleted file mode 100644
index e672890..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOrderTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.store.StoreOrderTest;
-
-// https://issues.apache.org/activemq/browse/AMQ-2594
-public class KahaDBStoreOrderTest extends StoreOrderTest {
-
- @Override
- protected void setPersistentAdapter(BrokerService brokerService) throws Exception {
- KahaDBStore kaha = new KahaDBStore();
- File directory = new File("target/activemq-data/kahadb/storeOrder");
- kaha.setDirectory(directory);
- brokerService.setPersistenceAdapter(kaha);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
deleted file mode 100644
index bddfde8..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.ArrayList;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.RecoveryBrokerTest;
-import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.commons.io.FileUtils;
-
-/**
- * Used to verify that recovery works correctly against
- */
-public class KahaDBStoreRecoveryBrokerTest extends RecoveryBrokerTest {
-
- public static final String KAHADB_DIR_BASE = "target/activemq-data/kahadb";
- public static String kahaDbDirectoryName;
-
- enum CorruptionType {None, FailToLoad, LoadInvalid, LoadCorrupt, LoadOrderIndex0}
-
- public CorruptionType failTest = CorruptionType.None;
-
- @Override
- protected void setUp() throws Exception {
- kahaDbDirectoryName = KAHADB_DIR_BASE + "/" + System.currentTimeMillis();
- super.setUp();
- }
-
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- try {
- File kahaDbDir = new File(kahaDbDirectoryName);
- FileUtils.deleteDirectory(kahaDbDir);
- }
- catch (IOException e) {
- }
- }
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File(kahaDbDirectoryName));
- kaha.deleteAllMessages();
- kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0);
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- @Override
- @SuppressWarnings("resource")
- protected BrokerService createRestartedBroker() throws Exception {
-
- // corrupting index
- File index = new File(kahaDbDirectoryName + "/db.data");
- RandomAccessFile raf = new RandomAccessFile(index, "rw");
- switch (failTest) {
- case FailToLoad:
- index.delete();
- raf = new RandomAccessFile(index, "rw");
- raf.seek(index.length());
- raf.writeBytes("corrupt");
- break;
- case LoadInvalid:
- // page size 0
- raf.seek(0);
- raf.writeBytes("corrupt and cannot load metadata");
- break;
- case LoadCorrupt:
- // loadable but invalid metadata
- // location of order index low priority index for first destination...
- raf.seek(8 * 1024 + 57);
- raf.writeLong(Integer.MAX_VALUE - 10);
- break;
- case LoadOrderIndex0:
- // loadable but invalid metadata
- // location of order index default priority index size
- // so looks like there are no ids in the order index
- // picked up by setCheckForCorruptJournalFiles
- raf.seek(12 * 1024 + 21);
- raf.writeShort(0);
- raf.writeChar(0);
- raf.writeLong(-1);
- break;
- default:
- }
- raf.close();
-
- // starting broker
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setCheckForCorruptJournalFiles(failTest == CorruptionType.LoadOrderIndex0);
- // uncomment if you want to test archiving
- //kaha.setArchiveCorruptedIndex(true);
- kaha.setDirectory(new File(kahaDbDirectoryName));
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- public static Test suite() {
- return suite(KahaDBStoreRecoveryBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
- public void initCombosForTestLargeQueuePersistentMessagesNotLostOnRestart() {
- this.addCombinationValues("failTest", new CorruptionType[]{CorruptionType.FailToLoad, CorruptionType.LoadInvalid, CorruptionType.LoadCorrupt, CorruptionType.LoadOrderIndex0});
- }
-
- public void testLargeQueuePersistentMessagesNotLostOnRestart() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQQueue("TEST");
-
- // Setup the producer and send the message.
- StubConnection connection = createConnection();
- ConnectionInfo connectionInfo = createConnectionInfo();
- SessionInfo sessionInfo = createSessionInfo(connectionInfo);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- connection.send(producerInfo);
-
- ArrayList<String> expected = new ArrayList<>();
-
- int MESSAGE_COUNT = 10000;
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message message = createMessage(producerInfo, destination);
- message.setPersistent(true);
- connection.send(message);
- expected.add(message.getMessageId().toString());
- }
- connection.request(closeConnectionInfo(connectionInfo));
-
- // restart the broker.
- restartBroker();
-
- // Setup the consumer and receive the message.
- connection = createConnection();
- connectionInfo = createConnectionInfo();
- sessionInfo = createSessionInfo(connectionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
- producerInfo = createProducerInfo(sessionInfo);
- connection.send(producerInfo);
-
- for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
- Message m = receiveMessage(connection);
- assertNotNull("Should have received message " + expected.get(0) + " by now!", m);
- assertEquals(expected.remove(0), m.getMessageId().toString());
- MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
- connection.send(ack);
- }
-
- connection.request(closeConnectionInfo(connectionInfo));
-
- // restart the broker.
- restartBroker();
-
- // Setup the consumer and receive the message.
- connection = createConnection();
- connectionInfo = createConnectionInfo();
- sessionInfo = createSessionInfo(connectionInfo);
- connection.send(connectionInfo);
- connection.send(sessionInfo);
- consumerInfo = createConsumerInfo(sessionInfo, destination);
- connection.send(consumerInfo);
-
- for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
- Message m = receiveMessage(connection);
- assertNotNull("Should have received message " + expected.get(i) + " by now!", m);
- assertEquals(expected.get(i), m.getMessageId().toString());
- MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
- connection.send(ack);
-
- }
-
- connection.request(closeConnectionInfo(connectionInfo));
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
deleted file mode 100644
index 6ed4000..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryExpiryTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.BaseDestination;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class KahaDBStoreRecoveryExpiryTest {
-
- private BrokerService broker;
- private ActiveMQConnection connection;
- private final Destination destination = new ActiveMQQueue("Test");
- private Session session;
-
- @Test
- public void testRestartWitExpired() throws Exception {
- publishMessages(1, 0);
- publishMessages(1, 2000);
- publishMessages(1, 0);
- restartBroker(3000);
- consumeMessages(2);
- }
-
- @Test
- public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception {
- publishMessages(BaseDestination.MAX_PAGE_SIZE + 10, 2000);
- publishMessages(10, 0);
- restartBroker(3000);
- consumeMessages(10);
- }
-
- private void consumeMessages(int count) throws Exception {
- MessageConsumer consumer = session.createConsumer(destination);
- for (int i = 0; i < count; i++) {
- assertNotNull("got message " + i, consumer.receive(4000));
- }
- assertNull("none left over", consumer.receive(2000));
- }
-
- private void restartBroker(int restartDelay) throws Exception {
- stopBroker();
- TimeUnit.MILLISECONDS.sleep(restartDelay);
- startBroker();
- }
-
- @After
- public void stopBroker() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- private void publishMessages(int count, int expiry) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 5, expiry);
- }
- }
-
- @Before
- public void startBroker() throws Exception {
- broker = new BrokerService();
- ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setIndexCacheSize(0);
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
- policyMap.setDefaultEntry(defaultEntry);
- broker.setDestinationPolicy(policyMap);
- broker.setUseJmx(false);
- broker.start();
-
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
- connection = (ActiveMQConnection) connectionFactory.createConnection();
- connection.setWatchTopicAdvisories(false);
- connection.start();
-
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java
deleted file mode 100644
index 1b9980f..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class KahaDBStoreTest {
-
- KahaDBStore.KahaDBMessageStore underTest;
- KahaDBStore store;
- ActiveMQMessage message;
- ProducerId producerId = new ProducerId("1.1.1");
- private static final int MESSAGE_COUNT = 2000;
- private Vector<Throwable> exceptions = new Vector<>();
-
- @Before
- public void initStore() throws Exception {
- ActiveMQDestination destination = new ActiveMQQueue("Test");
- store = new KahaDBStore();
- store.setMaxAsyncJobs(100);
- store.setDeleteAllMessages(true);
- store.start();
- underTest = store.new KahaDBMessageStore(destination);
- underTest.start();
- message = new ActiveMQMessage();
- message.setDestination(destination);
- }
-
- @After
- public void destroyStore() throws Exception {
- if (store != null) {
- store.stop();
- }
- }
-
- @Test
- public void testConcurrentStoreAndDispatchQueue() throws Exception {
-
- ExecutorService executor = Executors.newCachedThreadPool();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- final int id = ++i;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Message msg = message.copy();
- msg.setMessageId(new MessageId(producerId, id));
- underTest.asyncAddQueueMessage(null, msg);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
-
- ExecutorService executor2 = Executors.newCachedThreadPool();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- final int id = ++i;
- executor2.execute(new Runnable() {
- @Override
- public void run() {
- try {
- MessageAck ack = new MessageAck();
- ack.setLastMessageId(new MessageId(producerId, id));
- underTest.removeAsyncMessage(null, ack);
- }
- catch (Exception e) {
- exceptions.add(e);
- }
- }
- });
- }
-
- executor.shutdown();
- executor.awaitTermination(60, TimeUnit.SECONDS);
-
- executor2.shutdown();
- executor2.awaitTermination(60, TimeUnit.SECONDS);
-
- assertTrue("no exceptions " + exceptions, exceptions.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
deleted file mode 100644
index 3b63758..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-
-/**
- * @author chirino
- */
-public class KahaDBTest extends TestCase {
-
- protected BrokerService createBroker(KahaDBStore kaha) throws Exception {
-
- BrokerService broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setPersistenceAdapter(kaha);
- broker.start();
- return broker;
- }
-
- private KahaDBStore createStore(boolean delete) throws IOException {
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- if (delete) {
- kaha.deleteAllMessages();
- }
- return kaha;
- }
-
- public void testIgnoreMissingJournalfilesOptionSetFalse() throws Exception {
- KahaDBStore kaha = createStore(true);
- kaha.setJournalMaxFileLength(1024 * 100);
- assertFalse(kaha.isIgnoreMissingJournalfiles());
- BrokerService broker = createBroker(kaha);
- sendMessages(1000);
- broker.stop();
-
- // Delete some journal files..
- assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log"));
- assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log"));
-
- kaha = createStore(false);
- kaha.setJournalMaxFileLength(1024 * 100);
- assertFalse(kaha.isIgnoreMissingJournalfiles());
- try {
- broker = createBroker(kaha);
- fail("expected IOException");
- }
- catch (IOException e) {
- assertTrue(e.getMessage().startsWith("Detected missing/corrupt journal files"));
- }
-
- }
-
- public void testIgnoreMissingJournalfilesOptionSetTrue() throws Exception {
- KahaDBStore kaha = createStore(true);
- kaha.setJournalMaxFileLength(1024 * 100);
- assertFalse(kaha.isIgnoreMissingJournalfiles());
- BrokerService broker = createBroker(kaha);
- sendMessages(1000);
- broker.stop();
-
- // Delete some journal files..
- assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log"));
- assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log"));
-
- kaha = createStore(false);
- kaha.setIgnoreMissingJournalfiles(true);
- kaha.setJournalMaxFileLength(1024 * 100);
- broker = createBroker(kaha);
-
- // We know we won't get all the messages but we should get most of them.
- int count = receiveMessages();
- assertTrue(count > 800);
- assertTrue(count < 1000);
-
- broker.stop();
- }
-
- public void testCheckCorruptionNotIgnored() throws Exception {
- KahaDBStore kaha = createStore(true);
- assertTrue(kaha.isChecksumJournalFiles());
- assertFalse(kaha.isCheckForCorruptJournalFiles());
-
- kaha.setJournalMaxFileLength(1024 * 100);
- kaha.setChecksumJournalFiles(true);
- BrokerService broker = createBroker(kaha);
- sendMessages(1000);
- broker.stop();
-
- // Modify/Corrupt some journal files..
- assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-4.log"));
- assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-8.log"));
-
- kaha = createStore(false);
- kaha.setJournalMaxFileLength(1024 * 100);
- kaha.setChecksumJournalFiles(true);
- kaha.setCheckForCorruptJournalFiles(true);
- assertFalse(kaha.isIgnoreMissingJournalfiles());
- try {
- broker = createBroker(kaha);
- fail("expected IOException");
- }
- catch (IOException e) {
- assertTrue(e.getMessage().startsWith("Detected missing/corrupt journal files"));
- }
-
- }
-
- public void testMigrationOnNewDefaultForChecksumJournalFiles() throws Exception {
- KahaDBStore kaha = createStore(true);
- kaha.setChecksumJournalFiles(false);
- assertFalse(kaha.isChecksumJournalFiles());
- assertFalse(kaha.isCheckForCorruptJournalFiles());
-
- kaha.setJournalMaxFileLength(1024 * 100);
- BrokerService broker = createBroker(kaha);
- sendMessages(1000);
- broker.stop();
-
- kaha = createStore(false);
- kaha.setJournalMaxFileLength(1024 * 100);
- kaha.setCheckForCorruptJournalFiles(true);
- assertFalse(kaha.isIgnoreMissingJournalfiles());
- createBroker(kaha);
- assertEquals(1000, receiveMessages());
- }
-
- private void assertExistsAndCorrupt(File file) throws IOException {
- assertTrue(file.exists());
- RandomAccessFile f = new RandomAccessFile(file, "rw");
- try {
- f.seek(1024 * 5 + 134);
- f.write("... corruption string ...".getBytes());
- }
- finally {
- f.close();
- }
- }
-
- public void testCheckCorruptionIgnored() throws Exception {
- KahaDBStore kaha = createStore(true);
- kaha.setJournalMaxFileLength(1024 * 100);
- BrokerService broker = createBroker(kaha);
- sendMessages(1000);
- broker.stop();
-
- // Delete some journal files..
- assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-4.log"));
- assertExistsAndCorrupt(new File(kaha.getDirectory(), "db-8.log"));
-
- kaha = createStore(false);
- kaha.setIgnoreMissingJournalfiles(true);
- kaha.setJournalMaxFileLength(1024 * 100);
- kaha.setCheckForCorruptJournalFiles(true);
- broker = createBroker(kaha);
-
- // We know we won't get all the messages but we should get most of them.
- int count = receiveMessages();
- assertTrue("Expected to received a min # of messages.. Got: " + count, count > 990);
- assertTrue(count < 1000);
-
- broker.stop();
- }
-
- private void assertExistsAndDelete(File file) {
- assertTrue(file.exists());
- file.delete();
- assertFalse(file.exists());
- }
-
- private void sendMessages(int count) throws JMSException {
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- try {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
- for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage(createContent(i)));
- }
- }
- finally {
- connection.close();
- }
- }
-
- private int receiveMessages() throws JMSException {
- int rc = 0;
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- try {
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST"));
- while (messageConsumer.receive(1000) != null) {
- rc++;
- }
- return rc;
- }
- finally {
- connection.close();
- }
- }
-
- private String createContent(int i) {
- StringBuilder sb = new StringBuilder(i + ":");
- while (sb.length() < 1024) {
- sb.append("*");
- }
- return sb.toString();
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db-1.log
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.data
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion1/db.redo
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db-1.log
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.data
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion2/db.redo
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db-1.log
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.data
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion3/db.redo
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db-1.log
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.data
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersion4/db.redo
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
deleted file mode 100644
index e1b42ad..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author chirino
- */
-public class KahaDBVersionTest extends TestCase {
-
- static String basedir;
-
- static {
- try {
- ProtectionDomain protectionDomain = KahaDBVersionTest.class.getProtectionDomain();
- basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath();
- }
- catch (IOException e) {
- basedir = ".";
- }
- }
-
- static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class);
- final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
- final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
- final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
- final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
-
- BrokerService broker = null;
-
- protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setPersistenceAdapter(kaha);
- broker.start();
- return broker;
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (broker != null) {
- broker.stop();
- }
- }
-
- public void XtestCreateStore() throws Exception {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
- IOHelper.deleteFile(dir);
- kaha.setDirectory(dir);
- kaha.setJournalMaxFileLength(1024 * 1024);
- BrokerService broker = createBroker(kaha);
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- connection.setClientID("test");
- connection.start();
- producerSomeMessages(connection, 1000);
- connection.close();
- broker.stop();
- }
-
- private void producerSomeMessages(Connection connection, int numToSend) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("test.topic");
- Queue queue = session.createQueue("test.queue");
- MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
- consumer.close();
- MessageProducer producer = session.createProducer(topic);
- producer.setPriority(9);
- for (int i = 0; i < numToSend; i++) {
- Message msg = session.createTextMessage("test message:" + i);
- producer.send(msg);
- }
- LOG.info("sent " + numToSend + " to topic");
- producer = session.createProducer(queue);
- for (int i = 0; i < numToSend; i++) {
- Message msg = session.createTextMessage("test message:" + i);
- producer.send(msg);
- }
- LOG.info("sent " + numToSend + " to queue");
- }
-
- public void testVersion1Conversion() throws Exception {
- doConvertRestartCycle(VERSION_1_DB);
- }
-
- public void testVersion2Conversion() throws Exception {
- doConvertRestartCycle(VERSION_2_DB);
- }
-
- public void testVersion3Conversion() throws Exception {
- doConvertRestartCycle(VERSION_3_DB);
- }
-
- public void testVersion4Conversion() throws Exception {
- doConvertRestartCycle(VERSION_4_DB);
- }
-
- public void doConvertRestartCycle(File existingStore) throws Exception {
-
- File testDir = new File("target/activemq-data/kahadb/versionDB");
- IOHelper.deleteFile(testDir);
- IOHelper.copyFile(existingStore, testDir);
- final int numToSend = 1000;
-
- // on repeat store will be upgraded
- for (int repeats = 0; repeats < 3; repeats++) {
- KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
- kaha.setDirectory(testDir);
- kaha.setJournalMaxFileLength(1024 * 1024);
- BrokerService broker = createBroker(kaha);
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
- Connection connection = cf.createConnection();
- connection.setClientID("test");
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("test.topic");
- Queue queue = session.createQueue("test.queue");
-
- if (repeats > 0) {
- // upgraded store will be empty so generated some more messages
- producerSomeMessages(connection, numToSend);
- }
-
- MessageConsumer queueConsumer = session.createConsumer(queue);
- int count = 0;
- for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
- TextMessage msg = (TextMessage) queueConsumer.receive(10000);
- count++;
- // System.err.println(msg.getText());
- assertNotNull(msg);
- }
- LOG.info("Consumed " + count + " from queue");
- count = 0;
- MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
- for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
- TextMessage msg = (TextMessage) topicConsumer.receive(10000);
- count++;
- // System.err.println(msg.getText());
- assertNotNull("" + count, msg);
- }
- LOG.info("Consumed " + count + " from topic");
- connection.close();
-
- broker.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java
deleted file mode 100644
index 30e79c9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/NoSpaceIOTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.RandomAccessFile;
-import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Ignore;
-import org.junit.Test;
-
-public class NoSpaceIOTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(NoSpaceIOTest.class);
-
- // need an app to input to console in intellij idea
- public static void main(String[] args) throws Exception {
- new NoSpaceIOTest().testRunOutOfSpace();
- }
-
- // handy way to validate some out of space related errors with a usb key
- // allow it to run out of space, delete toDelete and see it recover
- @Ignore("needs small volume, like usb key")
- @Test
- public void testRunOutOfSpace() throws Exception {
- BrokerService broker = new BrokerService();
- File dataDir = new File("/Volumes/NO NAME/");
- File useUpSpace = new File(dataDir, "bigFile");
- if (!useUpSpace.exists()) {
- LOG.info("using up some space...");
- RandomAccessFile filler = new RandomAccessFile(useUpSpace, "rw");
- filler.setLength(1024 * 1024 * 1212); // use ~1.xG of 2G (usb) volume
- filler.close();
- File toDelete = new File(dataDir, "toDelete");
- filler = new RandomAccessFile(toDelete, "rw");
- filler.setLength(1024 * 1024 * 32 * 10); // 10 data files
- filler.close();
- }
- broker.setDataDirectoryFile(dataDir);
- broker.start();
- AtomicLong consumed = new AtomicLong(0);
- consume(consumed);
- LOG.info("consumed: " + consumed);
-
- broker.getPersistenceAdapter().checkpoint(true);
-
- AtomicLong sent = new AtomicLong(0);
- try {
- produce(sent, 200);
- }
- catch (Exception expected) {
- LOG.info("got ex, sent: " + sent);
- }
- LOG.info("sent: " + sent);
- System.out.println("Remove toDelete file and press any key to continue");
- int read = System.in.read();
- System.err.println("read:" + read);
-
- LOG.info("Trying to send again: " + sent);
- try {
- produce(sent, 200);
- }
- catch (Exception expected) {
- LOG.info("got ex, sent: " + sent);
- }
- LOG.info("sent: " + sent);
- }
-
- private void consume(AtomicLong consumed) throws JMSException {
- Connection c = new ActiveMQConnectionFactory("vm://localhost").createConnection();
- try {
- c.start();
- Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = s.createConsumer(new ActiveMQQueue("t"));
- while (consumer.receive(2000) != null) {
- consumed.incrementAndGet();
- }
- }
- finally {
- c.close();
- }
- }
-
- private void produce(AtomicLong sent, long toSend) throws JMSException {
- Connection c = new ActiveMQConnectionFactory("vm://localhost").createConnection();
- try {
- c.start();
- Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = s.createProducer(new ActiveMQQueue("t"));
- TextMessage m = s.createTextMessage();
- m.setText(String.valueOf(new char[1024 * 1024]));
- for (int i = 0; i < toSend; i++) {
- producer.send(m);
- sent.incrementAndGet();
- }
- }
- finally {
- c.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
deleted file mode 100644
index f225dee..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/PBMesssagesTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-
-public class PBMesssagesTest extends TestCase {
-
- @SuppressWarnings("rawtypes")
- public void testKahaAddMessageCommand() throws IOException {
-
- KahaAddMessageCommand expected = new KahaAddMessageCommand();
- expected.setDestination(new KahaDestination().setName("Foo").setType(DestinationType.QUEUE));
- expected.setMessage(new Buffer(new byte[]{1, 2, 3, 4, 5, 6}));
- expected.setMessageId("Hello World");
-
- int size = expected.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(expected.type().getNumber());
- expected.writeFramed(os);
- ByteSequence seq = os.toByteSequence();
-
- DataByteArrayInputStream is = new DataByteArrayInputStream(seq);
- KahaEntryType type = KahaEntryType.valueOf(is.readByte());
- JournalCommand message = (JournalCommand) type.createMessage();
- message.mergeFramed(is);
-
- assertEquals(expected, message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java
deleted file mode 100644
index 4316fc7..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/TempKahaDBStoreBrokerTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-
-import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerTest;
-
-/**
- * Once the wire format is completed we can test against real persistence storage.
- */
-public class TempKahaDBStoreBrokerTest extends BrokerTest {
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- kaha.deleteAllMessages();
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- protected BrokerService createRestartedBroker() throws Exception {
- BrokerService broker = new BrokerService();
- TempKahaDBStore kaha = new TempKahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- broker.setPersistenceAdapter(kaha);
- return broker;
- }
-
- public static Test suite() {
- return suite(TempKahaDBStoreBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
deleted file mode 100644
index 1261959..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaBulkLoadingTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.perf;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.BytesMessage;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import junit.framework.Test;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.JmsTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This tests bulk loading and unloading of messages to a Queue.s
- */
-public class KahaBulkLoadingTest extends JmsTestSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(KahaBulkLoadingTest.class);
-
- protected int messageSize = 1024 * 4;
-
- @Override
- protected BrokerService createBroker() throws Exception {
- BrokerService broker = new BrokerService();
- KahaDBStore kaha = new KahaDBStore();
- kaha.setDirectory(new File("target/activemq-data/kahadb"));
- // kaha.deleteAllMessages();
- broker.setPersistenceAdapter(kaha);
- broker.addConnector("tcp://localhost:0");
- return broker;
- }
-
- @Override
- protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getServer().getConnectURI());
- factory.setUseAsyncSend(true);
- return factory;
- }
-
- public void testQueueSendThenAddConsumer() throws Exception {
- long start;
- long end;
- ActiveMQDestination destination = new ActiveMQQueue("TEST");
-
- connection.setUseCompression(false);
- connection.getPrefetchPolicy().setAll(10);
- connection.start();
-
- Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
-
- LOG.info("Receiving messages that are in the queue");
- MessageConsumer consumer = session.createConsumer(destination);
- BytesMessage msg = (BytesMessage) consumer.receive(2000);
- int consumed = 0;
- if (msg != null) {
- consumed++;
- }
- while (true) {
- int counter = 0;
- if (msg == null) {
- break;
- }
- end = start = System.currentTimeMillis();
- int size = 0;
- while ((end - start) < 5000) {
- msg = (BytesMessage) consumer.receive(5000);
- if (msg == null) {
- break;
- }
- counter++;
- consumed++;
- end = System.currentTimeMillis();
- size += msg.getBodyLength();
- }
- LOG.info("Consumed: " + (counter * 1000.0 / (end - start)) + " " + " messages/sec, " + (1.0 * size / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec ");
- }
- consumer.close();
- LOG.info("Consumed " + consumed + " messages from the queue.");
-
- MessageProducer producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- LOG.info("Sending messages that are " + (messageSize / 1024.0) + "k large");
- // Send a message to the broker.
- start = System.currentTimeMillis();
-
- final AtomicBoolean stop = new AtomicBoolean();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- stop.set(true);
- }
- });
-
- int produced = 0;
- while (!stop.get()) {
- end = start = System.currentTimeMillis();
- int produceCount = 0;
- while ((end - start) < 5000 && !stop.get()) {
- BytesMessage bm = session.createBytesMessage();
- bm.writeBytes(new byte[messageSize]);
- producer.send(bm);
- produceCount++;
- produced++;
- end = System.currentTimeMillis();
- }
- LOG.info("Produced: " + (produceCount * 1000.0 / (end - start)) + " messages/sec, " + (1.0 * produceCount * messageSize / (1024.0 * 1024.0)) * ((1000.0 / (end - start))) + " megs/sec");
- }
- LOG.info("Prodcued " + produced + " messages to the queue.");
-
- }
-
- public static Test suite() {
- return suite(KahaBulkLoadingTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
deleted file mode 100644
index 5d52adb..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreDurableTopicTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.perf;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.perf.SimpleDurableTopicTest;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-
-/**
- *
- */
-public class KahaStoreDurableTopicTest extends SimpleDurableTopicTest {
-
- @Override
- protected void configureBroker(BrokerService answer, String uri) throws Exception {
- File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
- dataFileDir.mkdirs();
- // answer.setDeleteAllMessagesOnStartup(true);
-
- KahaDBStore adaptor = new KahaDBStore();
- adaptor.setDirectory(dataFileDir);
-
- answer.setDataDirectoryFile(dataFileDir);
- answer.setPersistenceAdapter(adaptor);
- answer.addConnector(uri);
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
deleted file mode 100644
index a2898f9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/perf/KahaStoreQueueTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.perf;
-
-import java.io.File;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.perf.SimpleQueueTest;
-import org.apache.activemq.store.kahadb.KahaDBStore;
-
-/**
- *
- */
-public class KahaStoreQueueTest extends SimpleQueueTest {
-
- @Override
- protected void configureBroker(BrokerService answer, String uri) throws Exception {
- File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
- dataFileDir.mkdirs();
- answer.setDeleteAllMessagesOnStartup(true);
-
- KahaDBStore adaptor = new KahaDBStore();
- adaptor.setDirectory(dataFileDir);
-
- answer.setDataDirectoryFile(dataFileDir);
- answer.setPersistenceAdapter(adaptor);
- answer.addConnector(uri);
- }
-
-}
-