You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/07 22:28:34 UTC
[1/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq
Updated Branches:
refs/heads/master 7a68ad5d9 -> 2b320ac06
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
new file mode 100644
index 0000000..398b2f7
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.io.FileUtils;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implementations correctly
+ * compute the size of the messages in the store.
+ *
+ *
+ */
+public class MultiKahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
+
+
+ @Override
+ protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
+ MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
+
+ store = multiStore;
+ File fileDir = new File(directory);
+
+ if (deleteAllMessages && fileDir.exists()) {
+ FileUtils.cleanDirectory(new File(directory));
+ }
+
+ KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter();
+ localStore.setJournalMaxFileLength(1024 * 512);
+ localStore.setDirectory(new File(directory));
+
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(localStore);
+ filtered.setPerDestination(true);
+ List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+ stores.add(filtered);
+
+ multiStore.setFilteredPersistenceAdapters(stores);
+ multiStore.setDirectory(fileDir);
+ multiStore.start();
+ messageStore = store.createQueueMessageStore(destination);
+ messageStore.start();
+ }
+
+ @Override
+ protected String getVersion5Dir() {
+ return "src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..755936c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.memory;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that the memory store properly sets the new storeMessageSize statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(MemoryMessageStoreSizeStatTest.class);
+
+ @Override
+ protected void initPersistence(BrokerService brokerService) throws IOException {
+ broker.setPersistent(false);
+ broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java
new file mode 100644
index 0000000..19a01ab
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.memory;
+
+import org.apache.activemq.store.AbstractMessageStoreSizeTest;
+import org.apache.activemq.store.MessageStore;
+
+public class MemoryMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
+
+ MemoryMessageStore messageStore;
+
+ @Override
+ public void initStore() throws Exception {
+ messageStore = new MemoryMessageStore(destination);
+ messageStore.start();
+ }
+
+
+ @Override
+ public void destroyStore() throws Exception {
+ if (messageStore != null) {
+ messageStore.stop();
+ }
+ }
+
+ @Override
+ protected MessageStore getMessageStore() {
+ return messageStore;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log
new file mode 100644
index 0000000..18f34cf
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log differ
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data
new file mode 100644
index 0000000..8854b71
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data differ
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo
new file mode 100644
index 0000000..eaa3d6f
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo differ
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log
new file mode 100644
index 0000000..60d259b
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log differ
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data
new file mode 100644
index 0000000..721ec11
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data differ
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo
new file mode 100644
index 0000000..e7fe129
Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo differ
[2/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5748
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5748
Added a getMessageSize method to MessageStore to support retrieving the
total message size of all stored messages for a destination. Added a
new storeMessageSize statistic to DestinationStatistics.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/785b16bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/785b16bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/785b16bf
Branch: refs/heads/master
Commit: 785b16bf9ef19180e7c9783442f4a125b44255e1
Parents: 7a68ad5
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Apr 27 18:24:16 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 7 18:03:27 2015 +0000
----------------------------------------------------------------------
.../activemq/broker/jmx/DestinationView.java | 8 +
.../broker/jmx/DestinationViewMBean.java | 8 +
.../apache/activemq/broker/region/Queue.java | 1 +
.../apache/activemq/broker/region/Topic.java | 1 +
.../activemq/store/AbstractMessageStore.java | 21 ++
.../org/apache/activemq/store/MessageStore.java | 12 +
.../activemq/store/MessageStoreStatistics.java | 81 ++++++
.../activemq/store/ProxyMessageStore.java | 11 +
.../activemq/store/ProxyTopicMessageStore.java | 12 +-
.../store/memory/MemoryMessageStore.java | 43 ++-
.../activemq/management/SizeStatisticImpl.java | 17 ++
.../activemq/store/jdbc/JDBCMessageStore.java | 2 +
.../store/journal/JournalMessageStore.java | 15 +-
.../activemq/store/kahadb/KahaDBStore.java | 69 ++---
.../activemq/store/kahadb/MessageDatabase.java | 172 +++++++++++-
.../activemq/store/kahadb/TempKahaDBStore.java | 40 +--
.../kahadb/disk/util/LocationMarshaller.java | 5 +
.../apache/activemq/leveldb/LevelDBStore.scala | 2 +-
.../cursors/StoreQueueCursorOrderTest.java | 10 +-
.../store/AbstractMessageStoreSizeStatTest.java | 266 +++++++++++++++++++
.../store/AbstractMessageStoreSizeTest.java | 98 +++++++
.../AbstractKahaDBMessageStoreSizeTest.java | 147 ++++++++++
.../kahadb/KahaDBMessageStoreSizeStatTest.java | 82 ++++++
.../kahadb/KahaDBMessageStoreSizeTest.java | 46 ++++
.../MultiKahaDBMessageStoreSizeStatTest.java | 134 ++++++++++
.../kahadb/MultiKahaDBMessageStoreSizeTest.java | 68 +++++
.../memory/MemoryMessageStoreSizeStatTest.java | 45 ++++
.../memory/MemoryMessageStoreSizeTest.java | 45 ++++
.../kahadb/MessageStoreTest/version5/db-1.log | Bin 0 -> 524288 bytes
.../kahadb/MessageStoreTest/version5/db.data | Bin 0 -> 32768 bytes
.../kahadb/MessageStoreTest/version5/db.redo | Bin 0 -> 32824 bytes
.../version5/queue#3a#2f#2fTest/db-1.log | Bin 0 -> 524288 bytes
.../version5/queue#3a#2f#2fTest/db.data | Bin 0 -> 32768 bytes
.../version5/queue#3a#2f#2fTest/db.redo | Bin 0 -> 32824 bytes
34 files changed, 1382 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index b3bf869..3e51a49 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
@@ -51,6 +52,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getMessages().getCount();
}
+ @Override
+ public long getStoreMessageSize() {
+ MessageStore messageStore = destination.getMessageStore();
+ return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
+ }
+
public long getMessagesCached() {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 60340ff..aedc15d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -122,6 +122,14 @@ public interface DestinationViewMBean {
long getQueueSize();
/**
+ * Returns the memory size of all messages in this destination's store
+ *
+ * @return Returns the memory size of all messages in this destination's store
+ */
+ @MBeanInfo("The memory size of all messages in this destination's store.")
+ long getStoreMessageSize();
+
+ /**
* @return An array of all the messages in the destination's queue.
*/
@MBeanInfo("An array of all messages in the destination. Not HTML friendly.")
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index af61e19..c9823e1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -375,6 +375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
messages.setMaxProducersToAudit(getMaxProducersToAudit());
messages.setUseCache(isUseCache());
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
+ store.start();
final int messageCount = store.getMessageCount();
if (messageCount > 0 && messages.isRecoveryRequired()) {
BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index bda000b..61c62ce 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -105,6 +105,7 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics.
// int messageCount = store.getMessageCount();
// destinationStatistics.getMessages().setCount(messageCount);
+ store.start();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
index faa6c1f..413f958 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
@@ -30,6 +30,7 @@ abstract public class AbstractMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected boolean prioritizedMessages;
protected IndexListener indexListener;
+ protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
public AbstractMessageStore(ActiveMQDestination destination) {
this.destination = destination;
@@ -41,6 +42,7 @@ abstract public class AbstractMessageStore implements MessageStore {
@Override
public void start() throws Exception {
+ recoverMessageStoreStatistics();
}
@Override
@@ -132,4 +134,23 @@ abstract public class AbstractMessageStore implements MessageStore {
static {
FUTURE = new InlineListenableFuture();
}
+
+ @Override
+ public int getMessageCount() throws IOException {
+ return (int) getMessageStoreStatistics().getMessageCount().getCount();
+ }
+
+ @Override
+ public long getMessageSize() throws IOException {
+ return getMessageStoreStatistics().getMessageSize().getTotalSize();
+ }
+
+ @Override
+ public MessageStoreStatistics getMessageStoreStatistics() {
+ return messageStoreStatistics;
+ }
+
+ protected void recoverMessageStoreStatistics() throws IOException {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
index 4cc472e..aee619a 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStore.java
@@ -159,6 +159,18 @@ public interface MessageStore extends Service {
int getMessageCount() throws IOException;
/**
+ * @return the size of the messages ready to deliver
+ * @throws IOException
+ */
+ long getMessageSize() throws IOException;
+
+
+ /**
+ * @return The statistics bean for this message store
+ */
+ MessageStoreStatistics getMessageStoreStatistics();
+
+ /**
* A hint to the Store to reset any batching state for the Destination
*
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
new file mode 100644
index 0000000..0a2b021
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreStatistics.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store;
+
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.management.SizeStatisticImpl;
+import org.apache.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for a Message Store
+ */
+public class MessageStoreStatistics extends StatsImpl {
+
+ protected CountStatisticImpl messageCount;
+ protected SizeStatisticImpl messageSize;
+
+
+ public MessageStoreStatistics() {
+ this(true);
+ }
+
+ public MessageStoreStatistics(boolean enabled) {
+
+ messageCount = new CountStatisticImpl("messageCount", "The number of messages in the store passing through the destination");
+ messageSize = new SizeStatisticImpl("messageSize","Size of messages in the store passing through the destination");
+
+ addStatistic("messageCount", messageCount);
+ addStatistic("messageSize", messageSize);
+
+ this.setEnabled(enabled);
+ }
+
+
+ public CountStatisticImpl getMessageCount() {
+ return messageCount;
+ }
+
+ public SizeStatisticImpl getMessageSize() {
+ return messageSize;
+ }
+
+ public void reset() {
+ if (this.isDoReset()) {
+ super.reset();
+ messageCount.reset();
+ messageSize.reset();
+ }
+ }
+
+ public void setEnabled(boolean enabled) {
+ super.setEnabled(enabled);
+ messageCount.setEnabled(enabled);
+ messageSize.setEnabled(enabled);
+ }
+
+ public void setParent(MessageStoreStatistics parent) {
+ if (parent != null) {
+ messageCount.setParent(parent.messageCount);
+ messageSize.setParent(parent.messageSize);
+ } else {
+ messageCount.setParent(null);
+ messageSize.setParent(null);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
index c9b2060..cd319a6 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
@@ -101,6 +101,11 @@ public class ProxyMessageStore implements MessageStore {
}
@Override
+ public long getMessageSize() throws IOException {
+ return delegate.getMessageSize();
+ }
+
+ @Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}
@@ -169,4 +174,10 @@ public class ProxyMessageStore implements MessageStore {
public String toString() {
return delegate.toString();
}
+
+ @Override
+ public MessageStoreStatistics getMessageStoreStatistics() {
+ return delegate.getMessageStoreStatistics();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 0f47f61..5c59158 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -17,7 +17,6 @@
package org.apache.activemq.store;
import java.io.IOException;
-import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -146,6 +145,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
}
@Override
+ public long getMessageSize() throws IOException {
+ return delegate.getMessageSize();
+ }
+
+ @Override
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
delegate.recoverNextMessages(maxReturned, listener);
}
@@ -213,4 +217,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public void registerIndexListener(IndexListener indexListener) {
delegate.registerIndexListener(indexListener);
}
+
+ @Override
+ public MessageStoreStatistics getMessageStoreStatistics() {
+ return delegate.getMessageStoreStatistics();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 7cdaa78..e71dab8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -35,8 +35,8 @@ import org.apache.activemq.store.AbstractMessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
* uses a
- *
- *
+ *
+ *
*/
public class MemoryMessageStore extends AbstractMessageStore {
@@ -56,6 +56,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
synchronized (messageTable) {
messageTable.put(message.getMessageId(), message);
+ getMessageStoreStatistics().getMessageCount().increment();
+ getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
}
message.incrementReferenceCount();
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@@ -93,6 +95,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
lastBatchId = null;
}
+ getMessageStoreStatistics().getMessageCount().decrement();
+ getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
}
}
@@ -114,20 +118,17 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void removeAllMessages(ConnectionContext context) throws IOException {
synchronized (messageTable) {
messageTable.clear();
+ getMessageStoreStatistics().reset();
}
}
public void delete() {
synchronized (messageTable) {
messageTable.clear();
+ getMessageStoreStatistics().reset();
}
}
-
- public int getMessageCount() {
- return messageTable.size();
- }
-
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
synchronized (messageTable) {
boolean pastLackBatch = lastBatchId == null;
@@ -161,8 +162,34 @@ public class MemoryMessageStore extends AbstractMessageStore {
public void updateMessage(Message message) {
synchronized (messageTable) {
+ Message original = messageTable.get(message.getMessageId());
+
+ //if can't be found then increment count, else remove old size
+ if (original == null) {
+ getMessageStoreStatistics().getMessageCount().increment();
+ } else {
+ getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
+ }
messageTable.put(message.getMessageId(), message);
+ getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
}
}
-
+
+ @Override
+ public void recoverMessageStoreStatistics() throws IOException {
+ synchronized (messageTable) {
+ long size = 0;
+ int count = 0;
+ // Tally both the number of stored messages and their combined size.
+ for (Message msg : messageTable.values()) {
+ size += msg.getSize();
+ count++;
+ }
+
+ getMessageStoreStatistics().reset();
+ getMessageStoreStatistics().getMessageCount().setCount(count);
+ getMessageStoreStatistics().getMessageSize().setTotalSize(size);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
index 1cf0058..e2bc033 100644
--- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
+++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
@@ -68,6 +68,23 @@ public class SizeStatisticImpl extends StatisticImpl {
}
/**
+ * Reset the total size to the new value.
+ * <p>
+ * Besides overwriting {@code totalSize}, this call increments {@code count},
+ * compares {@code size} against the current {@code maxSize} and
+ * {@code minSize}, and refreshes the sample time.
+ *
+ * @param size the new total size; it is also compared against the current
+ *        max/min in the same way a single added size would be
+ */
+ public synchronized void setTotalSize(long size) {
+ count++;
+ totalSize = size;
+ if (size > maxSize) {
+ maxSize = size;
+ }
+ if (size < minSize || minSize == 0) { // a minSize of 0 is always overwritten
+ minSize = size;
+ }
+ updateSampleTime();
+ }
+
+ /**
* @return the maximum size of any step
*/
public long getMaxSize() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 4674d7a..ac4e8ce 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -304,6 +304,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
+ @Override
public int getMessageCount() throws IOException {
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
@@ -401,4 +402,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
public String toString() {
return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
index 2d44769..7ec10c4 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
@@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
/**
* A MessageStore that uses a Journal to store its messages.
- *
- *
+ *
+ *
*/
public class JournalMessageStore extends AbstractMessageStore {
@@ -79,7 +79,7 @@ public class JournalMessageStore extends AbstractMessageStore {
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
}
-
+
public void setMemoryUsage(MemoryUsage memoryUsage) {
this.memoryUsage=memoryUsage;
longTermStore.setMemoryUsage(memoryUsage);
@@ -323,7 +323,7 @@ public class JournalMessageStore extends AbstractMessageStore {
}
/**
- *
+ *
*/
public Message getMessage(MessageId identity) throws IOException {
Message answer = null;
@@ -348,7 +348,7 @@ public class JournalMessageStore extends AbstractMessageStore {
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
- *
+ *
* @param listener
* @throws Exception
*/
@@ -404,6 +404,11 @@ public class JournalMessageStore extends AbstractMessageStore {
return longTermStore.getMessageCount();
}
+ /**
+  * Reports the message size held by the long-term store, after asking the
+  * persistence adapter to checkpoint.
+  *
+  * @return the value of {@code longTermStore.getMessageSize()}
+  * @throws IOException if the checkpoint or the delegated call fails
+  */
+ public long getMessageSize() throws IOException {
+     peristenceAdapter.checkpoint(true, true);
+     final long storedSize = longTermStore.getMessageSize();
+     return storedSize;
+ }
+
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
peristenceAdapter.checkpoint(true, true);
longTermStore.recoverNextMessages(maxReturned, listener);
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 8ceef36..44f93a6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -61,6 +61,7 @@ import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
@@ -504,34 +505,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
@Override
- public int getMessageCount() throws IOException {
- try {
- lockAsyncJobQueue();
- indexLock.writeLock().lock();
- try {
- return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
- @Override
- public Integer execute(Transaction tx) throws IOException {
- // Iterate through all index entries to get a count
- // of messages in the destination.
- StoredDestination sd = getStoredDestination(dest, tx);
- int rc = 0;
- for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
- iterator.next();
- rc++;
- }
- return rc;
- }
- });
- } finally {
- indexLock.writeLock().unlock();
- }
- } finally {
- unlockAsyncJobQueue();
- }
- }
-
- @Override
public boolean isEmpty() throws IOException {
indexLock.writeLock().lock();
try {
@@ -716,6 +689,38 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public String toString(){
return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
}
+
+ @Override
+ protected void recoverMessageStoreStatistics() throws IOException {
+ try {
+ MessageStoreStatistics recoveredStatistics;
+ lockAsyncJobQueue();
+ indexLock.writeLock().lock();
+ try {
+ recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
+ @Override
+ public MessageStoreStatistics execute(Transaction tx) throws IOException {
+ MessageStoreStatistics statistics = new MessageStoreStatistics();
+
+ /* Walk every entry of the destination's location index: each entry
+ * counts as one message and contributes the size recorded on its
+ * Location key. The totals are gathered in a scratch statistics
+ * object and copied to the live statistics once the transaction
+ * has returned. */
+ StoredDestination sd = getStoredDestination(dest, tx);
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+ int locationSize = iterator.next().getKey().getSize();
+ statistics.getMessageCount().increment();
+ statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); // non-positive sizes are added as 0
+ }
+ return statistics;
+ }
+ });
+ getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); // copied while the index write lock is still held
+ getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
+ } finally {
+ indexLock.writeLock().unlock();
+ }
+ } finally {
+ unlockAsyncJobQueue();
+ }
+ }
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@@ -993,12 +998,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- return this.transactionStore.proxy(new KahaDBMessageStore(destination));
+ MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
+ storeCache.put(key(convert(destination)), store); // cache the proxied store under its destination key so getStoreStats(key) can reach its statistics
+ return store;
}
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+ TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
+ storeCache.put(key(convert(destination)), store); // cache the proxied store under its destination key so getStoreStats(key) can reach its statistics
+ return store;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ef8fe0a..e35619e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -46,6 +46,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -53,10 +54,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
@@ -113,7 +119,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
- static final int VERSION = 5;
+ static final int VERSION = 6;
protected class Metadata {
protected Page<Metadata> page;
@@ -738,7 +744,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long undoCounter=0;
// Go through all the destinations to see if they have messages past the lastAppendLocation
- for (StoredDestination sd : storedDestinations.values()) {
+ for (String key : storedDestinations.keySet()) {
+ StoredDestination sd = storedDestinations.get(key);
final ArrayList<Long> matches = new ArrayList<Long>();
// Find all the Locations that are >= than the last Append Location.
@@ -755,6 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(keys.messageId);
undoCounter++;
+ decrementAndSubSizeToStoreStat(key, keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
}
@@ -858,6 +866,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
undoCounter++;
+ decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
@@ -1312,6 +1321,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
+ incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, sd, id);
@@ -1337,7 +1347,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// record this id in any event, initial send or recovery
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
- return id;
+
+ return id;
}
void trackPendingAdd(KahaDestination destination, Long seq) {
@@ -1367,9 +1378,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
+ incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
// on first update previous is original location, on recovery/replay it may be the updated location
if(previousKeys != null && !previousKeys.location.equals(location)) {
sd.locationIndex.remove(tx, previousKeys.location);
+ decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
}
metadata.lastUpdate = location;
} else {
@@ -1387,6 +1400,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
+ decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
recordAckMessageReferenceLocation(ackLocation, keys.location);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
@@ -1414,7 +1428,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
recordAckMessageReferenceLocation(ackLocation, keys.location);
}
// The following method handles deleting un-referenced messages.
- removeAckLocation(tx, sd, subscriptionKey, sequence);
+ removeAckLocation(command, tx, sd, subscriptionKey, sequence);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
@@ -1470,6 +1484,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
String key = key(command.getDestination());
storedDestinations.remove(key);
metadata.destinations.remove(tx, key);
+ clearStoreStats(command.getDestination());
+ storeCache.remove(key(command.getDestination()));
}
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
@@ -1494,13 +1510,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.subLocations.remove(tx, subscriptionKey);
sd.subscriptionAcks.remove(tx, subscriptionKey);
sd.subscriptionCache.remove(subscriptionKey);
- removeAckLocationsForSub(tx, sd, subscriptionKey);
+ removeAckLocationsForSub(command, tx, sd, subscriptionKey);
if (sd.subscriptions.isEmpty(tx)) {
// remove the stored destination
KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
removeDestinationCommand.setDestination(command.getDestination());
updateIndex(tx, removeDestinationCommand, null);
+ clearStoreStats(command.getDestination());
}
}
}
@@ -1879,6 +1896,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
+
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -1912,6 +1930,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
+ final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
+
@Override
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
final StoredDestination value = new StoredDestination();
@@ -1996,12 +2016,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void execute(Transaction tx) throws IOException {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.lowPriorityIndex.load(tx);
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+ value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
value.orderIndex.highPriorityIndex.load(tx);
}
});
@@ -2100,7 +2120,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Figure out the next key using the last entry in the destination.
rc.orderIndex.configureLast(tx);
- rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
+ rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.locationIndex.load(tx);
@@ -2202,6 +2222,133 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return rc;
}
+ /**
+ * Clear the counter for the destination, if one exists.
+ *
+ * @param kahaDestination
+ */
+ protected void clearStoreStats(KahaDestination kahaDestination) {
+ MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
+ if (storeStats != null) {
+ storeStats.reset();
+ }
+ }
+
+ /**
+ * Update MessageStoreStatistics
+ *
+ * @param kahaDestination
+ * @param size
+ */
+ protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
+ incrementAndAddSizeToStoreStat(key(kahaDestination), size);
+ }
+
+ protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
+ MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
+ if (storeStats != null) {
+ storeStats.getMessageCount().increment();
+ if (size > 0) {
+ storeStats.getMessageSize().addSize(size);
+ }
+ }
+ }
+
+ protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
+ decrementAndSubSizeToStoreStat(key(kahaDestination), size);
+ }
+
+ protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
+ MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
+ if (storeStats != null) {
+ storeStats.getMessageCount().decrement();
+ if (size > 0) {
+ storeStats.getMessageSize().addSize(-size);
+ }
+ }
+ }
+
+ /**
+ * This is a map to cache the MessageStore (and, through it, the
+ * MessageStoreStatistics) for a specific KahaDestination key
+ */
+ protected final Map<String, MessageStore> storeCache =
+ new ConcurrentHashMap<String, MessageStore>();
+
+ /**
+ * Locate the storeMessageSize counter for this KahaDestination
+ * @param kahaDestination
+ * @return
+ */
+ protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
+ MessageStoreStatistics storeStats = null;
+ try {
+ MessageStore messageStore = storeCache.get(kahaDestKey);
+ if (messageStore != null) {
+ storeStats = messageStore.getMessageStoreStatistics();
+ }
+ } catch (Exception e1) {
+ LOG.error("Getting size counter of destination failed", e1);
+ }
+
+ return storeStats;
+ }
+
+ /**
+ * Determine whether this Destination matches the DestinationType
+ *
+ * @param destination
+ * @param type
+ * @return
+ */
+ protected boolean matchType(Destination destination,
+ KahaDestination.DestinationType type) {
+ if (destination instanceof Topic
+ && type.equals(KahaDestination.DestinationType.TOPIC)) {
+ return true;
+ } else if (destination instanceof Queue
+ && type.equals(KahaDestination.DestinationType.QUEUE)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+  * Marshaller for {@link Location} values that carries the size alongside
+  * the data file id and the offset.
+  * <p>
+  * Writing always emits three ints; reading only consumes the size int when
+  * {@code metadata.version >= 6}.
+  */
+ class LocationSizeMarshaller implements Marshaller<Location> {
+
+     public LocationSizeMarshaller() {
+
+     }
+
+     /** Reads data file id and offset, plus the size for version 6 and later. */
+     public Location readPayload(DataInput dataIn) throws IOException {
+         final Location location = new Location();
+         location.setDataFileId(dataIn.readInt());
+         location.setOffset(dataIn.readInt());
+         if (metadata.version < 6) {
+             return location;
+         }
+         location.setSize(dataIn.readInt());
+         return location;
+     }
+
+     /** Writes data file id, offset and size, in that order. */
+     public void writePayload(Location object, DataOutput dataOut)
+             throws IOException {
+         final int dataFileId = object.getDataFileId();
+         final int offset = object.getOffset();
+         final int size = object.getSize();
+         dataOut.writeInt(dataFileId);
+         dataOut.writeInt(offset);
+         dataOut.writeInt(size);
+     }
+
+     /** @return 12, the byte length of the three ints written by writePayload */
+     public int getFixedSize() {
+         return 12;
+     }
+
+     /** @return a new Location built from the source via its copy constructor */
+     public Location deepCopy(Location source) {
+         return new Location(source);
+     }
+
+     /** @return always {@code true} */
+     public boolean isDeepCopySupported() {
+         return true;
+     }
+ }
+
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
if (sequences == null) {
@@ -2269,7 +2416,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
- private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+ private void removeAckLocationsForSub(KahaSubscriptionCommand command,
+ Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
if (!sd.ackPositions.isEmpty(tx)) {
SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
if (sequences == null || sequences.isEmpty()) {
@@ -2302,6 +2450,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
+ decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
}
}
}
@@ -2314,7 +2463,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @param messageSequence
* @throws IOException
*/
- private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
+ private void removeAckLocation(KahaRemoveMessageCommand command,
+ Transaction tx, StoredDestination sd, String subscriptionKey,
+ Long messageSequence) throws IOException {
// Remove the sub from the previous location set..
if (messageSequence != null) {
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
@@ -2347,6 +2498,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
+ decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 45e35c6..04d74b6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -198,25 +198,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
@Override
- public int getMessageCount() throws IOException {
- synchronized(indexMutex) {
- return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
- @Override
- public Integer execute(Transaction tx) throws IOException {
- // Iterate through all index entries to get a count of messages in the destination.
- StoredDestination sd = getStoredDestination(dest, tx);
- int rc=0;
- for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
- iterator.next();
- rc++;
- }
- return rc;
- }
- });
- }
- }
-
- @Override
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
@@ -297,6 +278,27 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
public void stop() throws Exception {
}
+ /**
+  * Recounts the destination's messages by walking its message id index and
+  * stores the result as the message count of the store statistics. Only the
+  * count is set here.
+  */
+ @Override
+ public void recoverMessageStoreStatistics() throws IOException {
+     final int recovered;
+     synchronized(indexMutex) {
+         recovered = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+             @Override
+             public Integer execute(Transaction tx) throws IOException {
+                 // The number of index entries is the number of messages in the destination.
+                 StoredDestination sd = getStoredDestination(dest, tx);
+                 int entries = 0;
+                 Iterator<Entry<String, Long>> cursor = sd.messageIdIndex.iterator(tx);
+                 while (cursor.hasNext()) {
+                     cursor.next();
+                     entries++;
+                 }
+                 return entries;
+             }
+         });
+     }
+     getMessageStoreStatistics().getMessageCount().setCount(recovered);
+ }
+
}
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
index 7826a0b..e859f9c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/LocationMarshaller.java
@@ -22,8 +22,13 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.disk.journal.Location;
public class LocationMarshaller implements Marshaller<Location> {
+
public final static LocationMarshaller INSTANCE = new LocationMarshaller();
+ public LocationMarshaller() {
+     // no state to set up
+ }
+
public Location readPayload(DataInput dataIn) throws IOException {
Location rc = new Location();
rc.setDataFileId(dataIn.readInt());
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 49e8cfa..7c2d327 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -834,7 +834,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
cursorPosition = cursorResetPosition
}
- def getMessageCount: Int = {
+ override def getMessageCount: Int = {
return db.collectionSize(key).toInt
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index f8fab10..90b8428 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -491,11 +491,6 @@ public class StoreQueueCursorOrderTest {
}
@Override
- public int getMessageCount() throws IOException {
- return 0;
- }
-
- @Override
public void resetBatching() {
}
@@ -513,5 +508,10 @@ public class StoreQueueCursorOrderTest {
batch.incrementAndGet();
}
+ @Override
+ public void recoverMessageStoreStatistics() throws IOException {
+     // Test stub: recovery simply clears the statistics.
+     getMessageStoreStatistics().reset();
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..59ae44b
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public abstract class AbstractMessageStoreSizeStatTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(AbstractMessageStoreSizeStatTest.class);
+
+
+ protected BrokerService broker;
+ protected URI brokerConnectURI;
+ protected String defaultQueueName = "test.queue";
+ protected static int messageSize = 1000;
+
+ @Before
+ public void startBroker() throws Exception {
+ setUpBroker(true);
+ }
+
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+
+ broker = new BrokerService();
+ this.initPersistence(broker);
+ //set up a transport
+ TransportConnector connector = broker
+ .addConnector(new TransportConnector());
+ connector.setUri(new URI("tcp://0.0.0.0:0"));
+ connector.setName("tcp");
+
+ broker.start();
+ broker.waitUntilStarted();
+ brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
+
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+
+ protected abstract void initPersistence(BrokerService brokerService) throws IOException;
+
+ @Test
+ public void testMessageSize() throws Exception {
+ Destination dest = publishTestMessages(200);
+ verifyStats(dest, 200, 200 * messageSize);
+ }
+
+ @Test
+ public void testMessageSizeAfterConsumption() throws Exception {
+
+ Destination dest = publishTestMessages(200);
+ verifyStats(dest, 200, 200 * messageSize);
+
+ consumeTestMessages();
+ Thread.sleep(3000);
+ verifyStats(dest, 0, 0);
+ }
+
+ @Test
+ public void testMessageSizeDurable() throws Exception {
+
+ Destination dest = publishTestMessagesDurable();
+
+ //verify the count and size
+ verifyStats(dest, 200, 200 * messageSize);
+
+ }
+
+ @Test
+ public void testMessageSizeAfterDestinationDeletion() throws Exception {
+ Destination dest = publishTestMessages(200);
+ verifyStats(dest, 200, 200 * messageSize);
+
+ //check that the size is 0 after deletion
+ broker.removeDestination(dest.getActiveMQDestination());
+ verifyStats(dest, 0, 0);
+ }
+
+ protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
+ MessageStore messageStore = dest.getMessageStore();
+ MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
+ assertEquals(messageStore.getMessageCount(), count);
+ assertEquals(messageStore.getMessageCount(),
+ storeStats.getMessageCount().getCount());
+ assertEquals(messageStore.getMessageSize(),
+ messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
+ if (count > 0) {
+ assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
+ } else {
+ assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
+ }
+ }
+
+ /**
+ * Generate a message whose body is messageSize random bytes
+ * @param session
+ * @return
+ * @throws JMSException
+ */
+ protected BytesMessage createMessage(Session session) throws JMSException {
+ final BytesMessage message = session.createBytesMessage();
+ final byte[] data = new byte[messageSize];
+ final Random rng = new Random();
+ rng.nextBytes(data);
+ message.writeBytes(data);
+ return message;
+ }
+
+
+ protected Destination publishTestMessages(int count) throws Exception {
+ return publishTestMessages(count, defaultQueueName);
+ }
+
+ protected Destination publishTestMessages(int count, String queueName) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+ queueName);
+
+ Destination dest = broker.getDestination(activeMqQueue);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+ .createConnection();
+ connection.setClientID("clientId" + queueName);
+ connection.start();
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ try {
+ // publish a bunch of persistent messages to fill up the message
+ // store
+ MessageProducer prod = session.createProducer(queue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ for (int i = 0; i < count; i++) {
+ prod.send(createMessage(session));
+ }
+
+ } finally {
+ connection.stop();
+ }
+
+ return dest;
+ }
+
+ protected Destination consumeTestMessages() throws Exception {
+ return consumeTestMessages(defaultQueueName);
+ }
+
+ protected Destination consumeTestMessages(String queueName) throws Exception {
+ // create a new queue
+ final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
+ queueName);
+
+ Destination dest = broker.getDestination(activeMqQueue);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+ .createConnection();
+ connection.setClientID("clientId2" + queueName);
+ connection.start();
+ Session session = connection.createSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+
+ try {
+ MessageConsumer consumer = session.createConsumer(queue);
+ for (int i = 0; i < 200; i++) {
+ consumer.receive();
+ }
+
+ } finally {
+ connection.stop();
+ }
+
+ return dest;
+ }
+
+ protected Destination publishTestMessagesDurable() throws Exception {
+ // create a new topic
+ final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
+ "test.topic");
+
+ Destination dest = broker.getDestination(activeMqTopic);
+
+ // Start the connection
+ Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
+ .createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+ Session session = connection.createSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("test.topic");
+ session.createDurableSubscriber(topic, "sub1");
+
+ try {
+ // publish a bunch of persistent messages to fill up the message
+ // store
+ MessageProducer prod = session.createProducer(topic);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ for (int i = 0; i < 200; i++) {
+ prod.send(createMessage(session));
+ }
+
+ } finally {
+ connection.stop();
+ }
+
+ return dest;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
new file mode 100644
index 0000000..923bc82
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Random;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implementations correctly
+ * compute the size of the messages in the store.
+ *
+ */
+public abstract class AbstractMessageStoreSizeTest {
+
+ protected static final IdGenerator id = new IdGenerator();
+ protected ActiveMQQueue destination = new ActiveMQQueue("Test");
+ protected ProducerId producerId = new ProducerId("1.1.1");
+ protected static final int MESSAGE_COUNT = 20;
+ protected static String dataDirectory = "target/test-amq-5748/datadb";
+ protected static int testMessageSize = 1000;
+
+ @Before
+ public void init() throws Exception {
+ this.initStore();
+ }
+
+ @After
+ public void destroy() throws Exception {
+ this.destroyStore();
+ }
+
+ protected abstract void initStore() throws Exception;
+
+
+ protected abstract void destroyStore() throws Exception;
+
+
+ /**
+ * This method tests that the message size exists after writing a bunch of messages to the store.
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSize() throws Exception {
+ writeMessages();
+ long messageSize = getMessageStore().getMessageSize();
+ assertTrue(getMessageStore().getMessageCount() == 20);
+ assertTrue(messageSize > 20 * testMessageSize);
+ }
+
+
+ /**
+ * Write random byte messages to the store for testing.
+ *
+ * @throws Exception
+ */
+ protected void writeMessages() throws Exception {
+ final ConnectionContext context = new ConnectionContext();
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ ActiveMQMessage message = new ActiveMQMessage();
+ final byte[] data = new byte[testMessageSize];
+ final Random rng = new Random();
+ rng.nextBytes(data);
+ message.setContent(new ByteSequence(data));
+ message.setDestination(destination);
+ message.setMessageId(new MessageId(id.generateId() + ":1", i));
+ getMessageStore().addMessage(context, message);
+ }
+ }
+
+ protected abstract MessageStore getMessageStore();
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
new file mode 100644
index 0000000..7d53cbd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.apache.activemq.store.AbstractMessageStoreSizeTest;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.junit.Test;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implementations correctly
+ * compute the size of the messages in the store.
+ *
+ * For KahaDB specifically, the size was not being stored in the index ({@link LocationMarshaller}). LocationMarshaller
+ * has been updated to include an option to include the size in the serialized value. This way the message
+ * size will be persisted in the index and be available between broker restarts without needing to rebuild the index.
+ * Note that the KahaDB version has been incremented from 5 to 6 because the index will need to be rebuilt when a version
+ * 5 index is detected since it will be detected as corrupt.
+ *
+ */
+public abstract class AbstractKahaDBMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
+
+ MessageStore messageStore;
+ PersistenceAdapter store;
+
+ @Override
+ public void initStore() throws Exception {
+ createStore(true, dataDirectory);
+ }
+
+ abstract protected void createStore(boolean deleteAllMessages, String directory) throws Exception;
+
+ abstract protected String getVersion5Dir();
+
+ @Override
+ public void destroyStore() throws Exception {
+ if (store != null) {
+ store.stop();
+ }
+ }
+
+
+ /**
+ * This method tests that the message sizes exist for all messages that exist after messages are recovered
+ * off of disk.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSizeStoreRecovery() throws Exception {
+ writeMessages();
+ store.stop();
+
+ createStore(false, dataDirectory);
+ writeMessages();
+ long messageSize = messageStore.getMessageSize();
+ assertEquals(40, messageStore.getMessageCount());
+ assertTrue(messageSize > 40 * testMessageSize);
+ }
+
+ /**
+ * This method tests that a version 5 store with an old index still works but returns 0 for message sizes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSizeStoreRecoveryVersion5() throws Exception {
+ store.stop();
+
+ //Copy over an existing version 5 store with messages
+ File dataDir = new File(dataDirectory);
+ if (dataDir.exists())
+ FileUtils.deleteDirectory(new File(dataDirectory));
+ FileUtils.copyDirectory(new File(getVersion5Dir()),
+ dataDir);
+
+ //reload store
+ createStore(false, dataDirectory);
+
+ //make sure size is 0
+ long messageSize = messageStore.getMessageSize();
+ assertTrue(messageStore.getMessageCount() == 20);
+ assertTrue(messageSize == 0);
+
+
+ }
+
+ /**
+ * This method tests that a version 5 store with existing messages will correctly be recovered and converted
+ * to version 6. After index deletion, the index will be rebuilt and will include message sizes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSizeStoreRecoveryVersion5RebuildIndex() throws Exception {
+ store.stop();
+
+ //Copy over an existing version 5 store with messages
+ File dataDir = new File(dataDirectory);
+ if (dataDir.exists())
+ FileUtils.deleteDirectory(new File(dataDirectory));
+ FileUtils.copyDirectory(new File(getVersion5Dir()),
+ dataDir);
+ for (File index : FileUtils.listFiles(new File(dataDirectory), new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
+ FileUtils.deleteQuietly(index);
+ }
+
+ //append more messages...at this point the index should be rebuilt
+ createStore(false, dataDirectory);
+ writeMessages();
+
+ //after writing new messages to the existing store, make sure the index is rebuilt and size is correct
+ long messageSize = messageStore.getMessageSize();
+ assertTrue(messageStore.getMessageCount() == 40);
+ assertTrue(messageSize > 40 * testMessageSize);
+
+ }
+
+ @Override
+ protected MessageStore getMessageStore() {
+ return messageStore;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..bb46f20
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize
+ * statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public class KahaDBMessageStoreSizeStatTest extends
+ AbstractMessageStoreSizeStatTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(KahaDBMessageStoreSizeStatTest.class);
+
+ File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+
+ @Override
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+ if (clearDataDir && dataFileDir.exists())
+ FileUtils.cleanDirectory(dataFileDir);
+ super.setUpBroker(clearDataDir);
+ }
+
+ @Override
+ protected void initPersistence(BrokerService brokerService)
+ throws IOException {
+ broker.setPersistent(true);
+ broker.setDataDirectoryFile(dataFileDir);
+ }
+
+ /**
+ * Test that the counter restores size and works after restart and more
+ * messages are published
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSizeAfterRestartAndPublish() throws Exception {
+
+ Destination dest = publishTestMessages(200);
+
+ // verify the count and size
+ verifyStats(dest, 200, 200 * messageSize);
+
+ // stop, restart broker and publish more messages
+ stopBroker();
+ this.setUpBroker(false);
+ dest = publishTestMessages(200);
+
+ // verify the count and size
+ verifyStats(dest, 400, 400 * messageSize);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
new file mode 100644
index 0000000..43dc2f6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+
+import org.apache.activemq.store.MessageStore;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implementations correctly
+ * compute the size of the messages in the KahaDB Store.
+ *
+ */
+public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
+
+ @Override
+ protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
+ KahaDBStore kahaDBStore = new KahaDBStore();
+ store = kahaDBStore;
+ kahaDBStore.setJournalMaxFileLength(1024 * 512);
+ kahaDBStore.setDeleteAllMessages(deleteAllMessages);
+ kahaDBStore.setDirectory(new File(directory));
+ kahaDBStore.start();
+ messageStore = store.createQueueMessageStore(destination);
+ messageStore.start();
+ }
+
+ @Override
+ protected String getVersion5Dir() {
+ return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
new file mode 100644
index 0000000..4342e1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test checks that KahaDB properly sets the new storeMessageSize
+ * statistic.
+ *
+ * AMQ-5748
+ *
+ */
+public class MultiKahaDBMessageStoreSizeStatTest extends
+ AbstractMessageStoreSizeStatTest {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
+
+ File dataFileDir = new File("target/test-amq-5748/stat-datadb");
+
+ @Override
+ protected void setUpBroker(boolean clearDataDir) throws Exception {
+ if (clearDataDir && dataFileDir.exists())
+ FileUtils.cleanDirectory(dataFileDir);
+ super.setUpBroker(clearDataDir);
+ }
+
+ @Override
+ protected void initPersistence(BrokerService brokerService)
+ throws IOException {
+ broker.setPersistent(true);
+
+ //setup multi-kaha adapter
+ MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
+ persistenceAdapter.setDirectory(dataFileDir);
+
+ KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+ kahaStore.setJournalMaxFileLength(1024 * 512);
+
+ //set up a store per destination
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(kahaStore);
+ filtered.setPerDestination(true);
+ List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
+ stores.add(filtered);
+
+ persistenceAdapter.setFilteredPersistenceAdapters(stores);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ }
+
+ /**
+ * Test that the counter restores size and works after restart and more
+ * messages are published
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testMessageSizeAfterRestartAndPublish() throws Exception {
+
+ Destination dest = publishTestMessages(200);
+
+ // verify the count and size
+ verifyStats(dest, 200, 200 * messageSize);
+
+ // stop, restart broker and publish more messages
+ stopBroker();
+ this.setUpBroker(false);
+ dest = publishTestMessages(200);
+
+ // verify the count and size
+ verifyStats(dest, 400, 400 * messageSize);
+
+ }
+
+ @Test
+ public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
+
+ Destination dest = publishTestMessages(200);
+
+ // verify the count and size
+ verifyStats(dest, 200, 200 * messageSize);
+ assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
+
+ Destination dest2 = publishTestMessages(200, "test.queue2");
+
+ // verify the count and size
+ verifyStats(dest2, 200, 200 * messageSize);
+ assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
+
+ // stop, restart broker and publish more messages
+ stopBroker();
+ this.setUpBroker(false);
+ dest = publishTestMessages(200);
+ dest2 = publishTestMessages(200, "test.queue2");
+
+ // verify the count and size after publishing messages
+ verifyStats(dest, 400, 400 * messageSize);
+ verifyStats(dest2, 400, 400 * messageSize);
+
+ System.out.println(broker.getPersistenceAdapter().size());
+ assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
+ assertTrue(broker.getPersistenceAdapter().size() >=
+ (dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));
+
+ }
+
+}
[3/3] activemq git commit: This closes #92
Posted by ta...@apache.org.
This closes #92
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2b320ac0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2b320ac0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2b320ac0
Branch: refs/heads/master
Commit: 2b320ac0656138d92b5e4f7ff135814d033d47e5
Parents: 7a68ad5 785b16b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jul 7 16:17:14 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jul 7 16:17:14 2015 -0400
----------------------------------------------------------------------
.../activemq/broker/jmx/DestinationView.java | 8 +
.../broker/jmx/DestinationViewMBean.java | 8 +
.../apache/activemq/broker/region/Queue.java | 1 +
.../apache/activemq/broker/region/Topic.java | 1 +
.../activemq/store/AbstractMessageStore.java | 21 ++
.../org/apache/activemq/store/MessageStore.java | 12 +
.../activemq/store/MessageStoreStatistics.java | 81 ++++++
.../activemq/store/ProxyMessageStore.java | 11 +
.../activemq/store/ProxyTopicMessageStore.java | 12 +-
.../store/memory/MemoryMessageStore.java | 43 ++-
.../activemq/management/SizeStatisticImpl.java | 17 ++
.../activemq/store/jdbc/JDBCMessageStore.java | 2 +
.../store/journal/JournalMessageStore.java | 15 +-
.../activemq/store/kahadb/KahaDBStore.java | 69 ++---
.../activemq/store/kahadb/MessageDatabase.java | 172 +++++++++++-
.../activemq/store/kahadb/TempKahaDBStore.java | 40 +--
.../kahadb/disk/util/LocationMarshaller.java | 5 +
.../apache/activemq/leveldb/LevelDBStore.scala | 2 +-
.../cursors/StoreQueueCursorOrderTest.java | 10 +-
.../store/AbstractMessageStoreSizeStatTest.java | 266 +++++++++++++++++++
.../store/AbstractMessageStoreSizeTest.java | 98 +++++++
.../AbstractKahaDBMessageStoreSizeTest.java | 147 ++++++++++
.../kahadb/KahaDBMessageStoreSizeStatTest.java | 82 ++++++
.../kahadb/KahaDBMessageStoreSizeTest.java | 46 ++++
.../MultiKahaDBMessageStoreSizeStatTest.java | 134 ++++++++++
.../kahadb/MultiKahaDBMessageStoreSizeTest.java | 68 +++++
.../memory/MemoryMessageStoreSizeStatTest.java | 45 ++++
.../memory/MemoryMessageStoreSizeTest.java | 45 ++++
.../kahadb/MessageStoreTest/version5/db-1.log | Bin 0 -> 524288 bytes
.../kahadb/MessageStoreTest/version5/db.data | Bin 0 -> 32768 bytes
.../kahadb/MessageStoreTest/version5/db.redo | Bin 0 -> 32824 bytes
.../version5/queue#3a#2f#2fTest/db-1.log | Bin 0 -> 524288 bytes
.../version5/queue#3a#2f#2fTest/db.data | Bin 0 -> 32768 bytes
.../version5/queue#3a#2f#2fTest/db.redo | Bin 0 -> 32824 bytes
34 files changed, 1382 insertions(+), 79 deletions(-)
----------------------------------------------------------------------