You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/07/20 15:00:47 UTC
cassandra git commit: Simplify commit log code
Repository: cassandra
Updated Branches:
refs/heads/trunk 058faff6f -> e8907c16a
Simplify commit log code
patch by Branimir Lambov; reviewed by Ariel Weisberg for CASSANDRA-10202
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e8907c16
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e8907c16
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e8907c16
Branch: refs/heads/trunk
Commit: e8907c16abcd84021a39cdaac79b609fcc64a43c
Parents: 058faff
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue May 3 17:36:25 2016 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jul 20 15:59:38 2016 +0100
----------------------------------------------------------------------
.../cassandra/config/DatabaseDescriptor.java | 5 +
.../AbstractCommitLogSegmentManager.java | 405 +++++++++----------
.../db/commitlog/AbstractCommitLogService.java | 132 +++---
.../db/commitlog/BatchCommitLogService.java | 2 +-
.../cassandra/db/commitlog/CommitLog.java | 47 +--
.../db/commitlog/CommitLogArchiver.java | 25 +-
.../db/commitlog/CommitLogSegment.java | 25 +-
.../commitlog/CommitLogSegmentManagerCDC.java | 13 +-
.../CommitLogSegmentManagerStandard.java | 13 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/FileDirectSegment.java | 6 +-
.../db/commitlog/PeriodicCommitLogService.java | 25 +-
.../db/commitlog/CommitLogStressTest.java | 27 +-
test/unit/org/apache/cassandra/Util.java | 2 +-
.../CommitLogSegmentManagerCDCTest.java | 9 +-
.../commitlog/CommitLogSegmentManagerTest.java | 10 +-
.../cassandra/db/commitlog/CommitLogTest.java | 10 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
19 files changed, 352 insertions(+), 414 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 374c64d..4f188bb 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1442,6 +1442,11 @@ public class DatabaseDescriptor
return conf.commitlog_max_compression_buffers_in_pool;
}
+ public static void setCommitLogMaxCompressionBuffersPerPool(int buffers)
+ {
+ conf.commitlog_max_compression_buffers_in_pool = buffers;
+ }
+
public static int getMaxMutationSize()
{
return conf.max_mutation_size_in_kb * 1024;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7ea7439..275d5b3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -22,16 +22,18 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -45,19 +47,27 @@ public abstract class AbstractCommitLogSegmentManager
{
static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
- // Queue of work to be done by the manager thread, also used to wake the thread to perform segment allocation.
- private final BlockingQueue<Runnable> segmentManagementTasks = new LinkedBlockingQueue<>();
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
- /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */
- private final ConcurrentLinkedQueue<CommitLogSegment> availableSegments = new ConcurrentLinkedQueue<>();
+ private final WaitQueue segmentPrepared = new WaitQueue();
- /** Active segments, containing unflushed data */
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
- /** The segment we are currently allocating commit log records to */
- protected volatile CommitLogSegment allocatingFrom = null;
-
- private final WaitQueue hasAvailableSegments = new WaitQueue();
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
final String storageDirectory;
@@ -69,15 +79,9 @@ public abstract class AbstractCommitLogSegmentManager
*/
private final AtomicLong size = new AtomicLong();
- /**
- * New segment creation is initially disabled because we'll typically get some "free" segments
- * recycled after log replay.
- */
- volatile boolean createReserveSegments = false;
-
private Thread managerThread;
- protected volatile boolean run = true;
protected final CommitLog commitLog;
+ private volatile boolean shutdown;
private static final SimpleCachedBufferPool bufferPool =
new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
@@ -95,54 +99,33 @@ public abstract class AbstractCommitLogSegmentManager
{
public void runMayThrow() throws Exception
{
- while (run)
+ while (!shutdown)
{
try
{
- Runnable task = segmentManagementTasks.poll();
- if (task == null)
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
{
- // if we have no more work to do, check if we should create a new segment
- if (!atSegmentLimit() &&
- availableSegments.isEmpty() &&
- (activeSegments.isEmpty() || createReserveSegments))
- {
- logger.trace("No segments in reserve; creating a fresh one");
- // TODO : some error handling in case we fail to create a new segment
- availableSegments.add(createSegment());
- hasAvailableSegments.signalAll();
- }
-
- // flush old Cfs if we're full
- long unused = unusedCapacity();
- if (unused < 0)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
- long spaceToReclaim = 0;
- for (CommitLogSegment segment : activeSegments)
- {
- if (segment == allocatingFrom)
- break;
- segmentsToRecycle.add(segment);
- spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
- if (spaceToReclaim + unused >= 0)
- break;
- }
- flushDataFrom(segmentsToRecycle, false);
- }
-
- // Since we're operating on a "null" allocation task, block here for the next task on the
- // queue rather than looping, grabbing another null, and repeating the above work.
- try
- {
- task = segmentManagementTasks.take();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
}
- task.run();
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+
+ LockSupport.park();
}
catch (Throwable t)
{
@@ -151,27 +134,51 @@ public abstract class AbstractCommitLogSegmentManager
return;
// sleep some arbitrary period to avoid spamming CL
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
}
- }
- }
- private boolean atSegmentLimit()
- {
- return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ while (availableSegment != null || atSegmentBufferLimit() && !shutdown)
+ LockSupport.park();
+ }
}
};
- run = true;
-
+ shutdown = false;
managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
}
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
- /**
- * Shut down the CLSM. Used both during testing and during regular shutdown, so needs to stop everything.
- */
- public abstract void shutdown();
/**
* Allocate a segment within this CLSM. Should either succeed or throw.
@@ -200,102 +207,69 @@ public abstract class AbstractCommitLogSegmentManager
*/
abstract void discard(CommitLogSegment segment, boolean delete);
-
/**
- * Grab the current CommitLogSegment we're allocating from. Also serves as a utility method to block while the allocator
- * is working on initial allocation of a CommitLogSegment.
- */
- CommitLogSegment allocatingFrom()
- {
- CommitLogSegment r = allocatingFrom;
- if (r == null)
- {
- advanceAllocatingFrom(null);
- r = allocatingFrom;
- }
- return r;
- }
-
- /**
- * Fetches a new segment from the queue, signaling the management thread to create a new one if necessary, and "activates" it.
- * Blocks until a new segment is allocated and the thread requesting an advanceAllocatingFrom is signalled.
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
*
* WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
*/
- protected void advanceAllocatingFrom(CommitLogSegment old)
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
{
- while (true)
- {
- CommitLogSegment next;
+ while (true) {
synchronized (this)
{
- // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments
- // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
if (allocatingFrom != old)
return;
- next = availableSegments.poll();
- if (next != null)
- {
- allocatingFrom = next;
- activeSegments.add(next);
- }
- }
- if (next != null)
- {
- if (old != null)
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
{
- // Now we can run the user defined command just after switching to the new commit log.
- // (Do this here instead of in the recycle call so we can get a head start on the archive.)
- commitLog.archiver.maybeArchive(old);
-
- // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
- old.discardUnusedTail();
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
}
-
- // request that the CL be synced out-of-band, as we've finished a segment
- commitLog.requestExtraSync();
- return;
}
- // no more segments, so register to receive a signal when not empty
- WaitQueue.Signal signal = hasAvailableSegments.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ awaitAvailableSegment(old);
+ }
- // trigger the management thread; this must occur after registering
- // the signal to ensure we are woken by any new segment creation
- wakeManager();
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
- // check if the queue has already been added to before waiting on the signal, to catch modifications
- // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change
- if (!availableSegments.isEmpty() || allocatingFrom != old)
- {
- signal.cancel();
- // if we've been beaten, just stop immediately
- if (allocatingFrom != old)
- return;
- // otherwise try again, as there should be an available segment
- continue;
- }
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
- // can only reach here if the queue hasn't been inserted into
- // before we registered the signal, as we only remove items from the queue
- // after updating allocatingFrom. Can safely block until we are signalled
- // by the allocator that new segments have been published
- signal.awaitUninterruptibly();
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
}
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
}
- protected void wakeManager()
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
{
- // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary)
- segmentManagementTasks.add(Runnables.doNothing());
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
}
/**
* Switch to a new segment, regardless of how much is left in the current one.
*
- * Flushes any dirty CFs for this segment and any older segments, and then recycles
- * the segments
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
*/
void forceRecycleAll(Iterable<UUID> droppedCfs)
{
@@ -307,7 +281,7 @@ public abstract class AbstractCommitLogSegmentManager
last.waitForModifications();
// make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
- // on the relevant keyspaces to complete
+ // to complete
Keyspace.writeOrder.awaitNewBarrier();
// flush and wait for all CFs that are dirty in segments up-to and including 'last'
@@ -326,7 +300,7 @@ public abstract class AbstractCommitLogSegmentManager
for (CommitLogSegment segment : activeSegments)
{
if (segment.isUnused())
- recycleSegment(segment);
+ archiveAndDiscard(segment);
}
CommitLogSegment first;
@@ -341,33 +315,18 @@ public abstract class AbstractCommitLogSegmentManager
}
/**
- * Indicates that a segment is no longer in use and that it should be recycled.
+ * Indicates that a segment is no longer in use and that it should be discarded.
*
* @param segment segment that is no longer in use
*/
- void recycleSegment(final CommitLogSegment segment)
+ void archiveAndDiscard(final CommitLogSegment segment)
{
boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
- if (activeSegments.remove(segment))
- {
- // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
- discardSegment(segment, archiveSuccess);
- }
- else
- {
- logger.warn("segment {} not found in activeSegments queue", segment);
- }
- }
-
- /**
- * Indicates that a segment file should be deleted.
- *
- * @param segment segment to be discarded
- */
- private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
- {
- logger.trace("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
- segmentManagementTasks.add(() -> discard(segment, deleteFile));
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
}
/**
@@ -396,28 +355,6 @@ public abstract class AbstractCommitLogSegmentManager
}
/**
- * @param name the filename to check
- * @return true if file is managed by this manager.
- */
- public boolean manages(String name)
- {
- for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments))
- if (segment.getName().equals(name))
- return true;
- return false;
- }
-
- /**
- * Throws a flag that enables the behavior of keeping at least one spare segment
- * available at all times.
- */
- void enableReserveSegmentCreation()
- {
- createReserveSegments = true;
- wakeManager();
- }
-
- /**
* Force a flush on all CFs that are still dirty in @param segments.
*
* @return a Future that will finish when all the flushes are complete.
@@ -463,10 +400,7 @@ public abstract class AbstractCommitLogSegmentManager
*/
public void stopUnsafe(boolean deleteSegments)
{
- logger.trace("CLSM closing and clearing existing commit log segments...");
- createReserveSegments = false;
-
- awaitManagementTasksCompletion();
+ logger.debug("CLSM closing and clearing existing commit log segments...");
shutdown();
try
@@ -478,35 +412,24 @@ public abstract class AbstractCommitLogSegmentManager
throw new RuntimeException(e);
}
- synchronized (this)
- {
- for (CommitLogSegment segment : activeSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- activeSegments.clear();
-
- for (CommitLogSegment segment : availableSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- availableSegments.clear();
- }
-
- allocatingFrom = null;
-
- segmentManagementTasks.clear();
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
size.set(0L);
logger.trace("CLSM done with closing and clearing existing commit log segments.");
}
- // Used by tests only.
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
void awaitManagementTasksCompletion()
{
- while (!segmentManagementTasks.isEmpty())
- Thread.yield();
- // The last management task is not yet complete. Wait a while for it.
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
- // waiting completes correctly.
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
}
/**
@@ -525,18 +448,41 @@ public abstract class AbstractCommitLogSegmentManager
}
/**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this) {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
* Returns when the management thread terminates.
*/
public void awaitTermination() throws InterruptedException
{
managerThread.join();
+ managerThread = null;
for (CommitLogSegment segment : activeSegments)
segment.close();
- for (CommitLogSegment segment : availableSegments)
- segment.close();
-
bufferPool.shutdown();
}
@@ -554,18 +500,19 @@ public abstract class AbstractCommitLogSegmentManager
*/
CommitLogPosition getCurrentPosition()
{
- return allocatingFrom().getCurrentCommitLogPosition();
+ return allocatingFrom.getCurrentCommitLogPosition();
}
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments) throws IOException
+ public void sync() throws IOException
{
- CommitLogSegment current = allocatingFrom();
+ CommitLogSegment current = allocatingFrom;
for (CommitLogSegment segment : getActiveSegments())
{
- if (!syncAllSegments && segment.id > current.id)
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
return;
segment.sync();
}
@@ -578,5 +525,25 @@ public abstract class AbstractCommitLogSegmentManager
{
return bufferPool;
}
+
+ void wakeManager()
+ {
+ LockSupport.unpark(managerThread);
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 0ba4f55..7b56da3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,15 +17,18 @@
*/
package org.apache.cassandra.db.commitlog;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.slf4j.*;
-
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import com.codahale.metrics.Timer.Context;
+
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class AbstractCommitLogService
{
@@ -41,11 +44,10 @@ public abstract class AbstractCommitLogService
// signal that writers can wait on to be notified of a completed sync
protected final WaitQueue syncComplete = new WaitQueue();
- protected final Semaphore haveWork = new Semaphore(1);
final CommitLog commitLog;
private final String name;
- private final long pollIntervalMillis;
+ private final long pollIntervalNanos;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@ -59,14 +61,15 @@ public abstract class AbstractCommitLogService
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalMillis = pollIntervalMillis;
+ this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalMillis < 1)
- throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+ if (pollIntervalNanos < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+ pollIntervalNanos * 1e-6));
Runnable runnable = new Runnable()
{
@@ -78,26 +81,25 @@ public abstract class AbstractCommitLogService
int lagCount = 0;
int syncCount = 0;
- boolean run = true;
- while (run)
+ while (true)
{
+ // always run once after shutdown signalled
+ boolean shutdownRequested = shutdown;
+
try
{
- // always run once after shutdown signalled
- run = !shutdown;
-
// sync and signal
- long syncStarted = System.currentTimeMillis();
+ long syncStarted = System.nanoTime();
// This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync(shutdown);
+ commitLog.sync();
lastSyncedAt = syncStarted;
syncComplete.signalAll();
// sleep any time we have left before the next one is due
- long now = System.currentTimeMillis();
- long sleep = syncStarted + pollIntervalMillis - now;
- if (sleep < 0)
+ long now = System.nanoTime();
+ long wakeUpAt = syncStarted + pollIntervalNanos;
+ if (wakeUpAt < now)
{
// if we have lagged noticeably, update our lag counter
if (firstLagAt == 0)
@@ -105,7 +107,7 @@ public abstract class AbstractCommitLogService
firstLagAt = now;
totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0;
}
- syncExceededIntervalBy -= sleep;
+ syncExceededIntervalBy += now - wakeUpAt;
lagCount++;
}
syncCount++;
@@ -114,30 +116,25 @@ public abstract class AbstractCommitLogService
if (firstLagAt > 0)
{
//Only reset the lag tracking if it actually logged this time
- boolean logged = NoSpamLogger.log(
- logger,
- NoSpamLogger.Level.WARN,
- 5,
- TimeUnit.MINUTES,
- "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
- syncCount, (now - firstLagAt) / 1000, String.format("%.2f", (double) totalSyncDuration / syncCount), lagCount, String.format("%.2f", (double) syncExceededIntervalBy / lagCount));
+ boolean logged = NoSpamLogger.log(logger,
+ NoSpamLogger.Level.WARN,
+ 5,
+ TimeUnit.MINUTES,
+ "Out of {} commit log syncs over the past {}s with average duration of {}ms, {} have exceeded the configured commit interval by an average of {}ms",
+ syncCount,
+ String.format("%.2f", (now - firstLagAt) * 1e-9d),
+ String.format("%.2f", totalSyncDuration * 1e-6d / syncCount),
+ lagCount,
+ String.format("%.2f", syncExceededIntervalBy * 1e-6d / lagCount));
if (logged)
firstLagAt = 0;
}
- // if we have lagged this round, we probably have work to do already so we don't sleep
- if (sleep < 0 || !run)
- continue;
+ if (shutdownRequested)
+ return;
- try
- {
- haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS);
- haveWork.drainPermits();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ if (wakeUpAt > now)
+ LockSupport.parkNanos(wakeUpAt - now);
}
catch (Throwable t)
{
@@ -145,19 +142,13 @@ public abstract class AbstractCommitLogService
break;
// sleep for full poll-interval after an error, so we don't spam the log file
- try
- {
- haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
+ LockSupport.parkNanos(pollIntervalNanos);
}
}
}
};
+ shutdown = false;
thread = new Thread(runnable, name);
thread.start();
}
@@ -174,42 +165,43 @@ public abstract class AbstractCommitLogService
protected abstract void maybeWaitForSync(Allocation alloc);
/**
- * Sync immediately, but don't block for the sync to cmplete
+ * Request an additional sync cycle without blocking.
*/
- public WaitQueue.Signal requestExtraSync()
+ public void requestExtraSync()
{
- WaitQueue.Signal signal = syncComplete.register();
- haveWork.release(1);
- return signal;
+ LockSupport.unpark(thread);
}
public void shutdown()
{
shutdown = true;
- haveWork.release(1);
+ requestExtraSync();
}
/**
- * FOR TESTING ONLY
+ * Request sync and wait until the current state is synced.
+ *
+ * Note: If a sync is in progress at the time of this request, the call will return after both it and a cycle
+ * initiated immediately afterwards complete.
*/
- public void restartUnsafe()
+ public void syncBlocking()
{
- while (haveWork.availablePermits() < 1)
- haveWork.release();
+ long requestTime = System.nanoTime();
+ requestExtraSync();
+ awaitSyncAt(requestTime, null);
+ }
- while (haveWork.availablePermits() > 1)
+ void awaitSyncAt(long syncTime, Context context)
+ {
+ do
{
- try
- {
- haveWork.acquire();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ WaitQueue.Signal signal = context != null ? syncComplete.register(context) : syncComplete.register();
+ if (lastSyncedAt < syncTime)
+ signal.awaitUninterruptibly();
+ else
+ signal.cancel();
}
- shutdown = false;
- start();
+ while (lastSyncedAt < syncTime);
}
public void awaitTermination() throws InterruptedException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
index ceb5d64..4edfa34 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogService.java
@@ -30,7 +30,7 @@ class BatchCommitLogService extends AbstractCommitLogService
{
// wait until record has been safely persisted to disk
pending.incrementAndGet();
- haveWork.release();
+ requestExtraSync();
alloc.awaitDiskSync(commitLog.metrics.waitingOnCommit);
pending.decrementAndGet();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index b66221c..d76b9cb 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -116,8 +116,8 @@ public class CommitLog implements CommitLogMBean
CommitLog start()
{
- executor.start();
segmentManager.start();
+ executor.start();
return this;
}
@@ -129,22 +129,11 @@ public class CommitLog implements CommitLogMBean
*/
public int recoverSegmentsOnDisk() throws IOException
{
- // If createReserveSegments is already flipped, the CLSM is running and recovery has already taken place.
- if (segmentManager.createReserveSegments)
- return 0;
-
- FilenameFilter unmanagedFilesFilter = new FilenameFilter()
- {
- public boolean accept(File dir, String name)
- {
- // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes)
- // until after recover was finished. this turns out to be fragile; it is less error-prone to go
- // ahead and allow writes before recover, and just skip active segments when we do.
- return CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
- }
- };
+ FilenameFilter unmanagedFilesFilter = (dir, name) -> CommitLogDescriptor.isValid(name) && CommitLogSegment.shouldReplay(name);
// submit all files for this segment manager for archiving prior to recovery - CASSANDRA-6904
+ // The files may have already been archived by normal CommitLog operation. This may cause errors in this
+ // archiving pass, which we should not treat as serious.
for (File file : new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter))
{
archiver.maybeArchive(file.getPath(), file.getName());
@@ -154,6 +143,7 @@ public class CommitLog implements CommitLogMBean
assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore";
archiver.maybeRestoreArchive();
+ // List the files again as archiver may have added segments.
File[] files = new File(segmentManager.storageDirectory).listFiles(unmanagedFilesFilter);
int replayed = 0;
if (files.length == 0)
@@ -171,7 +161,6 @@ public class CommitLog implements CommitLogMBean
segmentManager.handleReplayedSegment(f);
}
- segmentManager.enableReserveSegmentCreation();
return replayed;
}
@@ -231,9 +220,9 @@ public class CommitLog implements CommitLogMBean
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments) throws IOException
+ public void sync() throws IOException
{
- segmentManager.sync(syncAllSegments);
+ segmentManager.sync();
}
/**
@@ -315,8 +304,8 @@ public class CommitLog implements CommitLogMBean
if (segment.isUnused())
{
- logger.trace("Commit log segment {} is unused", segment);
- segmentManager.recycleSegment(segment);
+ logger.debug("Commit log segment {} is unused", segment);
+ segmentManager.archiveAndDiscard(segment);
}
else
{
@@ -455,23 +444,7 @@ public class CommitLog implements CommitLogMBean
*/
public int restartUnsafe() throws IOException
{
- segmentManager.start();
- executor.restartUnsafe();
- try
- {
- return recoverSegmentsOnDisk();
- }
- catch (FSWriteError e)
- {
- // Workaround for a class of races that keeps showing up on Windows tests.
- // stop/start/reset path on Windows with segment deletion is very touchy/brittle
- // and the timing keeps getting screwed up. Rather than chasing our tail further
- // or rewriting the CLSM, just report that we didn't recover anything back up
- // the chain. This will silence most intermittent test failures on Windows
- // and appropriately fail tests that expected segments to be recovered that
- // were not.
- return 0;
- }
+ return start().recoverSegmentsOnDisk();
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 044f2db..1abdd79 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
public class CommitLogArchiver
{
@@ -151,22 +152,32 @@ public class CommitLogArchiver
/**
* Differs from the above because it can be used on any file, rather than only
- * managed commit log segments (and thus cannot call waitForFinalSync).
+ * managed commit log segments (and thus cannot call waitForFinalSync), and in
+ * the treatment of failures.
*
- * Used to archive files present in the commit log directory at startup (CASSANDRA-6904)
+ * Used to archive files present in the commit log directory at startup (CASSANDRA-6904).
+ * Since the files may already have been archived during normal operation, which can cause subsequent
+ * hard-linking or other archive operations to fail, I/O failures are logged as warnings rather than thrown.
*/
public void maybeArchive(final String path, final String name)
{
if (Strings.isNullOrEmpty(archiveCommand))
return;
- archivePending.put(name, executor.submit(new WrappedRunnable()
+ archivePending.put(name, executor.submit(new Runnable()
{
- protected void runMayThrow() throws IOException
+ public void run()
{
- String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name));
- command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path));
- exec(command);
+ try
+ {
+ String command = NAME.matcher(archiveCommand).replaceAll(Matcher.quoteReplacement(name));
+ command = PATH.matcher(command).replaceAll(Matcher.quoteReplacement(path));
+ exec(command);
+ }
+ catch (IOException e)
+ {
+ logger.warn("Archiving file {} failed, file may have already been archived.", name, e);
+ }
}
}));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a1158be..eb9759e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -29,8 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
import org.apache.cassandra.config.*;
@@ -38,6 +36,7 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.CLibrary;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -51,8 +50,6 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
*/
public abstract class CommitLogSegment
{
- private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
-
private final static long idBase;
private CDCState cdcState = CDCState.PERMITTED;
@@ -117,14 +114,13 @@ public abstract class CommitLogSegment
ByteBuffer buffer;
private volatile boolean headerWritten;
- final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+ static CommitLogSegment createSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
Configuration config = commitLog.configuration;
- CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager, onClose)
- : config.useCompression() ? new CompressedSegment(commitLog, manager, onClose)
+ CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, manager)
+ : config.useCompression() ? new CompressedSegment(commitLog, manager)
: new MemoryMappedSegment(commitLog, manager);
segment.writeLogHeader();
return segment;
@@ -152,7 +148,6 @@ public abstract class CommitLogSegment
*/
CommitLogSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
- this.commitLog = commitLog;
this.manager = manager;
id = getNextId();
@@ -370,6 +365,18 @@ public abstract class CommitLogSegment
}
/**
+ * Discards a segment file when the log no longer requires it. The file may be left on disk if the archive script
+ * requires it. (Potentially blocking operation)
+ */
+ void discard(boolean deleteFile)
+ {
+ close();
+ if (deleteFile)
+ FileUtils.deleteWithConfirm(logFile);
+ manager.addSize(-onDiskSize());
+ }
+
+ /**
* @return the current CommitLogPosition for this log segment
*/
public CommitLogPosition getCurrentCommitLogPosition()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index 306cec8..a91384f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -53,8 +53,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
@Override
void start()
{
- super.start();
cdcSizeTracker.start();
+ super.start();
}
public void discard(CommitLogSegment segment, boolean delete)
@@ -78,9 +78,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
*/
public void shutdown()
{
- run = false;
cdcSizeTracker.shutdown();
- wakeManager();
+ super.shutdown();
}
/**
@@ -103,7 +102,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
{
// Failed to allocate, so move to a new segment with enough room if possible.
advanceAllocatingFrom(segment);
- segment = allocatingFrom;
+ segment = allocatingFrom();
throwIfForbidden(mutation, segment);
}
@@ -146,7 +145,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
*/
public CommitLogSegment createSegment()
{
- CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+ CommitLogSegment segment = CommitLogSegment.createSegment(commitLog, this);
cdcSizeTracker.processNewSegment(segment);
return segment;
}
@@ -179,6 +178,8 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
*/
public void start()
{
+ size = 0;
+ unflushedCDCSize = 0;
cdcSizeCalculationExecutor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy());
}
@@ -245,7 +246,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
{
rateLimiter.acquire();
calculateSize();
- CommitLogSegment allocatingFrom = segmentManager.allocatingFrom;
+ CommitLogSegment allocatingFrom = segmentManager.allocatingFrom();
if (allocatingFrom.getCDCState() == CDCState.FORBIDDEN)
processNewSegment(allocatingFrom);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
index 333077c..86e886b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerStandard.java
@@ -39,15 +39,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
}
/**
- * Initiates the shutdown process for the management thread.
- */
- public void shutdown()
- {
- run = false;
- wakeManager();
- }
-
- /**
* Reserve space in the current segment for the provided mutation or, if there isn't space available,
* create a new segment. allocate() is blocking until allocation succeeds as it waits on a signal in advanceAllocatingFrom
*
@@ -64,7 +55,7 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
{
// failed to allocate, so move to a new segment with enough room
advanceAllocatingFrom(segment);
- segment = allocatingFrom;
+ segment = allocatingFrom();
}
return alloc;
@@ -84,6 +75,6 @@ public class CommitLogSegmentManagerStandard extends AbstractCommitLogSegmentMan
public CommitLogSegment createSegment()
{
- return CommitLogSegment.createSegment(commitLog, this, () -> wakeManager());
+ return CommitLogSegment.createSegment(commitLog, this);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 2e46028..288b766 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -42,9 +42,9 @@ public class CompressedSegment extends FileDirectSegment
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+ CompressedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
- super(commitLog, manager, onClose);
+ super(commitLog, manager);
this.compressor = commitLog.configuration.getCompressor();
manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 103351e..4ca1ede 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -65,9 +65,9 @@ public class EncryptedSegment extends FileDirectSegment
private final EncryptionContext encryptionContext;
private final Cipher cipher;
- public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+ public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
- super(commitLog, manager, onClose);
+ super(commitLog, manager);
this.encryptionContext = commitLog.configuration.getEncryptionContext();
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index d4160e4..55084be 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -29,12 +29,10 @@ import org.apache.cassandra.io.FSWriteError;
public abstract class FileDirectSegment extends CommitLogSegment
{
volatile long lastWrittenPos = 0;
- private final Runnable onClose;
- FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager, Runnable onClose)
+ FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
{
super(commitLog, manager);
- this.onClose = onClose;
}
@Override
@@ -62,7 +60,7 @@ public abstract class FileDirectSegment extends CommitLogSegment
}
finally
{
- onClose.run();
+ manager.notifyBufferFreed();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..bde832b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -18,11 +18,10 @@
package org.apache.cassandra.db.commitlog;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
class PeriodicCommitLogService extends AbstractCommitLogService
{
- private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5);
+ private static final long blockWhenSyncLagsNanos = (long) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5e6);
public PeriodicCommitLogService(final CommitLog commitLog)
{
@@ -31,28 +30,12 @@ class PeriodicCommitLogService extends AbstractCommitLogService
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
{
- if (waitForSyncToCatchUp(Long.MAX_VALUE))
+ long expectedSyncTime = System.nanoTime() - blockWhenSyncLagsNanos;
+ if (lastSyncedAt < expectedSyncTime)
{
- // wait until periodic sync() catches up with its schedule
- long started = System.currentTimeMillis();
pending.incrementAndGet();
- while (waitForSyncToCatchUp(started))
- {
- WaitQueue.Signal signal = syncComplete.register(commitLog.metrics.waitingOnCommit.time());
- if (waitForSyncToCatchUp(started))
- signal.awaitUninterruptibly();
- else
- signal.cancel();
- }
+ awaitSyncAt(expectedSyncTime, commitLog.metrics.waitingOnCommit.time());
pending.decrementAndGet();
}
}
-
- /**
- * @return true if sync is currently lagging behind inserts
- */
- private boolean waitForSyncToCatchUp(long started)
- {
- return started > lastSyncedAt + blockWhenSyncLagsMillis;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 06c252f..b86a15b 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -29,9 +29,15 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.RateLimiter;
-import org.junit.*;
-import org.apache.cassandra.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Mutation;
@@ -97,6 +103,7 @@ public class CommitLogStressTest
initialize();
CommitLogStressTest tester = new CommitLogStressTest();
+ tester.cleanDir();
tester.testFixedSize();
}
catch (Throwable e)
@@ -130,12 +137,13 @@ public class CommitLogStressTest
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
+
+ CommitLog.instance.stopUnsafe(true);
}
@Before
public void cleanDir() throws IOException
{
- CommitLog.instance.stopUnsafe(true);
File dir = new File(location);
if (dir.isDirectory())
{
@@ -209,8 +217,6 @@ public class CommitLogStressTest
{
DatabaseDescriptor.setCommitLogSync(sync);
CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
- // Need to enable reserve segment creation as close to test start as possible to minimize race
- commitLog.segmentManager.enableReserveSegmentCreation();
testLog(commitLog);
assert !failed;
}
@@ -307,17 +313,12 @@ public class CommitLogStressTest
private void verifySizes(CommitLog commitLog)
{
// Complete anything that's still left to write.
- commitLog.executor.requestExtraSync().awaitUninterruptibly();
- // One await() does not suffice as we may be signalled when an ongoing sync finished. Request another
- // (which shouldn't write anything) to make sure the first we triggered completes.
- // FIXME: The executor should give us a chance to await completion of the sync we requested.
- commitLog.executor.requestExtraSync().awaitUninterruptibly();
-
- // Wait for any pending deletes or segment allocations to complete.
+ commitLog.executor.syncBlocking();
+ // Wait for any concurrent segment allocations to complete.
commitLog.segmentManager.awaitManagementTasksCompletion();
long combinedSize = 0;
- for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ for (File f : new File(commitLog.segmentManager.storageDirectory).listFiles())
combinedSize += f.length();
Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index df83ff1..e5c1831 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -530,7 +530,7 @@ public class Util
public static void spinAssertEquals(Object expected, Supplier<Object> s, int timeoutInSeconds)
{
long now = System.currentTimeMillis();
- while (System.currentTimeMillis() - now < now + (1000 * timeoutInSeconds))
+ while (System.currentTimeMillis() < now + (1000 * timeoutInSeconds))
{
if (s.get().equals(expected))
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index e308a2f..68ce57d 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@ -51,8 +51,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
@Before
public void before() throws IOException
{
- // disable reserve segment to get more deterministic allocation/testing of CDC boundary states
- CommitLog.instance.forceRecycleAllSegments();
+ CommitLog.instance.resetUnsafe(true);
for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
FileUtils.deleteWithConfirm(f);
}
@@ -120,7 +119,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
for (int i = 0; i < 8; i++)
{
new RowUpdateBuilder(currentTableMetadata(), 0, i)
- .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4)) // fit 3 in a segment
.build().apply();
}
@@ -136,7 +135,7 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
for (int i = 0; i < 8; i++)
{
new RowUpdateBuilder(currentTableMetadata(), 0, i)
- .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
.build().apply();
}
// 4 total again, 3 CONTAINS, 1 in waiting PERMITTED
@@ -215,6 +214,6 @@ public class CommitLogSegmentManagerCDCTest extends CQLTester
private void expectCurrentCDCState(CDCState state)
{
Assert.assertEquals("Received unexpected CDCState on current allocatingFrom segment.",
- state, CommitLog.instance.segmentManager.allocatingFrom.getCDCState());
+ state, CommitLog.instance.segmentManager.allocatingFrom().getCDCState());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
index b777389..397a8eb 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerTest.java
@@ -21,6 +21,7 @@
package org.apache.cassandra.db.commitlog;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Semaphore;
import javax.naming.ConfigurationException;
@@ -51,6 +52,7 @@ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
public class CommitLogSegmentManagerTest
{
//Block commit log service from syncing
+ @SuppressWarnings("unused")
private static final Semaphore allowSync = new Semaphore(0);
private static final String KEYSPACE1 = "CommitLogTest";
@@ -66,6 +68,7 @@ public class CommitLogSegmentManagerTest
DatabaseDescriptor.setCommitLogSegmentSize(1);
DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
DatabaseDescriptor.setCommitLogSyncPeriod(10 * 1000);
+ DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3);
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
@@ -109,11 +112,14 @@ public class CommitLogSegmentManagerTest
}
Thread.sleep(1000);
- // Should only be able to create 3 segments (not 7) because it blocks waiting for truncation that never comes.
+ // Should only be able to create 3 segments not 7 because it blocks waiting for truncation that never comes
Assert.assertEquals(3, clsm.getActiveSegments().size());
- clsm.getActiveSegments().forEach(segment -> clsm.recycleSegment(segment));
+ // Discard the currently active segments so allocation can continue.
+ // Take snapshot of the list, otherwise this will also discard newly allocated segments.
+ new ArrayList<>(clsm.getActiveSegments()).forEach( clsm::archiveAndDiscard );
+ // The allocated count should reach the limit again.
Util.spinAssertEquals(3, () -> clsm.getActiveSegments().size(), 5);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 79051e0..4bc5f6b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -277,7 +277,7 @@ public class CommitLogTest
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync();
CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getCurrentPosition());
assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
@@ -594,7 +594,7 @@ public class CommitLogTest
cellCount += 1;
CommitLog.instance.add(rm2);
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync();
SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
@@ -631,7 +631,7 @@ public class CommitLogTest
}
}
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync();
SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
@@ -661,6 +661,10 @@ public class CommitLogTest
@Override
public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
{
+ // Filter out system writes that could flake the test.
+ if (!KEYSPACE1.equals(m.getKeyspaceName()))
+ return;
+
if (entryLocation <= filterPosition.position)
{
// Skip over this mutation.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e8907c16/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 9a22b04..7f43378 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -40,7 +40,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
{
super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync();
this.processor = processor;
commitLogReader = new CommitLogTestReader();