You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/29 11:33:18 UTC

activemq git commit: [AMQ-6522] - remove hardcoded 32k batch limit from recovery check of the journal, fix and test

Repository: activemq
Updated Branches:
  refs/heads/master 0a29533ed -> dad629e88


[AMQ-6522] - remove hardcoded 32k batch limit from recovery check of the journal, fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dad629e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dad629e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dad629e8

Branch: refs/heads/master
Commit: dad629e889b2116a778fd4f77680a1b2944b400f
Parents: 0a29533
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 29 11:32:03 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Nov 29 11:32:03 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |   8 +-
 .../store/kahadb/disk/journal/Journal.java      |   4 +-
 .../org/apache/activemq/bugs/AMQ6522Test.java   | 117 +++++++++++++++++++
 3 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8c79c66..c46a127 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -935,11 +935,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if (!missingPredicates.isEmpty()) {
             for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                 final StoredDestination sd = sdEntry.getValue();
-                final ArrayList<Long> matches = new ArrayList<Long>();
+                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
                 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                     @Override
                     protected void matched(Location key, Long value) {
-                        matches.add(value);
+                        matches.put(value, key);
                     }
                 });
 
@@ -950,7 +950,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     // we error out.
                     if( ignoreMissingJournalfiles ) {
                         // Update the index to remove the references to the missing data
-                        for (Long sequenceId : matches) {
+                        for (Long sequenceId : matches.keySet()) {
                             MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                             sd.locationIndex.remove(tx, keys.location);
                             sd.messageIdIndex.remove(tx, keys.messageId);
@@ -960,7 +960,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }
                     } else {
-                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
+                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                         throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 65a952b..548928e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -65,8 +65,6 @@ public class Journal {
     public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
     public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
 
-    private static final int MAX_BATCH_SIZE = 32*1024*1024;
-
     private static final int PREALLOC_CHUNK_SIZE = 1024*1024;
 
     // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
@@ -570,7 +568,7 @@ public class Journal {
             }
 
             int size = controlIs.readInt();
-            if (size > MAX_BATCH_SIZE) {
+            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                 return -1;
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
new file mode 100644
index 0000000..286788d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ6522Test { // AMQ-6522 regression test: a message larger than 32MB must still be readable after a broker restart
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6522Test.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private final Destination destination = new ActiveMQQueue("large_message_queue");
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        initBroker(true); // first start of each test asks the broker to delete all messages on startup
+    }
+
+    public void initBroker(Boolean deleteAllMessages) throws Exception { // creates and starts a broker, then points connectionFactory at it
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); // port 0: presumably an ephemeral port is chosen - confirm
+        broker.start();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void tearDown() throws Exception { // also called directly by the test body to stop the broker before restarting it
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() throws Exception { // builds (but does not start) a KahaDB-backed broker with JMX disabled
+        BrokerService broker = new BrokerService();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setUseCache(false); // NOTE(review): presumably makes dispatch read from the store instead of a cache - confirm
+        broker.setDestinationPolicy(new PolicyMap());
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setCheckForCorruptJournalFiles(true); // NOTE(review): presumably enables the journal recovery check this test targets - confirm
+        kahadb.setPreallocationScope(Journal.PreallocationScope.NONE.name());
+
+        broker.setPersistenceAdapter(kahadb);
+        broker.setUseJmx(false);
+
+        return broker;
+    }
+
+
+    @Test
+    public void verifyMessageExceedsJournalRestartRecoveryCheck() throws Exception { // send one 33MB message, restart the broker, expect to receive it
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer producer = session.createProducer(destination);
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[33*1024*1024]); // 33MB payload: just above the 32*1024*1024 limit the journal check previously enforced
+            producer.send(message);
+
+        } finally {
+            connection.close();
+        }
+
+        tearDown(); // stop the first broker instance
+        initBroker(false); // start a new broker without deleting stored messages
+
+        connection = connectionFactory.createConnection(); // connectionFactory was re-created by initBroker for the new connector URI
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            assertNotNull("Got message after restart", consumer.receive(20000)); // receive(20000) waits up to 20 seconds; null fails the test
+        } finally {
+            connection.close();
+        }
+    }
+}
\ No newline at end of file