You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/10/10 16:24:26 UTC

activemq git commit: [AMQ-6831, AMQ-6771] fix up recovery check to ensure full batch is available in memory, regression from AMQ-6771

Repository: activemq
Updated Branches:
  refs/heads/master 0e6fc19cf -> f98999227


[AMQ-6831, AMQ-6771] fix up recovery check to ensure full batch is available in memory, regression from AMQ-6771


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f9899922
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f9899922
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f9899922

Branch: refs/heads/master
Commit: f9899922783e0e94de030f4c867e5d48a3d869a9
Parents: 0e6fc19
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 10 17:24:09 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 10 17:24:09 2017 +0100

----------------------------------------------------------------------
 .../store/kahadb/disk/journal/Journal.java      | 10 +++++-
 .../store/kahadb/MessageDatabaseTest.java       | 36 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f9899922/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 5edee92..8e04414 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -567,10 +567,11 @@ public class Journal {
     }
 
     private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
-
+        ensureAvailable(bs, reader, EOF_RECORD.length);
         if (bs.startsWith(EOF_RECORD)) {
             return 0; // eof
         }
+        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
         try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {
 
             // Assert that it's a batch record.
@@ -623,6 +624,13 @@ public class Journal {
         }
     }
 
+    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
+        if (bs.remaining() < required) { // fewer than 'required' bytes left in bs: top it up from reader
+            bs.reset(); // NOTE(review): presumably moves the unread bytes to the front of bs.data - confirm
+            bs.setLength(bs.length + Math.max(0, reader.read(bs.data, bs.length, bs.data.length - bs.length))); // read() is -1 at EOF; clamp so length never shrinks
+        }
+    }
+
     void addToTotalLength(int size) {
         totalLength.addAndGet(size);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f9899922/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
index d09be68..604b46f 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.activemq.store.kahadb;
 
+import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.ByteSequence;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -28,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
 import static org.junit.Assert.*;
 
 public class MessageDatabaseTest {
@@ -78,4 +81,37 @@ public class MessageDatabaseTest {
         assertNull("audit location should be null", kaha.getMetadata().producerSequenceIdTrackerLocation);
     }
 
+    @Test
+    public void testRecoverCheckOnBatchBoundary() throws Exception {
+        // Writes two payloads sized just under the max write batch size, restarts, then checks the audit.
+        final KahaDBStore store = new KahaDBStore();
+        final File storeDir = new File(temporaryFolder.getRoot(), "kaha2");
+        store.setDirectory(storeDir);
+        store.setCheckpointInterval(0L); // disable periodic checkpoint
+        store.setCheckForCorruptJournalFiles(true);
+        store.setChecksumJournalFiles(true);
+        store.setMaxFailoverProducersToTrack(10);
+        final BrokerService broker = new BrokerService() {
+            public void handleIOException(IOException exception) {
+                exception.printStackTrace();
+            }
+        };
+        store.setBrokerService(broker);
+        store.start();
+
+        // register the id with the producer audit so the post-restart assert can look for it again
+        final String messageId = "1:1:1:1";
+        store.getMetadata().producerSequenceIdTracker.isDuplicate(messageId);
+
+        final ByteSequence payload = new ByteSequence(new byte[DEFAULT_MAX_WRITE_BATCH_SIZE - 110]);
+        store.getJournal().write(payload, false);
+        store.getJournal().write(payload, false);
+        store.stop();
+        try {
+            store.start();
+            assertTrue("Value from journal recovered ok", store.getMetadata().producerSequenceIdTracker.isDuplicate(messageId));
+        } finally {
+            store.stop();
+        }
+    }
 }
\ No newline at end of file