You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/23 18:43:24 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6303

Repository: activemq
Updated Branches:
  refs/heads/master b4e35fe8a -> 4d6cc4b46


https://issues.apache.org/jira/browse/AMQ-6303

Properly set the typeCode value on new journal files used for ack
compaction


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4d6cc4b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4d6cc4b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4d6cc4b4

Branch: refs/heads/master
Commit: 4d6cc4b46007c7bcec10ade080e822d83261273f
Parents: b4e35fe
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon May 23 18:09:01 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon May 23 18:42:29 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/store/kahadb/MessageDatabase.java | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6cc4b4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 208a52b..e3918ab 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1375,12 +1375,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
         final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
-            // Mark the current journal file as a compacted file so that gc checks can skip
-            // over logs that are smaller compaction type logs.
-            DataFile current = journal.getDataFileById(location.getDataFileId());
-            current.setTypeCode(command.getRewriteType());
 
+        // Mark the current journal file as a compacted file so that gc checks can skip
+        // over logs that are smaller compaction type logs.
+        DataFile current = journal.getDataFileById(location.getDataFileId());
+        current.setTypeCode(command.getRewriteType());
+
+        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
             // Move offset so that next location read jumps to next file.
             location.setOffset(journalMaxFileLength);
         }
@@ -1971,6 +1972,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
 
         DataFile forwardsFile = journal.reserveDataFile();
+        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
         LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
 
         Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
@@ -1978,7 +1980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
             KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
             compactionMarker.setSourceDataFileId(journalToRead);
-            compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
+            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
 
             ByteSequence payload = toByteSequence(compactionMarker);
             appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);