Posted to commits@activemq.apache.org by cs...@apache.org on 2016/04/12 15:32:43 UTC
[1/4] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6203
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 20e84d63e -> 91f05189a
https://issues.apache.org/jira/browse/AMQ-6203
Rewrite older acks that can be preventing GC of log files.
(cherry picked from commit 946e62d702d2bf5fbcdb0ed4cb6977046acb659b)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a7255ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a7255ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a7255ab
Branch: refs/heads/activemq-5.13.x
Commit: 2a7255ab2c99be872f1175c0caea78e157d49bd1
Parents: 20e84d6
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Mar 14 11:04:57 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Apr 12 12:33:17 2016 +0000
----------------------------------------------------------------------
.../apache/activemq/util/ThreadPoolUtils.java | 40 ++-
.../activemq/store/kahadb/MessageDatabase.java | 345 +++++++++++++++----
.../apache/activemq/store/kahadb/Visitor.java | 4 +
.../store/kahadb/disk/journal/DataFile.java | 23 +-
.../kahadb/disk/journal/DataFileAccessor.java | 5 +-
.../kahadb/disk/journal/DataFileAppender.java | 32 +-
.../store/kahadb/disk/journal/FileAppender.java | 6 +-
.../store/kahadb/disk/journal/Journal.java | 128 +++++--
.../disk/journal/TargetedDataFileAppender.java | 297 ++++++++++++++++
.../disk/util/DataByteArrayInputStream.java | 31 +-
.../disk/util/DataByteArrayOutputStream.java | 24 +-
.../src/main/proto/journal-data.proto | 12 +
.../journal/TargetedDataFileAppenderTest.java | 116 +++++++
13 files changed, 898 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
index 27b69fc..554f730 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java
@@ -82,7 +82,9 @@ public final class ThreadPoolUtils {
* {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
* forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
* is used as timeout value waiting for orderly shutdown to
- * complete normally, before going aggressively.
+ * complete normally, before going aggressively. If the shutdownAwaitTermination
+ * value is negative the shutdown waits indefinitely for the ExecutorService
+ * to complete its shutdown.
*
* @param executorService the executor service to shutdown
* @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
@@ -130,6 +132,19 @@ public final class ThreadPoolUtils {
Thread.currentThread().interrupt();
}
}
+ } else if (shutdownAwaitTermination < 0) {
+ try {
+ awaitTermination(executorService);
+ } catch (InterruptedException e) {
+ warned = true;
+ LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
+ // we were interrupted during shutdown, so force shutdown
+ try {
+ executorService.shutdownNow();
+ } finally {
+ Thread.currentThread().interrupt();
+ }
+ }
}
// if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
@@ -144,6 +159,29 @@ public final class ThreadPoolUtils {
}
/**
+ * Awaits the termination of the thread pool indefinitely (Use with Caution).
+ * <p/>
+ * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
+ * can see we are not hanging in case it takes a long time to terminate the pool.
+ *
+ * @param executorService the thread pool
+ *
+ * @throws InterruptedException if we are interrupted while waiting
+ */
+ public static void awaitTermination(ExecutorService executorService) throws InterruptedException {
+ // log progress every 2 seconds so the end user is aware that we are shutting down
+ StopWatch watch = new StopWatch();
+ final long interval = 2000;
+ while (true) {
+ if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
+ return;
+ } else {
+ LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
+ }
+ }
+ }
+
+ /**
* Awaits the termination of the thread pool.
* <p/>
* This implementation will log every 2nd second at INFO level that we are waiting, so the end user
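A minimal usage sketch of the new negative-timeout behavior, assuming a caller-owned executor (shutdownGraceful is the method documented above):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.activemq.util.ThreadPoolUtils;

    public class GracefulShutdownExample {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            pool.execute(() -> System.out.println("working"));
            // A negative timeout now blocks until the pool terminates on its own,
            // logging progress, instead of escalating to shutdownNow() on expiry.
            ThreadPoolUtils.shutdownGraceful(pool, -1);
        }
    }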
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 931a18b..434e49c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -48,6 +48,10 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,6 +77,7 @@ import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -84,6 +89,7 @@ import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
@@ -97,9 +103,11 @@ import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,6 +130,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static final int VERSION = 6;
+ static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
+
protected class Metadata {
protected Page<Metadata> page;
protected int state;
@@ -234,8 +244,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected boolean deleteAllMessages;
protected File directory = DEFAULT_DIRECTORY;
protected File indexDirectory = null;
- protected Thread checkpointThread;
- protected boolean enableJournalDiskSyncs=true;
+ protected ScheduledExecutorService scheduler;
+ private final Object schedulerLock = new Object();
+
+ protected boolean enableJournalDiskSyncs = true;
protected boolean archiveDataLogs;
protected File directoryArchive;
protected AtomicLong journalSize = new AtomicLong(0);
@@ -254,7 +266,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = true;
protected boolean forceRecoverIndex = false;
- private final Object checkpointThreadLock = new Object();
private boolean archiveCorruptedIndex = false;
private boolean useIndexLFRUEviction = false;
private float indexLFUEvictionFactor = 0.2f;
@@ -263,6 +274,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean enableIndexPageCaching = true;
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+ private int compactAcksAfterNoGC = 10;
+ private boolean compactAcksIgnoresStoreGrowth = false;
+ private int checkPointCyclesWithNoGC;
+ private int journalLogOnLastCompactionCheck;
+
@Override
public void doStart() throws Exception {
load();
@@ -330,51 +346,59 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
private void startCheckpoint() {
- if (checkpointInterval == 0 && cleanupInterval == 0) {
+ if (checkpointInterval == 0 && cleanupInterval == 0) {
LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
return;
}
- synchronized (checkpointThreadLock) {
- boolean start = false;
- if (checkpointThread == null) {
- start = true;
- } else if (!checkpointThread.isAlive()) {
- start = true;
- LOG.info("KahaDB: Recovering checkpoint thread after death");
- }
- if (start) {
- checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ synchronized (schedulerLock) {
+ if (scheduler == null) {
+ scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
@Override
- public void run() {
- try {
- long lastCleanup = System.currentTimeMillis();
- long lastCheckpoint = System.currentTimeMillis();
- // Sleep for a short time so we can periodically check
- // to see if we need to exit this thread.
- long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
- while (opened.get()) {
- Thread.sleep(sleepTime);
- long now = System.currentTimeMillis();
- if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
- checkpointCleanup(true);
- lastCleanup = now;
- lastCheckpoint = now;
- } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
- checkpointCleanup(false);
- lastCheckpoint = now;
- }
- }
- } catch (InterruptedException e) {
- // Looks like someone really wants us to exit this thread...
- } catch (IOException ioe) {
- LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
- }
+ public Thread newThread(Runnable r) {
+ Thread schedulerThread = new Thread(r);
+
+ schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
+ schedulerThread.setDaemon(true);
+
+ return schedulerThread;
}
- };
+ });
- checkpointThread.setDaemon(true);
- checkpointThread.start();
+ // Short intervals for check-point and cleanups
+ long delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+
+ scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private final class CheckpointRunner implements Runnable {
+
+ private long lastCheckpoint = System.currentTimeMillis();
+ private long lastCleanup = System.currentTimeMillis();
+
+ @Override
+ public void run() {
+ try {
+ // Decide on cleanup vs full checkpoint here.
+ if (opened.get()) {
+ long now = System.currentTimeMillis();
+ if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
+ checkpointCleanup(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
+ checkpointCleanup(false);
+ lastCheckpoint = now;
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ } catch (Throwable e) {
+ LOG.error("Checkpoint failed", e);
+ brokerService.handleIOException(IOExceptionSupport.create(e));
}
}
}
@@ -444,12 +468,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
checkpointLock.writeLock().unlock();
}
journal.close();
- synchronized (checkpointThreadLock) {
- if (checkpointThread != null) {
- checkpointThread.join();
- }
- }
- //clear the cache and journalSize on shutdown of the store
+ ThreadPoolUtils.shutdownGraceful(scheduler, -1);
+ // clear the cache and journalSize on shutdown of the store
storeCache.clear();
journalSize.set(0);
}
@@ -503,11 +523,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes")
private void trackMaxAndMin(Location[] range, List<Operation> ops) {
Location t = ops.get(0).getLocation();
- if (range[0]==null || t.compareTo(range[0]) <= 0) {
+ if (range[0] == null || t.compareTo(range[0]) <= 0) {
range[0] = t;
}
t = ops.get(ops.size() -1).getLocation();
- if (range[1]==null || t.compareTo(range[1]) >= 0) {
+ if (range[1] == null || t.compareTo(range[1]) >= 0) {
range[1] = t;
}
}
@@ -776,7 +796,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
- if( undoCounter > 0 ) {
+ if (undoCounter > 0) {
// The rolledback operations are basically in flight journal writes. To avoid getting
// these the end user should do sync writes to the journal.
if (LOG.isInfoEnabled()) {
@@ -909,7 +929,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
- if( undoCounter > 0 ) {
+ if (undoCounter > 0) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
if (LOG.isInfoEnabled()) {
@@ -1019,31 +1039,31 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
try {
ByteSequence sequence = toByteSequence(data);
-
Location location;
+
checkpointLock.readLock().lock();
try {
long start = System.currentTimeMillis();
- location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
+ location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
long start2 = System.currentTimeMillis();
process(data, location, before);
long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
if (LOG.isInfoEnabled()) {
LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
}
}
-
- } finally{
+ } finally {
checkpointLock.readLock().unlock();
}
+
if (after != null) {
after.run();
}
- if (checkpointThread != null && !checkpointThread.isAlive() && opened.get()) {
+ if (scheduler == null && opened.get()) {
startCheckpoint();
}
return location;
@@ -1167,6 +1187,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void visit(KahaUpdateMessageCommand command) throws IOException {
process(command, location);
}
+
+ @Override
+ public void visit(KahaRewrittenDataFileCommand command) throws IOException {
+ process(command, location);
+ }
});
}
@@ -1323,6 +1348,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
+ protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
+ final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+ if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
+ // Mark the current journal file as a compacted file so that gc checks can skip
+ // over logs that are smaller compaction type logs.
+ DataFile current = journal.getDataFileById(location.getDataFileId());
+ current.setTypeCode(command.getRewriteType());
+
+ // Move offset so that next location read jumps to next file.
+ location.setOffset(journalMaxFileLength);
+ }
+ }
+
// /////////////////////////////////////////////////////////////////
// These methods do the actual index updates.
// /////////////////////////////////////////////////////////////////
@@ -1595,7 +1633,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
tx.store(metadata.page, metadataMarshaller, true);
pageFile.flush();
- if( cleanup ) {
+ if (cleanup) {
final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
@@ -1743,6 +1781,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
LOG.trace("gc candidates: " + gcCandidateSet);
LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
}
+
boolean ackMessageFileMapMod = false;
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
@@ -1768,9 +1807,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
if (!gcCandidateSet.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
- }
+ LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
journal.removeDataFiles(gcCandidateSet);
for (Integer candidate : gcCandidateSet) {
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
@@ -1780,12 +1817,153 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (ackMessageFileMapMod) {
checkpointUpdate(tx, false);
}
+ } else {
+ if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
+ // First check length of journal to make sure it makes sense to even try.
+ //
+ // If there is only one journal file with Acks in it we don't need to move
+ // it since it won't be chained to any later logs.
+ //
+ // If the logs haven't grown since the last time then we need to compact
+ // otherwise there seems to still be room for growth and we don't need to incur
+ // the overhead. Depending on configuration this check can be avoided and
+ // Ack compaction will run any time the store has not GC'd a journal file in
+ // the configured amount of cycles.
+ if (metadata.ackMessageFileMap.size() > 1 &&
+ (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
+
+ LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
+ try {
+ scheduler.execute(new AckCompactionRunner());
+ } catch (Exception ex) {
+ LOG.warn("Error on queueing the Ack Compactor", ex);
+ }
+ } else {
+ LOG.trace("Journal activity detected, no Ack compaction scheduled.");
+ }
+
+ checkPointCyclesWithNoGC = 0;
+ } else {
+ LOG.trace("Not yet time to check for compaction: {} of {} cycles",
+ checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
+ }
+
+ journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
}
}
LOG.debug("Checkpoint done.");
}
+ private final class AckCompactionRunner implements Runnable {
+
+ @Override
+ public void run() {
+ // Lock index to capture the ackMessageFileMap data
+ indexLock.writeLock().lock();
+
+ // Map keys might not be sorted, find the earliest log file to forward acks
+ // from and move only those, future cycles can chip away at more as needed.
+ // We won't move files that are themselves rewritten on a previous compaction.
+ List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
+ Collections.sort(journalFileIds);
+ int journalToAdvance = -1;
+ for (Integer journalFileId : journalFileIds) {
+ DataFile current = journal.getDataFileById(journalFileId);
+ if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
+ journalToAdvance = journalFileId;
+ break;
+ }
+ }
+
+ // Check if we found one, or if we only found the current file being written to.
+ if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+ return;
+ }
+
+ Set<Integer> journalLogsReferenced =
+ new HashSet<Integer>(metadata.ackMessageFileMap.get(journalToAdvance));
+
+ indexLock.writeLock().unlock();
+
+ try {
+ // Background rewrite of the old acks
+ forwardAllAcks(journalToAdvance, journalLogsReferenced);
+
+ // Checkpoint with changes from the ackMessageFileMap
+ checkpointUpdate(false);
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ } catch (Throwable e) {
+ LOG.error("Checkpoint failed", e);
+ brokerService.handleIOException(IOExceptionSupport.create(e));
+ }
+ }
+ }
+
+ private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
+ LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
+
+ DataFile forwardsFile = journal.reserveDataFile();
+ LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
+
+ Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
+
+ try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
+ KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
+ compactionMarker.setSourceDataFileId(journalToRead);
+ compactionMarker.setRewriteType(COMPACTED_JOURNAL_FILE);
+
+ ByteSequence payload = toByteSequence(compactionMarker);
+ appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
+ LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
+
+ Location nextLocation = journal.getNextLocation(new Location(journalToRead, 0));
+ while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
+ JournalCommand<?> command = null;
+ try {
+ command = load(nextLocation);
+ } catch (IOException ex) {
+ LOG.trace("Error loading command during ack forward: {}", nextLocation);
+ }
+
+ if (command != null && command instanceof KahaRemoveMessageCommand) {
+ payload = toByteSequence(command);
+ Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, isEnableJournalDiskSyncs());
+ updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
+ }
+
+ nextLocation = journal.getNextLocation(nextLocation);
+ }
+ }
+
+ LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
+
+ // Lock index while we update the ackMessageFileMap.
+ indexLock.writeLock().lock();
+
+ // Update the ack map with the new locations of the acks
+ for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
+ Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
+ if (referenceFileIds == null) {
+ referenceFileIds = new HashSet<Integer>();
+ referenceFileIds.addAll(entry.getValue());
+ metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
+ } else {
+ referenceFileIds.addAll(entry.getValue());
+ }
+ }
+
+ // remove the old location data from the ack map so that the old journal log file can
+ // be removed on next GC.
+ metadata.ackMessageFileMap.remove(journalToRead);
+
+ indexLock.writeLock().unlock();
+
+ LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
+ }
+
final Runnable nullCompletionCallback = new Runnable() {
@Override
public void run() {
@@ -1943,7 +2121,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
-
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -2708,7 +2885,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
}
}
-
}
class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
@@ -2728,7 +2904,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// /////////////////////////////////////////////////////////////////
private PageFile createPageFile() throws IOException {
- if( indexDirectory == null ) {
+ if (indexDirectory == null) {
indexDirectory = directory;
}
IOHelper.mkdirs(indexDirectory);
@@ -3456,4 +3632,43 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void setPreallocationStrategy(String preallocationStrategy) {
this.preallocationStrategy = preallocationStrategy;
}
+
+ public int getCompactAcksAfterNoGC() {
+ return compactAcksAfterNoGC;
+ }
+
+ /**
+ * Sets the number of GC cycles with no journal log removal to allow before attempting to
+ * forward all the acks in the last log that contains them and is otherwise unreferenced.
+ * <p>
+ * A value of -1 will disable this feature.
+ *
+ * @param compactAcksAfterNoGC
+ * Number of empty GC cycles before we rewrite old ACKS.
+ */
+ public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
+ this.compactAcksAfterNoGC = compactAcksAfterNoGC;
+ }
+
+ /**
+ * Returns whether Ack compaction will ignore that the store is still growing
+ * and run more often.
+ *
+ * @return the compactAcksIgnoresStoreGrowth current value.
+ */
+ public boolean isCompactAcksIgnoresStoreGrowth() {
+ return compactAcksIgnoresStoreGrowth;
+ }
+
+ /**
+ * Configure if Ack compaction will occur regardless of continued growth of the
+ * journal logs meaning that the store has not run out of space yet. Because the
+ * compaction operation can be costly this value is defaulted to off and the Ack
+ * compaction is only done when it seems that the store cannot grow any larger.
+ *
+ * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
+ */
+ public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
+ this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
+ }
}
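The two new tuning knobs are exposed as plain bean properties. A minimal sketch of wiring them programmatically, assuming the usual KahaDBStore/BrokerService setup (the setters themselves come from the diff above):

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBStore;

    public class AckCompactionConfigExample {
        public static void main(String[] args) throws Exception {
            // KahaDBStore extends MessageDatabase, so the setters added above
            // are reachable on it (wiring shown here is assumed for illustration).
            KahaDBStore store = new KahaDBStore();
            store.setCompactAcksAfterNoGC(5);             // compact after 5 GC cycles with no file removal
            store.setCompactAcksIgnoresStoreGrowth(true); // compact even while the journal is still growing

            BrokerService broker = new BrokerService();
            broker.setPersistenceAdapter(store);
            broker.start();
        }
    }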
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index 43fc152..641f176 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -30,6 +30,7 @@ import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
+import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -84,4 +85,7 @@ public class Visitor {
public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
}
+
+ public void visit(KahaRewrittenDataFileCommand kahaRewrittenDataFileCommand) throws IOException {
+ }
}
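Existing Visitor subclasses are unaffected because the new callback is a no-op in the base class; a minimal sketch of opting in, with a hypothetical handler body:

    import java.io.IOException;
    import org.apache.activemq.store.kahadb.Visitor;
    import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;

    public class RewrittenFileAwareVisitor extends Visitor {
        @Override
        public void visit(KahaRewrittenDataFileCommand command) throws IOException {
            // hypothetical handler: react to a compacted replacement log during replay
        }
    }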
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index f1e078d..126d82b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -18,29 +18,23 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* DataFile
- *
- *
*/
public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
- private static final Logger LOG = LoggerFactory.getLogger(DataFile.class);
+ public final static byte STANDARD_LOG_FILE = 0x0;
protected final File file;
protected final Integer dataFileId;
protected volatile int length;
+ protected int typeCode = STANDARD_LOG_FILE;
protected final SequenceSet corruptedBlocks = new SequenceSet();
DataFile(File file, int number) {
@@ -57,6 +51,14 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return dataFileId;
}
+ public int getTypeCode() {
+ return typeCode;
+ }
+
+ public void setTypeCode(int typeCode) {
+ this.typeCode = typeCode;
+ }
+
public synchronized int getLength() {
return length;
}
@@ -70,7 +72,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
}
@Override
- public synchronized String toString() {
+ public synchronized String toString() {
return file.getName() + " number = " + dataFileId + " , length = " + length;
}
@@ -95,7 +97,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
}
@Override
- public int compareTo(DataFile df) {
+ public int compareTo(DataFile df) {
return dataFileId - df.dataFileId;
}
@@ -112,5 +114,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public int hashCode() {
return dataFileId;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 6a49d06..de68cf0 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,7 +19,6 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.util.Map;
-import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
@@ -28,8 +27,6 @@ import org.slf4j.LoggerFactory;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in
* conjunction with the DataFileAccessorPool for concurrent use.
- *
- *
*/
final class DataFileAccessor {
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 0ce647a..e2f173a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -54,33 +54,6 @@ class DataFileAppender implements FileAppender {
protected boolean running;
private Thread thread;
- public static class WriteKey {
- private final int file;
- private final long offset;
- private final int hash;
-
- public WriteKey(Location item) {
- file = item.getDataFileId();
- offset = item.getOffset();
- // TODO: see if we can build a better hash
- hash = (int)(file ^ offset);
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof WriteKey) {
- WriteKey di = (WriteKey)obj;
- return di.file == file && di.offset == offset;
- }
- return false;
- }
- }
-
public class WriteBatch {
public final DataFile dataFile;
@@ -206,7 +179,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
- DataFile file = journal.getCurrentWriteFile();
+ DataFile file = journal.getOrCreateCurrentWriteFile();
if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
@@ -287,9 +260,8 @@ class DataFileAppender implements FileAppender {
DataFile dataFile = null;
RecoverableRandomAccessFile file = null;
WriteBatch wb = null;
- try {
+ try (DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
- DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
while (true) {
// Block till we get a command.
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
index 6ed839f..e60096e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/FileAppender.java
@@ -16,11 +16,12 @@
*/
package org.apache.activemq.store.kahadb.disk.journal;
+import java.io.IOException;
+
import org.apache.activemq.util.ByteSequence;
-import java.io.IOException;
+public interface FileAppender extends AutoCloseable {
-public interface FileAppender {
public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
@@ -28,5 +29,6 @@ public interface FileAppender {
Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException;
+ @Override
void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index d00c377..f278e09 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -141,6 +141,7 @@ public class Journal {
protected boolean checkForCorruptionOnStartup;
protected boolean enableAsyncDiskSync = true;
private Timer timer;
+ private int nextDataFileId = 1;
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
@@ -163,6 +164,7 @@ public class Journal {
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
File[] files = directory.listFiles(new FilenameFilter() {
+ @Override
public boolean accept(File dir, String n) {
return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
}
@@ -201,7 +203,9 @@ public class Journal {
}
}
- getCurrentWriteFile();
+ nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+
+ getOrCreateCurrentWriteFile();
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
@@ -219,6 +223,7 @@ public class Journal {
cleanupTask = new Runnable() {
+ @Override
public void run() {
cleanup();
}
@@ -306,6 +311,7 @@ public class Journal {
return rc;
}
+
private static byte[] bytes(String string) {
try {
return string.getBytes("UTF-8");
@@ -321,16 +327,17 @@ public class Journal {
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- while( true ) {
+ while (true) {
int size = checkBatchRecord(reader, location.getOffset());
- if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
- location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+ if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
+ location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
} else {
- // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We
+ // Perhaps it's just some corruption... scan through the
+ // file to find the next valid batch record. We
// may have subsequent valid batch records.
- int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
- if( nextOffset >=0 ) {
+ int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
+ if (nextOffset >= 0) {
Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
dataFile.corruptedBlocks.add(sequence);
@@ -352,9 +359,9 @@ public class Journal {
totalLength.addAndGet(dataFile.getLength() - existingLen);
}
- if( !dataFile.corruptedBlocks.isEmpty() ) {
+ if (!dataFile.corruptedBlocks.isEmpty()) {
// Is the end of the data file corrupted?
- if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
+ if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
}
}
@@ -368,19 +375,19 @@ public class Journal {
ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
int pos = 0;
- while( true ) {
+ while (true) {
pos = bs.indexOf(header, pos);
- if( pos >= 0 ) {
- return offset+pos;
+ if (pos >= 0) {
+ return offset + pos;
} else {
// need to load the next data chunk in..
- if( bs.length != data.length ) {
+ if (bs.length != data.length) {
// If we had a short read then we were at EOF
return -1;
}
- offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
+ offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
bs = new ByteSequence(data, 0, reader.read(offset, data));
- pos=0;
+ pos = 0;
}
}
}
@@ -392,34 +399,34 @@ public class Journal {
reader.readFully(offset, controlRecord);
- // Assert that it's a batch record.
- for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
- if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
+ // Assert that it's a batch record.
+ for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
+ if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
return -1;
}
}
int size = controlIs.readInt();
- if( size > MAX_BATCH_SIZE ) {
+ if (size > MAX_BATCH_SIZE) {
return -1;
}
- if( isChecksum() ) {
+ if (isChecksum()) {
long expectedChecksum = controlIs.readLong();
- if( expectedChecksum == 0 ) {
+ if (expectedChecksum == 0) {
// Checksumming was not enabled when the record was stored.
// we can't validate the record :(
return size;
}
byte data[] = new byte[size];
- reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
+ reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);
Checksum checksum = new Adler32();
checksum.update(data, 0, data.length);
- if( expectedChecksum!=checksum.getValue() ) {
+ if (expectedChecksum != checksum.getValue()) {
return -1;
}
@@ -436,15 +443,22 @@ public class Journal {
return totalLength.get();
}
- synchronized DataFile getCurrentWriteFile() throws IOException {
+ synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
if (dataFiles.isEmpty()) {
rotateWriteFile();
}
- return dataFiles.getTail();
+
+ DataFile current = dataFiles.getTail();
+
+ if (current != null) {
+ return current;
+ } else {
+ return rotateWriteFile();
+ }
}
synchronized DataFile rotateWriteFile() {
- int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+ int nextNum = nextDataFileId++;
File file = getFile(nextNum);
DataFile nextWriteFile = new DataFile(file, nextNum);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
@@ -453,6 +467,20 @@ public class Journal {
return nextWriteFile;
}
+ public synchronized DataFile reserveDataFile() {
+ int nextNum = nextDataFileId++;
+ File file = getFile(nextNum);
+ DataFile reservedDataFile = new DataFile(file, nextNum);
+ fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
+ fileByFileMap.put(file, reservedDataFile);
+ if (dataFiles.isEmpty()) {
+ dataFiles.addLast(reservedDataFile);
+ } else {
+ dataFiles.getTail().linkBefore(reservedDataFile);
+ }
+ return reservedDataFile;
+ }
+
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
File file = new File(directory, fileName);
@@ -479,10 +507,6 @@ public class Journal {
return dataFile.getFile();
}
- private DataFile getNextDataFile(DataFile dataFile) {
- return dataFile.getNext();
- }
-
public void close() throws IOException {
synchronized (this) {
if (!started) {
@@ -521,6 +545,7 @@ public class Journal {
DataFile dataFile = i.next();
result &= dataFile.delete();
}
+
totalLength.set(0);
fileMap.clear();
fileByFileMap.clear();
@@ -536,11 +561,11 @@ public class Journal {
public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
for (Integer key : files) {
// Can't remove the data file (or subsequent files) that is currently being written to.
- if( key >= lastAppendLocation.get().getDataFileId() ) {
+ if (key >= lastAppendLocation.get().getDataFileId()) {
continue;
}
DataFile dataFile = fileMap.get(key);
- if( dataFile!=null ) {
+ if (dataFile != null) {
forceRemoveDataFile(dataFile);
}
}
@@ -569,7 +594,7 @@ public class Journal {
LOG.debug("Successfully moved data file");
} else {
LOG.debug("Deleting data file: {}", dataFile);
- if ( dataFile.delete() ) {
+ if (dataFile.delete()) {
LOG.debug("Discarded data file: {}", dataFile);
} else {
LOG.warn("Failed to discard data file : {}", dataFile.getFile());
@@ -606,7 +631,7 @@ public class Journal {
if (cur == null) {
if (location == null) {
DataFile head = dataFiles.getHead();
- if( head == null ) {
+ if (head == null) {
return null;
}
cur = new Location();
@@ -629,7 +654,7 @@ public class Journal {
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
- dataFile = getNextDataFile(dataFile);
+ dataFile = dataFile.getNext();
if (dataFile == null) {
return null;
} else {
@@ -758,10 +783,35 @@ public class Journal {
this.archiveDataLogs = archiveDataLogs;
}
- synchronized public Integer getCurrentDataFileId() {
- if (dataFiles.isEmpty())
+ public synchronized DataFile getDataFileById(int dataFileId) {
+ if (dataFiles.isEmpty()) {
+ return null;
+ }
+
+ return fileMap.get(Integer.valueOf(dataFileId));
+ }
+
+ public synchronized DataFile getCurrentDataFile() {
+ if (dataFiles.isEmpty()) {
return null;
- return dataFiles.getTail().getDataFileId();
+ }
+
+ DataFile current = dataFiles.getTail();
+
+ if (current != null) {
+ return current;
+ } else {
+ return null;
+ }
+ }
+
+ public synchronized Integer getCurrentDataFileId() {
+ DataFile current = getCurrentDataFile();
+ if (current != null) {
+ return current.getDataFileId();
+ } else {
+ return null;
+ }
}
/**
@@ -869,10 +919,12 @@ public class Journal {
hash = (int)(file ^ offset);
}
+ @Override
public int hashCode() {
return hash;
}
+ @Override
public boolean equals(Object obj) {
if (obj instanceof WriteKey) {
WriteKey di = (WriteKey)obj;
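A small sketch of the reservation API added above, assuming a started Journal instance; ids now come from the monotonic nextDataFileId counter, so a reserved file keeps its id even if the write file rotates later:

    import java.io.File;
    import org.apache.activemq.store.kahadb.disk.journal.DataFile;
    import org.apache.activemq.store.kahadb.disk.journal.Journal;

    public class ReserveDataFileExample {
        public static void main(String[] args) throws Exception {
            Journal journal = new Journal();
            journal.setDirectory(new File("target/journal-example"));
            journal.start();

            // The reserved file is registered in the journal's file map and is
            // linked before the current write file, so normal appends continue
            // to the tail while the reserved file is filled out-of-band.
            DataFile reserved = journal.reserveDataFile();
            System.out.println("reserved: " + journal.getDataFileById(reserved.getDataFileId()));

            journal.close();
        }
    }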
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
new file mode 100644
index 0000000..3e3e090
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppender.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * File Appender instance that performs batched writes in the thread where the write is
+ * queued. This appender does not honor the maxFileLength value in the journal as the
+ * files created here are out-of-band logs used for other purposes such as journal level
+ * compaction.
+ */
+public class TargetedDataFileAppender implements FileAppender {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TargetedDataFileAppender.class);
+
+ private final Journal journal;
+ private final DataFile target;
+ private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
+ private final int maxWriteBatchSize;
+
+ private boolean closed;
+ private boolean preallocate;
+ private WriteBatch nextWriteBatch;
+ private int statIdx = 0;
+ private int[] stats = new int[maxStat];
+
+ public class WriteBatch {
+
+ protected final int offset;
+
+ public final DataFile dataFile;
+ public final LinkedNodeList<Journal.WriteCommand> writes = new LinkedNodeList<Journal.WriteCommand>();
+ public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+ public AtomicReference<IOException> exception = new AtomicReference<IOException>();
+
+ public WriteBatch(DataFile dataFile, int offset) {
+ this.dataFile = dataFile;
+ this.offset = offset;
+ this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ this.size = Journal.BATCH_CONTROL_RECORD_SIZE;
+ journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ }
+
+ public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand write) throws IOException {
+ this(dataFile, offset);
+ append(write);
+ }
+
+ public boolean canAppend(Journal.WriteCommand write) {
+ int newSize = size + write.location.getSize();
+ if (newSize >= maxWriteBatchSize) {
+ return false;
+ }
+ return true;
+ }
+
+ public void append(Journal.WriteCommand write) throws IOException {
+ this.writes.addLast(write);
+ write.location.setDataFileId(dataFile.getDataFileId());
+ write.location.setOffset(offset + size);
+ int s = write.location.getSize();
+ size += s;
+ dataFile.incrementLength(s);
+ journal.addToTotalLength(s);
+ }
+ }
+
+ /**
+ * Construct a Store writer
+ */
+ public TargetedDataFileAppender(Journal journal, DataFile target) {
+ this.journal = journal;
+ this.target = target;
+ this.inflightWrites = this.journal.getInflightWrites();
+ this.maxWriteBatchSize = this.journal.getWriteBatchSize();
+ }
+
+ @Override
+ public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+ checkClosed();
+
+ // Write the packet to our internal buffer.
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data, sync);
+
+ enqueueWrite(write);
+
+ if (sync) {
+ writePendingBatch();
+ }
+
+ return location;
+ }
+
+ @Override
+ public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+ checkClosed();
+
+ // Write the packet to our internal buffer.
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
+
+ final Location location = new Location();
+ location.setSize(size);
+ location.setType(type);
+
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
+
+ enqueueWrite(write);
+
+ return location;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ if (nextWriteBatch != null) {
+ // force sync of current in-progress batched write.
+ LOG.debug("Close of targeted appender flushing last batch.");
+ writePendingBatch();
+ }
+
+ closed = true;
+ }
+ }
+
+ //----- Appender Configuration -------------------------------------------//
+
+ public boolean isPreallocate() {
+ return preallocate;
+ }
+
+ public void setPreallocate(boolean preallocate) {
+ this.preallocate = preallocate;
+ }
+
+ //----- Internal Implementation ------------------------------------------//
+
+ private void checkClosed() throws IOException {
+ if (closed) {
+ throw new IOException("The appender is closed");
+ }
+ }
+
+ private WriteBatch enqueueWrite(Journal.WriteCommand write) throws IOException {
+ while (true) {
+ if (nextWriteBatch == null) {
+ nextWriteBatch = new WriteBatch(target, target.getLength(), write);
+ break;
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(write)) {
+ nextWriteBatch.append(write);
+ break;
+ } else {
+ // Flush current batch and start a new one.
+ writePendingBatch();
+ nextWriteBatch = null;
+ }
+ }
+ }
+
+ if (!write.sync) {
+ inflightWrites.put(new Journal.WriteKey(write.location), write);
+ }
+
+ return nextWriteBatch;
+ }
+
+ private void writePendingBatch() throws IOException {
+ DataFile dataFile = nextWriteBatch.dataFile;
+
+ try (RecoverableRandomAccessFile file = dataFile.openRandomAccessFile();
+ DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);) {
+
+ // preallocate on first open of new file (length == 0) if configured to do so.
+ // NOTE: dataFile.length cannot be used because it is updated in enqueue
+ if (file.length() == 0L && isPreallocate()) {
+ journal.preallocateEntireJournalDataFile(file);
+ }
+
+ Journal.WriteCommand write = nextWriteBatch.writes.getHead();
+
+ // Write an empty batch control record.
+ buff.reset();
+ buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+ buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+ buff.writeInt(0);
+ buff.writeLong(0);
+
+ while (write != null) {
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+ write = write.getNext();
+ }
+
+ // append 'unset' next batch (5 bytes) so read can always find eof
+ buff.writeInt(0);
+ buff.writeByte(0);
+
+ ByteSequence sequence = buff.toByteSequence();
+
+ // Now we can fill in the batch control record properly.
+ buff.reset();
+ buff.skip(5 + Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+ buff.writeInt(sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ if (journal.isChecksum()) {
+ Checksum checksum = new Adler32();
+ checksum.update(sequence.getData(),
+ sequence.getOffset() + Journal.BATCH_CONTROL_RECORD_SIZE,
+ sequence.getLength() - Journal.BATCH_CONTROL_RECORD_SIZE - 5);
+ buff.writeLong(checksum.getValue());
+ }
+
+ // Now do the 1 big write.
+ file.seek(nextWriteBatch.offset);
+ if (maxStat > 0) {
+ if (statIdx < maxStat) {
+ stats[statIdx++] = sequence.getLength();
+ } else {
+ long all = 0;
+ for (; statIdx > 0;) {
+ all += stats[--statIdx];
+ }
+ LOG.trace("Ave writeSize: {}", all / maxStat);
+ }
+ }
+
+ file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+
+ ReplicationTarget replicationTarget = journal.getReplicationTarget();
+ if (replicationTarget != null) {
+ replicationTarget.replicate(nextWriteBatch.writes.getHead().location, sequence, true);
+ }
+
+ file.sync();
+
+ signalDone(nextWriteBatch);
+ } catch (IOException e) {
+ LOG.info("Journal failed while writing at: {}", nextWriteBatch.offset);
+ throw e;
+ }
+ }
+
+ private void signalDone(WriteBatch writeBatch) {
+ // Now that the data is on disk, remove the writes from the in
+ // flight cache and signal any onComplete requests.
+ Journal.WriteCommand write = writeBatch.writes.getHead();
+ while (write != null) {
+ if (!write.sync) {
+ inflightWrites.remove(new Journal.WriteKey(write.location));
+ }
+
+ if (write.onComplete != null) {
+ try {
+ write.onComplete.run();
+ } catch (Throwable e) {
+ LOG.info("Add exception was raised while executing the run command for onComplete", e);
+ }
+ }
+
+ write = write.getNext();
+ }
+ }
+}
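MessageDatabase.forwardAllAcks() above is the intended consumer of this appender; a condensed sketch of the same pattern, with a hypothetical payload (journal setup assumed):

    import java.io.File;
    import org.apache.activemq.store.kahadb.disk.journal.DataFile;
    import org.apache.activemq.store.kahadb.disk.journal.Journal;
    import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
    import org.apache.activemq.util.ByteSequence;

    public class TargetedAppendExample {
        public static void main(String[] args) throws Exception {
            Journal journal = new Journal();
            journal.setDirectory(new File("target/journal-example"));
            journal.start();

            // Reserve an out-of-band file and batch-write records into it;
            // close() flushes the final batch now that FileAppender is AutoCloseable.
            DataFile target = journal.reserveDataFile();
            try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, target)) {
                ByteSequence payload = new ByteSequence("hypothetical record".getBytes("UTF-8"));
                appender.storeItem(payload, Journal.USER_RECORD_TYPE, true);
            }

            journal.close();
        }
    }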
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
index 7147fd9..455a020 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,19 +16,18 @@
*/
package org.apache.activemq.store.kahadb.disk.util;
-import org.apache.activemq.util.ByteSequence;
-
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
+import org.apache.activemq.util.ByteSequence;
+
/**
* Optimized ByteArrayInputStream that can be used more than once
- *
- *
*/
-public final class DataByteArrayInputStream extends InputStream implements DataInput {
+public final class DataByteArrayInputStream extends InputStream implements DataInput, AutoCloseable {
+
private byte[] buf;
private int pos;
private int offset;
@@ -137,6 +136,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* @return the next byte of data, or <code>-1</code> if the end of the
* stream has been reached.
*/
+ @Override
public int read() {
return (pos < length) ? (buf[pos++] & 0xff) : -1;
}
@@ -152,6 +152,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* <code>-1</code> if there is no more data because the end of the
* stream has been reached.
*/
+ @Override
public int read(byte b[], int off, int len) {
if (b == null) {
throw new NullPointerException();
@@ -174,18 +175,22 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
* @return the number of bytes that can be read from the input stream
* without blocking.
*/
+ @Override
public int available() {
return length - pos;
}
+ @Override
public void readFully(byte[] b) {
read(b, 0, b.length);
}
+ @Override
public void readFully(byte[] b, int off, int len) {
read(b, off, len);
}
+ @Override
public int skipBytes(int n) {
if (pos + n > length) {
n = length - pos;
@@ -197,39 +202,47 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return n;
}
+ @Override
public boolean readBoolean() {
return read() != 0;
}
+ @Override
public byte readByte() {
return (byte)read();
}
+ @Override
public int readUnsignedByte() {
return read();
}
+ @Override
public short readShort() {
this.read(work, 0, 2);
return (short) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
}
+ @Override
public int readUnsignedShort() {
this.read(work, 0, 2);
- return (int) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
+ return ((work[0] & 0xff) << 8) | (work[1] & 0xff);
}
+ @Override
public char readChar() {
this.read(work, 0, 2);
return (char) (((work[0] & 0xff) << 8) | (work[1] & 0xff));
}
+ @Override
public int readInt() {
this.read(work, 0, 4);
return ((work[0] & 0xff) << 24) | ((work[1] & 0xff) << 16) |
((work[2] & 0xff) << 8) | (work[3] & 0xff);
}
+ @Override
public long readLong() {
this.read(work, 0, 8);
@@ -241,14 +254,17 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
}
+ @Override
public float readFloat() throws IOException {
return Float.intBitsToFloat(readInt());
}
+ @Override
public double readDouble() throws IOException {
return Double.longBitsToDouble(readLong());
}
+ @Override
public String readLine() {
int start = pos;
while (pos < length) {
@@ -267,6 +283,7 @@ public final class DataByteArrayInputStream extends InputStream implements DataI
return new String(buf, start, pos);
}
+ @Override
public String readUTF() throws IOException {
int length = readUnsignedShort();
int endPos = pos + length;
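The class declaration now lists AutoCloseable explicitly (InputStream already implies it through Closeable, so the marker is declarative); the @Override annotations and import reordering are cleanup. A short sketch of the try-with-resources shape this documents, assuming the class's existing byte[] constructor:

    import java.io.IOException;
    import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;

    class ReadSketch {
        static void dump(byte[] payload) throws IOException {
            // close() is trivial for an in-memory buffer, but try-with-resources
            // keeps call sites uniform with other stream types.
            try (DataByteArrayInputStream in = new DataByteArrayInputStream(payload)) {
                int header = in.readInt();       // big-endian, per readInt() above
                boolean flag = in.readBoolean();
                System.out.println(header + " / " + flag);
            }
        }
    }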
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
index 469c853..d29b70f 100755
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DataByteArrayOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,21 +16,18 @@
*/
package org.apache.activemq.store.kahadb.disk.util;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.util.ByteSequence;
-
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.util.ByteSequence;
/**
* Optimized ByteArrayOutputStream
- *
- *
*/
-public class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+public class DataByteArrayOutputStream extends OutputStream implements DataOutput, AutoCloseable {
private static final int DEFAULT_SIZE = PageFile.DEFAULT_PAGE_SIZE;
protected byte buf[];
protected int pos;
@@ -88,6 +85,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
* @param b the byte to be written.
* @throws IOException
*/
+ @Override
public void write(int b) throws IOException {
int newcount = pos + 1;
ensureEnoughBuffer(newcount);
@@ -105,6 +103,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
* @param len the number of bytes to write.
* @throws IOException
*/
+ @Override
public void write(byte b[], int off, int len) throws IOException {
if (len == 0) {
return;
@@ -146,18 +145,21 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
return pos;
}
+ @Override
public void writeBoolean(boolean v) throws IOException {
ensureEnoughBuffer(pos + 1);
buf[pos++] = (byte)(v ? 1 : 0);
onWrite();
}
+ @Override
public void writeByte(int v) throws IOException {
ensureEnoughBuffer(pos + 1);
buf[pos++] = (byte)(v >>> 0);
onWrite();
}
+ @Override
public void writeShort(int v) throws IOException {
ensureEnoughBuffer(pos + 2);
buf[pos++] = (byte)(v >>> 8);
@@ -165,6 +167,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
+ @Override
public void writeChar(int v) throws IOException {
ensureEnoughBuffer(pos + 2);
buf[pos++] = (byte)(v >>> 8);
@@ -172,6 +175,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
+ @Override
public void writeInt(int v) throws IOException {
ensureEnoughBuffer(pos + 4);
buf[pos++] = (byte)(v >>> 24);
@@ -181,6 +185,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
+ @Override
public void writeLong(long v) throws IOException {
ensureEnoughBuffer(pos + 8);
buf[pos++] = (byte)(v >>> 56);
@@ -194,14 +199,17 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
onWrite();
}
+ @Override
public void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
+ @Override
public void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
+ @Override
public void writeBytes(String s) throws IOException {
int length = s.length();
for (int i = 0; i < length; i++) {
@@ -209,6 +217,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
}
}
+ @Override
public void writeChars(String s) throws IOException {
int length = s.length();
for (int i = 0; i < length; i++) {
@@ -218,6 +227,7 @@ public class DataByteArrayOutputStream extends OutputStream implements DataOutpu
}
}
+ @Override
public void writeUTF(String str) throws IOException {
int strlen = str.length();
int encodedsize = 0;
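The output stream gets the same explicit AutoCloseable marker. A matching sketch, assuming the no-arg constructor and the class's existing toByteSequence() accessor:

    import java.io.IOException;
    import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;

    class WriteSketch {
        static byte[] encode(int id, long timestamp) throws IOException {
            try (DataByteArrayOutputStream out = new DataByteArrayOutputStream()) {
                out.writeInt(id);         // 4 bytes, high byte first, per writeInt() above
                out.writeLong(timestamp); // 8 bytes, high byte first
                return out.toByteSequence().getData();
            }
        }
    }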
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/main/proto/journal-data.proto
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/proto/journal-data.proto b/activemq-kahadb-store/src/main/proto/journal-data.proto
index 01607a5..2dd97b9 100644
--- a/activemq-kahadb-store/src/main/proto/journal-data.proto
+++ b/activemq-kahadb-store/src/main/proto/journal-data.proto
@@ -37,6 +37,7 @@ enum KahaEntryType {
KAHA_REMOVE_SCHEDULED_JOB_COMMAND = 13;
KAHA_REMOVE_SCHEDULED_JOBS_COMMAND = 14;
KAHA_DESTROY_SCHEDULER_COMMAND = 15;
+ KAHA_REWRITTEN_DATA_FILE_COMMAND = 16;
}
message KahaTraceCommand {
@@ -240,6 +241,17 @@ message KahaDestroySchedulerCommand {
required string scheduler=1;
}
+message KahaRewrittenDataFileCommand {
+ //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaRewrittenDataFileCommand>";
+ //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+ //| option java_type_method = "KahaEntryType";
+
+ required int32 sourceDataFileId = 1;
+ optional int32 rewriteType = 2;
+ optional bool skipIfSourceExists = 3 [default = true];
+
+}
+
// TODO things to ponder
// should we move more message fields
// that are set by the sender (and rarely required by the broker
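Like the other KahaEntryType entries, the generated KahaRewrittenDataFileCommand is dispatched through the Visitor class (the four-line change to Visitor.java in the diffstat above). A hedged sketch of what a handler override looks like; the visit method name and the getters are assumed from the generator's usual conventions, not taken from this diff:

    import java.io.IOException;
    import org.apache.activemq.store.kahadb.Visitor;
    import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;

    class RewriteAwareVisitor extends Visitor {
        @Override
        public void visit(KahaRewrittenDataFileCommand command) throws IOException {
            // On journal replay, record that the acks from sourceDataFileId were
            // rewritten so the original log file can be GC'd without losing them.
            int sourceId = command.getSourceDataFileId();
            boolean skip = command.getSkipIfSourceExists(); // proto default: true
            System.out.println("rewritten source file " + sourceId
                    + ", skipIfSourceExists=" + skip);
        }
    }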
http://git-wip-us.apache.org/repos/asf/activemq/blob/2a7255ab/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
new file mode 100644
index 0000000..a6cdb9e
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/TargetedDataFileAppenderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the single threaded TargetedDataFileAppender class.
+ */
+public class TargetedDataFileAppenderTest {
+
+ private Journal dataManager;
+ private TargetedDataFileAppender appender;
+ private DataFile dataFile;
+ private File dir;
+
+ @Before
+ public void setUp() throws Exception {
+ dir = new File("target/tests/TargetedDataFileAppenderTest");
+ dir.mkdirs();
+ dataManager = new Journal();
+ dataManager.setDirectory(dir);
+ dataManager.start();
+
+ dataFile = dataManager.reserveDataFile();
+ appender = new TargetedDataFileAppender(dataManager, dataFile);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dataManager.close();
+ IOHelper.delete(dir);
+ }
+
+ @Test
+ public void testWritesAreBatched() throws Exception {
+ final int iterations = 10;
+ ByteSequence data = new ByteSequence("DATA".getBytes());
+ for (int i = 0; i < iterations; i++) {
+ appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
+ }
+
+ assertTrue("Tracked data file length should reflect the appended batch", dataFile.getLength() > 0);
+ assertTrue("On-disk file should still be empty before close", dataFile.getFile().length() == 0);
+
+ appender.close();
+
+ // at this point dataManager.getInflightWrites().size() may still be > 0
+ // as the flush triggered by close() may not have fully caught up.
+ assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+ assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+ }
+
+ @Test
+ public void testBatchWritesCompleteAfterClose() throws Exception {
+ final int iterations = 10;
+ ByteSequence data = new ByteSequence("DATA".getBytes());
+ for (int i = 0; i < iterations; i++) {
+ appender.storeItem(data, Journal.USER_RECORD_TYPE, false);
+ }
+
+ appender.close();
+
+ // at this point dataManager.getInflightWrites().size() may still be > 0
+ // as the flush triggered by close() may not have fully caught up.
+ assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+ assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+ }
+
+ @Test
+ public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+ final int iterations = 10;
+ final CountDownLatch latch = new CountDownLatch(iterations);
+ ByteSequence data = new ByteSequence("DATA".getBytes());
+ for (int i = 0; i < iterations; i++) {
+ appender.storeItem(data, Journal.USER_RECORD_TYPE, new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ });
+ }
+
+ appender.close();
+
+ // at this point dataManager.getInflightWrites().size() may still be > 0
+ // as the flush triggered by close() may not have fully caught up.
+ assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
+ assertTrue("Data file should not be empty", dataFile.getLength() > 0);
+ assertTrue("Data file should not be empty", dataFile.getFile().length() > 0);
+ }
+}
[3/4] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6203
Adding a flag to enable or disable Ack Compaction
Currently defaults to enabled for 5.14.0 and above
(cherry picked from commit cbad8babe507f63cc80e42fe37070b898d4d8dbc)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b8ac1b4c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b8ac1b4c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b8ac1b4c
Branch: refs/heads/activemq-5.13.x
Commit: b8ac1b4c561da2087d923c3a32e9bc2624c508ea
Parents: 72eecbe
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Apr 12 13:30:10 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Apr 12 13:31:35 2016 +0000
----------------------------------------------------------------------
.../store/kahadb/KahaDBPersistenceAdapter.java | 18 +++++++++++++++++
.../activemq/store/kahadb/MessageDatabase.java | 21 +++++++++++++++++++-
2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b8ac1b4c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index b6b2ca7..21cac0a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -669,6 +669,24 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
}
+ /**
+ * Returns whether Ack compaction is enabled
+ *
+ * @return true if Ack compaction is enabled
+ */
+ public boolean isEnableAckCompaction() {
+ return letter.isEnableAckCompaction();
+ }
+
+ /**
+ * Configure whether the Ack compaction task is enabled.
+ *
+ * @param enableAckCompaction true to enable the Ack compaction task
+ */
+ public void setEnableAckCompaction(boolean enableAckCompaction) {
+ letter.setEnableAckCompaction(enableAckCompaction);
+ }
+
public KahaDBStore getStore() {
return letter;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b8ac1b4c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 434e49c..8bb9491 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -274,6 +274,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean enableIndexPageCaching = true;
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+ private boolean enableAckCompaction = true;
private int compactAcksAfterNoGC = 10;
private boolean compactAcksIgnoresStoreGrowth = false;
private int checkPointCyclesWithNoGC;
@@ -1817,7 +1818,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (ackMessageFileMapMod) {
checkpointUpdate(tx, false);
}
- } else {
+ } else if (isEnableAckCompaction()) {
if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
// First check length of journal to make sure it makes sense to even try.
//
@@ -3671,4 +3672,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
}
+
+ /**
+ * Returns whether Ack compaction is enabled
+ *
+ * @return true if Ack compaction is enabled
+ */
+ public boolean isEnableAckCompaction() {
+ return enableAckCompaction;
+ }
+
+ /**
+ * Configure whether the Ack compaction task is enabled.
+ *
+ * @param enableAckCompaction true to enable the Ack compaction task
+ */
+ public void setEnableAckCompaction(boolean enableAckCompaction) {
+ this.enableAckCompaction = enableAckCompaction;
+ }
}
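Taken together with the compactAcks* setters exposed elsewhere in this series, enabling and tuning the feature from broker code looks roughly like the following sketch (values are illustrative; on 5.13.x the flag must be set explicitly since the follow-up commit below turns it off by default):

    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;

    class AckCompactionConfigSketch {
        static BrokerService configure() throws Exception {
            BrokerService broker = new BrokerService();
            KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
            kahaDB.setEnableAckCompaction(true);            // master switch added in this commit
            kahaDB.setCompactAcksAfterNoGC(10);             // rewrite acks after 10 GC cycles with no file removal
            kahaDB.setCompactAcksIgnoresStoreGrowth(false); // only compact once the store stops growing
            broker.setPersistenceAdapter(kahaDB);
            return broker;
        }
    }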
[4/4] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6203
Disabling Ack Compaction by default for 5.13.x
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/91f05189
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/91f05189
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/91f05189
Branch: refs/heads/activemq-5.13.x
Commit: 91f05189ab68ca4ed44baa130d8653c0762a933e
Parents: b8ac1b4
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Apr 12 13:31:56 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Apr 12 13:31:56 2016 +0000
----------------------------------------------------------------------
.../java/org/apache/activemq/store/kahadb/MessageDatabase.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/91f05189/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb9491..d9223a8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -274,7 +274,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private boolean enableIndexPageCaching = true;
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
- private boolean enableAckCompaction = true;
+ private boolean enableAckCompaction = false;
private int compactAcksAfterNoGC = 10;
private boolean compactAcksIgnoresStoreGrowth = false;
private int checkPointCyclesWithNoGC;
[2/4] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6203
Expose configuration on the persistence adapter level.
Double the test timeout and increase the compaction frequency to account
for very slow CI boxes.
(cherry picked from commit a9521dcebfb4e469dde7465ff95d8e8f1f050abd)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/72eecbe2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/72eecbe2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/72eecbe2
Branch: refs/heads/activemq-5.13.x
Commit: 72eecbe26002c29c10617713fae7d7ec6a97bc67
Parents: 2a7255a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 15 12:02:40 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Apr 12 12:34:27 2016 +0000
----------------------------------------------------------------------
.../store/kahadb/KahaDBPersistenceAdapter.java | 33 ++++++++++++++++++++
.../TransactedStoreUsageSuspendResumeTest.java | 5 +--
2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/72eecbe2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index ebe12f3..b6b2ca7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -636,6 +636,39 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
return letter.isEnableIndexPageCaching();
}
+ public int getCompactAcksAfterNoGC() {
+ return letter.getCompactAcksAfterNoGC();
+ }
+
+ /**
+ * Sets the number of GC cycles in which no journal logs were removed before an attempt is
+ * made to move forward all the acks in the last log that contains them and is otherwise unreferenced.
+ * <p>
+ * A value of -1 will disable this feature.
+ *
+ * @param compactAcksAfterNoGC
+ * Number of empty GC cycles before we rewrite old ACKS.
+ */
+ public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
+ this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
+ }
+
+ public boolean isCompactAcksIgnoresStoreGrowth() {
+ return this.letter.isCompactAcksIgnoresStoreGrowth();
+ }
+
+ /**
+ * Configure whether Ack compaction will occur regardless of continued growth of the
+ * journal logs, i.e. even while the store still has room to grow. Because the
+ * compaction operation can be costly, this value defaults to off and Ack
+ * compaction is only done when it appears the store cannot grow any larger.
+ *
+ * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
+ */
+ public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
+ this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
+ }
+
public KahaDBStore getStore() {
return letter;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/72eecbe2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
index 6b11f71..28da159 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TransactedStoreUsageSuspendResumeTest.java
@@ -110,6 +110,7 @@ public class TransactedStoreUsageSuspendResumeTest {
KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
kahaDB.setJournalMaxFileLength(256 * 1024);
kahaDB.setCleanupInterval(10*1000);
+ kahaDB.setCompactAcksAfterNoGC(5);
broker.setPersistenceAdapter(kahaDB);
broker.getSystemUsage().getStoreUsage().setLimit(7*1024*1024);
@@ -139,7 +140,7 @@ public class TransactedStoreUsageSuspendResumeTest {
}
});
sendExecutor.shutdown();
- sendExecutor.awaitTermination(5, TimeUnit.MINUTES);
+ sendExecutor.awaitTermination(10, TimeUnit.MINUTES);
boolean allMessagesReceived = messagesReceivedCountDown.await(10, TimeUnit.MINUTES);
if (!allMessagesReceived) {
@@ -148,7 +149,7 @@ public class TransactedStoreUsageSuspendResumeTest {
assertTrue("Got all messages: " + messagesReceivedCountDown, allMessagesReceived);
// give consumers a chance to exit gracefully
- TimeUnit.SECONDS.sleep(2);
+ TimeUnit.SECONDS.sleep(5);
}
private void sendMessages() throws Exception {