You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/05/23 18:43:24 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6303
Repository: activemq
Updated Branches:
refs/heads/master b4e35fe8a -> 4d6cc4b46
https://issues.apache.org/jira/browse/AMQ-6303
Properly set the typeCode value for new journal files used for ack
compaction
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4d6cc4b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4d6cc4b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4d6cc4b4
Branch: refs/heads/master
Commit: 4d6cc4b46007c7bcec10ade080e822d83261273f
Parents: b4e35fe
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon May 23 18:09:01 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon May 23 18:42:29 2016 +0000
----------------------------------------------------------------------
.../apache/activemq/store/kahadb/MessageDatabase.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4d6cc4b4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 208a52b..e3918ab 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1375,12 +1375,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
- if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
- // Mark the current journal file as a compacted file so that gc checks can skip
- // over logs that are smaller compaction type logs.
- DataFile current = journal.getDataFileById(location.getDataFileId());
- current.setTypeCode(command.getRewriteType());
+ // Mark the current journal file as a compacted file so that gc checks can skip
+ // over logs that are smaller compaction type logs.
+ DataFile current = journal.getDataFileById(location.getDataFileId());
+ current.setTypeCode(command.getRewriteType());
+
+ if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
// Move offset so that next location read jumps to next file.
location.setOffset(journalMaxFileLength);
}
@@ -1971,6 +1972,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
DataFile forwardsFile = journal.reserveDataFile();
+ forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
@@ -1978,7 +1980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
compactionMarker.setSourceDataFileId(journalToRead);
- compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
+ compactionMarker.setRewriteType(forwardsFile.getTypeCode());
ByteSequence payload = toByteSequence(compactionMarker);
appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);