You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/14 13:22:33 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6356

Repository: activemq
Updated Branches:
  refs/heads/master 159713298 -> a5050a8bc


https://issues.apache.org/jira/browse/AMQ-6356

Fixing store size calculation on KahaUpdateMessageCommand processing so
that the size won't increase inadvertently if the existing location of
the command in the journal is the same as the new location


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a5050a8b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a5050a8b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a5050a8b

Branch: refs/heads/master
Commit: a5050a8bc5b5f45852269867dd9bf46b9381912d
Parents: 1597132
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Jul 14 09:21:23 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Thu Jul 14 09:21:23 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  11 +-
 .../store/kahadb/MessageDatabaseSizeTest.java   | 147 +++++++++++++++++++
 2 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a5050a8b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3ffe55f..cc3f676 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1471,10 +1471,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             );
             sd.locationIndex.put(tx, location, id);
             incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
-            // on first update previous is original location, on recovery/replay it may be the updated location
-            if(previousKeys != null && !previousKeys.location.equals(location)) {
-                sd.locationIndex.remove(tx, previousKeys.location);
+
+            if (previousKeys != null) {
+                //Remove the existing from the size
                 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
+
+                // on first update previous is original location, on recovery/replay it may be the updated location
+                if(!previousKeys.location.equals(location)) {
+                    sd.locationIndex.remove(tx, previousKeys.location);
+                }
             }
             metadata.lastUpdate = location;
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5050a8b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
new file mode 100644
index 0000000..357dc5f
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageDatabaseSizeTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
+
+    @Rule
+    public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
+    private final String payload = new String(new byte[1024]);
+
+    private BrokerService broker = null;
+    private final ActiveMQQueue destination = new ActiveMQQueue("Test");
+    private KahaDBPersistenceAdapter adapter;
+
+    protected void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+        adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    @Before
+    public void start() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Test that when only updating the index and not rewriting the message to the journal
+     * that the size doesn't change
+     *
+     * This was broken before AMQ-6356
+     */
+    @Test
+    public void testUpdateMessageSameLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        MessageId messageId = new MessageId("111:222:333");
+        ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
+        messageStore.updateMessage(textMessage);
+
+        Location location = findMessageLocation(messageId.toString(), store.convert(destination));
+        long existingSize = messageStore.getMessageSize();
+
+        //Process the update command for the index and verify the size doesn't change
+        KahaUpdateMessageCommand updateMessageCommand = (KahaUpdateMessageCommand) store.load(location);
+        store.process(updateMessageCommand, location);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    /**
+     * Test that when updating an existing message to a different location in the
+     * journal that the index size doesn't change
+     */
+    @Test
+    public void testUpdateMessageDifferentLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
+        messageStore.updateMessage(textMessage);
+
+        //Update again and make sure the size is the same
+        long existingSize = messageStore.getMessageSize();
+        messageStore.updateMessage(textMessage);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    private ActiveMQTextMessage getMessage(final MessageId messageId) throws Exception {
+        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setMessageId(messageId);
+        textMessage.setText(payload);
+
+        return textMessage;
+    }
+
+    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
+        final KahaDBStore store = adapter.getStore();
+        return store.pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
+            @Override
+            public Location execute(Transaction tx) throws IOException {
+                StoredDestination sd = store.getStoredDestination(destination, tx);
+                Long sequence = sd.messageIdIndex.get(tx, key);
+                if (sequence == null) {
+                    return null;
+                }
+                return sd.orderIndex.get(tx, sequence).location;
+            }
+        });
+    }
+
+}