You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/05 13:12:02 UTC
[1/6] cassandra git commit: More frequent commitlog chained markers
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 eb05025c0 -> 05cb556f9
refs/heads/cassandra-3.11 d577918ba -> c3a1a4fa8
refs/heads/trunk d274c6ac0 -> 2402acd47
More frequent commitlog chained markers
patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f
Branch: refs/heads/cassandra-3.0
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../db/commitlog/AbstractCommitLogService.java | 86 ++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 85 ++++++++----
.../db/commitlog/CompressedSegment.java | 12 ++
.../db/commitlog/MemoryMappedSegment.java | 7 +-
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 128 +++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../cassandra/db/commitlog/CommitLogTest.java | 6 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
* Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not help durability when the host itself fails.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
+ public Integer commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
conf.commitlog_sync_period_in_ms = periodMillis;
}
+ public static void setCommitLogMarkerPeriod(int markerPeriod) // millis between chained-marker updates (commitlog_marker_period_in_ms)
+ {
+ conf.commitlog_marker_period_in_ms = markerPeriod;
+ }
+
+ public static int getCommitLogMarkerPeriod() // millis; values outside (0, sync period] are clamped to the sync period by AbstractCommitLogService
+ {
+ return conf.commitlog_marker_period_in_ms;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.db.commitlog;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
public abstract class AbstractCommitLogService
{
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
final CommitLog commitLog;
private final String name;
- private final long pollIntervalMillis;
+
+ /**
+ * The duration between syncs to disk.
+ */
+ private final long syncIntervalMillis;
+
+ /**
+ * The duration between updates of the chained markers in the commit log file. This value should be
+ * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+ */
+ private final long markerIntervalMillis;
+
+ /**
+ * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+ * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+ * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+ */
+ private volatile boolean syncRequested;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
*
* Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
*/
- AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+ {
+ this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+ }
+
+ /**
+ * CommitLogService provides a fsync service for Allocations, fulfilling either the
+ * Batch or Periodic contract.
+ *
+ * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ */
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalMillis = pollIntervalMillis;
+ this.syncIntervalMillis = syncIntervalMillis;
+
+ // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+ // faster than the sync interval
+ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+ markerIntervalMillis = syncIntervalMillis;
+
+ // apply basic bounds checking on the marker interval
+ if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ {
+ logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+ markerIntervalMillis, syncIntervalMillis);
+ markerIntervalMillis = syncIntervalMillis;
+ }
+
+ this.markerIntervalMillis = markerIntervalMillis;
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalMillis < 1)
- throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+ if (syncIntervalMillis < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+ syncIntervalMillis * 1e-6));
Runnable runnable = new Runnable()
{
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
run = !shutdown;
// sync and signal
- long syncStarted = System.currentTimeMillis();
- //This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync(shutdown);
- lastSyncedAt = syncStarted;
- syncComplete.signalAll();
-
+ long pollStarted = System.currentTimeMillis();
+ if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ {
+ // in this branch, we want to flush the commit log to disk
+ commitLog.sync(shutdown, true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
+ commitLog.sync(false, false);
+ }
// sleep any time we have left before the next one is due
long now = System.currentTimeMillis();
- long sleep = syncStarted + pollIntervalMillis - now;
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
{
// if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
lagCount++;
}
syncCount++;
- totalSyncDuration += now - syncStarted;
+ totalSyncDuration += now - pollStarted;
if (firstLagAt > 0)
{
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
- haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+ haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
*/
public WaitQueue.Signal requestExtraSync()
{
+ syncRequested = true;
WaitQueue.Signal signal = syncComplete.register();
haveWork.release(1);
return signal;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments)
+ public void sync(boolean syncAllSegments, boolean flush)
{
CommitLogSegment current = allocator.allocatingFrom();
for (CommitLogSegment segment : allocator.getActiveSegments())
{
if (!syncAllSegments && segment.id > current.id)
return;
- segment.sync();
+ segment.sync(flush);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
// sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
private volatile int lastSyncedOffset;
+ /**
+ * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+ * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+ */
+ private volatile int lastMarkerOffset;
+
// The end position of the buffer. Initially set to its capacity and updated to point to the last written position
// as the segment is being closed.
// No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
// write the header
CommitLogDescriptor.writeHeader(buffer, descriptor);
endOfBuffer = buffer.capacity();
- lastSyncedOffset = buffer.position();
+
+ lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
}
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
void discardUnusedTail()
{
- // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+ // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
// Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
// This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
// running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
}
/**
- * Forces a disk flush for this segment file.
+ * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+ *
+ * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
*/
- synchronized void sync()
+ synchronized void sync(boolean flush)
{
- boolean close = false;
+ assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+ lastMarkerOffset, lastSyncedOffset);
// check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+ final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+ if (!(needToMarkData || hasDataToFlush))
return;
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
// succeeded in the previous sync.
assert buffer != null; // Only close once.
- int startMarker = lastSyncedOffset;
- // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to.
- int nextMarker = allocate(SYNC_MARKER_SIZE);
- if (nextMarker < 0)
+ boolean close = false;
+ int startMarker = lastMarkerOffset;
+ int nextMarker, sectionEnd;
+ if (needToMarkData)
{
- // Ensure no more of this CLS is writeable, and mark ourselves for closing.
- discardUnusedTail();
- close = true;
-
- // We use the buffer size as the synced position after a close instead of the end of the actual data
- // to make sure we only close the buffer once.
- // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
- nextMarker = buffer.capacity();
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
+ {
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ sectionEnd = close ? endOfBuffer : nextMarker;
+
+ // Possibly perform compression or encryption and update the chained markers
+ write(startMarker, sectionEnd);
+ lastMarkerOffset = sectionEnd;
+ }
+ else
+ {
+ // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+ // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+ nextMarker = lastMarkerOffset;
+ sectionEnd = nextMarker;
}
- // Wait for mutations to complete as well as endOfBuffer to have been written.
- waitForModifications();
- int sectionEnd = close ? endOfBuffer : nextMarker;
- // Perform compression, writing to file and flush.
- write(startMarker, sectionEnd);
+ if (flush || close)
+ {
+ flush(startMarker, sectionEnd);
+ lastSyncedOffset = lastMarkerOffset = nextMarker;
+ }
- // Signal the sync as complete.
- lastSyncedOffset = nextMarker;
if (close)
internalClose();
syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
abstract void write(int lastSyncedOffset, int nextMarker);
+ abstract void flush(int startMarker, int nextMarker);
+
public boolean isStillAllocating()
{
return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
synchronized void close()
{
discardUnusedTail();
- sync();
+ sync(true);
assert buffer == null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force(channel, true);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
// write previous sync marker to point to next sync marker
// we don't chain the crcs here to ensure this method is idempotent if it fails
writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
- try {
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force((MappedByteBuffer) buffer);
}
catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
public PeriodicCommitLogService(final CommitLog commitLog)
{
- super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+ super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
}
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Verifies that a commit log is fully replayable when the chained markers in its segments are updated but the
+ * segment files are never flushed to disk; a Byteman rule forces every CommitLogSegment.sync(flush) to skip the flush.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); // the marker period is only honoured in periodic mode
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000); // 10,000,000 ms: the time-based flush never comes due during the test
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1); // update the chained markers every 1 ms
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024]; // 1 KiB random payload reused by every mutation
+ new Random().nextBytes(entropy);
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
+ CommitLog.instance.sync(false, true); // flush is requested, but the Byteman rule rewrites it to false: markers only
+
+ Replayer replayer = new Replayer(cfs1.metadata);
+ File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+ replayer.recover(commitLogDir.listFiles());
+ Assert.assertEquals(samples, replayer.count);
+ }
+
+ private static class Replayer extends CommitLogReplayer // counts replayed mutations; replayMutation does not delegate to super
+ {
+ private final CFMetaData cfm;
+ private int count;
+
+ Replayer(CFMetaData cfm)
+ {
+ super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+ this.cfm = cfm;
+ }
+
+ @Override
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ {
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ try
+ {
+ Mutation mutation = Mutation.serializer.deserialize(bufIn,
+ desc.getMessagingVersion(),
+ SerializationHelper.Flag.LOCAL);
+
+ if (cfm == null || mutation.get(cfm) != null) // count mutations for the test table (or every mutation if cfm is null)
+ count++;
+ }
+ catch (IOException e)
+ {
+ // Deserialization failure means the replayed bytes were not a valid mutation; fail the test.
+ throw new AssertionError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
}
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
for (SSTableReader reader : cfs.getLiveSSTables())
reader.reloadSSTableMetadata();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
{
public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
{
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/6] cassandra git commit: More frequent commitlog chained markers
Posted by ja...@apache.org.
More frequent commitlog chained markers
patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f
Branch: refs/heads/cassandra-3.11
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../db/commitlog/AbstractCommitLogService.java | 86 ++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 85 ++++++++----
.../db/commitlog/CompressedSegment.java | 12 ++
.../db/commitlog/MemoryMappedSegment.java | 7 +-
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 128 +++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../cassandra/db/commitlog/CommitLogTest.java | 6 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
* Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not improve durability if the host itself fails.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
+ public Integer commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
conf.commitlog_sync_period_in_ms = periodMillis;
}
+ public static void setCommitLogMarkerPeriod(int markerPeriod)
+ {
+ conf.commitlog_marker_period_in_ms = markerPeriod;
+ }
+
+ public static int getCommitLogMarkerPeriod()
+ {
+ return conf.commitlog_marker_period_in_ms;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.db.commitlog;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
public abstract class AbstractCommitLogService
{
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
final CommitLog commitLog;
private final String name;
- private final long pollIntervalMillis;
+
+ /**
+ * The duration between syncs to disk.
+ */
+ private final long syncIntervalMillis;
+
+ /**
+ * The duration between updating the chained markers in the commit log file. This value should be
+ * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+ */
+ private final long markerIntervalMillis;
+
+ /**
+ * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+ * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+ * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+ */
+ private volatile boolean syncRequested;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
*
* Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
*/
- AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+ {
+ this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+ }
+
+ /**
+ * CommitLogService provides a fsync service for Allocations, fulfilling either the
+ * Batch or Periodic contract.
+ *
+ * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ */
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalMillis = pollIntervalMillis;
+ this.syncIntervalMillis = syncIntervalMillis;
+
+ // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+ // faster than the sync interval
+ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+ markerIntervalMillis = syncIntervalMillis;
+
+ // apply basic bounds checking on the marker interval
+ if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ {
+ logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+ markerIntervalMillis, syncIntervalMillis);
+ markerIntervalMillis = syncIntervalMillis;
+ }
+
+ this.markerIntervalMillis = markerIntervalMillis;
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalMillis < 1)
- throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+ if (syncIntervalMillis < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+ syncIntervalMillis * 1e-6));
Runnable runnable = new Runnable()
{
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
run = !shutdown;
// sync and signal
- long syncStarted = System.currentTimeMillis();
- //This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync(shutdown);
- lastSyncedAt = syncStarted;
- syncComplete.signalAll();
-
+ long pollStarted = System.currentTimeMillis();
+ if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ {
+ // in this branch, we want to flush the commit log to disk
+ commitLog.sync(shutdown, true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
+ commitLog.sync(false, false);
+ }
// sleep any time we have left before the next one is due
long now = System.currentTimeMillis();
- long sleep = syncStarted + pollIntervalMillis - now;
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
{
// if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
lagCount++;
}
syncCount++;
- totalSyncDuration += now - syncStarted;
+ totalSyncDuration += now - pollStarted;
if (firstLagAt > 0)
{
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
- haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+ haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
*/
public WaitQueue.Signal requestExtraSync()
{
+ syncRequested = true;
WaitQueue.Signal signal = syncComplete.register();
haveWork.release(1);
return signal;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments)
+ public void sync(boolean syncAllSegments, boolean flush)
{
CommitLogSegment current = allocator.allocatingFrom();
for (CommitLogSegment segment : allocator.getActiveSegments())
{
if (!syncAllSegments && segment.id > current.id)
return;
- segment.sync();
+ segment.sync(flush);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
// sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
private volatile int lastSyncedOffset;
+ /**
+ * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+ * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+ */
+ private volatile int lastMarkerOffset;
+
// The end position of the buffer. Initially set to its capacity and updated to point to the last written position
// as the segment is being closed.
// No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
// write the header
CommitLogDescriptor.writeHeader(buffer, descriptor);
endOfBuffer = buffer.capacity();
- lastSyncedOffset = buffer.position();
+
+ lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
}
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
void discardUnusedTail()
{
- // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+ // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
// Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
// This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
// running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
}
/**
- * Forces a disk flush for this segment file.
+ * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+ *
+ * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
*/
- synchronized void sync()
+ synchronized void sync(boolean flush)
{
- boolean close = false;
+ assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+ lastMarkerOffset, lastSyncedOffset);
// check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+ final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+ if (!(needToMarkData || hasDataToFlush))
return;
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
// succeeded in the previous sync.
assert buffer != null; // Only close once.
- int startMarker = lastSyncedOffset;
- // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to.
- int nextMarker = allocate(SYNC_MARKER_SIZE);
- if (nextMarker < 0)
+ boolean close = false;
+ int startMarker = lastMarkerOffset;
+ int nextMarker, sectionEnd;
+ if (needToMarkData)
{
- // Ensure no more of this CLS is writeable, and mark ourselves for closing.
- discardUnusedTail();
- close = true;
-
- // We use the buffer size as the synced position after a close instead of the end of the actual data
- // to make sure we only close the buffer once.
- // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
- nextMarker = buffer.capacity();
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
+ {
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ sectionEnd = close ? endOfBuffer : nextMarker;
+
+ // Possibly perform compression or encryption and update the chained markers
+ write(startMarker, sectionEnd);
+ lastMarkerOffset = sectionEnd;
+ }
+ else
+ {
+ // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+ // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+ nextMarker = lastMarkerOffset;
+ sectionEnd = nextMarker;
}
- // Wait for mutations to complete as well as endOfBuffer to have been written.
- waitForModifications();
- int sectionEnd = close ? endOfBuffer : nextMarker;
- // Perform compression, writing to file and flush.
- write(startMarker, sectionEnd);
+ if (flush || close)
+ {
+ flush(startMarker, sectionEnd);
+ lastSyncedOffset = lastMarkerOffset = nextMarker;
+ }
- // Signal the sync as complete.
- lastSyncedOffset = nextMarker;
if (close)
internalClose();
syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
abstract void write(int lastSyncedOffset, int nextMarker);
+ abstract void flush(int startMarker, int nextMarker);
+
public boolean isStillAllocating()
{
return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
synchronized void close()
{
discardUnusedTail();
- sync();
+ sync(true);
assert buffer == null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force(channel, true);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
// write previous sync marker to point to next sync marker
// we don't chain the crcs here to ensure this method is idempotent if it fails
writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
- try {
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force((MappedByteBuffer) buffer);
}
catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
public PeriodicCommitLogService(final CommitLog commitLog)
{
- super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+ super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
}
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
+ CommitLog.instance.sync(false, true);
+
+ Replayer replayer = new Replayer(cfs1.metadata);
+ File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+ replayer.recover(commitLogDir.listFiles());
+ Assert.assertEquals(samples, replayer.count);
+ }
+
+ private static class Replayer extends CommitLogReplayer
+ {
+ private final CFMetaData cfm;
+ private int count;
+
+ Replayer(CFMetaData cfm)
+ {
+ super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+ this.cfm = cfm;
+ }
+
+ @Override
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ {
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ try
+ {
+ Mutation mutation = Mutation.serializer.deserialize(bufIn,
+ desc.getMessagingVersion(),
+ SerializationHelper.Flag.LOCAL);
+
+ if (cfm == null || mutation.get(cfm) != null)
+ count++;
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
}
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
for (SSTableReader reader : cfs.getLiveSSTables())
reader.reloadSSTableMetadata();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
{
public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
{
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3a1a4fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3a1a4fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3a1a4fa
Branch: refs/heads/cassandra-3.11
Commit: c3a1a4fa8083b2c4c6a5454551979954a0a71339
Parents: d577918 05cb556
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:07:18 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:08:22 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../AbstractCommitLogSegmentManager.java | 8 +-
.../db/commitlog/AbstractCommitLogService.java | 84 ++++++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 85 +++++++++++------
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 3 -
.../db/commitlog/FileDirectSegment.java | 14 +++
.../db/commitlog/MemoryMappedSegment.java | 4 +
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 98 ++++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../cassandra/db/commitlog/CommitLogTest.java | 10 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
17 files changed, 281 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e26aeb8,2683dc2..7c0af91
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
-3.0.16
+3.11.2
+ * Remove OpenJDK log warning (CASSANDRA-13916)
+ * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
+ * Cache disk boundaries (CASSANDRA-13215)
+ * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
+ * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
+ * Update jackson JSON jars (CASSANDRA-13949)
+ * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
+Merged from 3.0:
+ * More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
* Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,0796183..5fe752e
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -198,8 -190,9 +198,9 @@@ public class Confi
public String commitlog_directory;
public Integer commitlog_total_space_in_mb;
public CommitLogSync commitlog_sync;
- public Double commitlog_sync_batch_window_in_ms;
- public Integer commitlog_sync_period_in_ms;
- public Integer commitlog_marker_period_in_ms = 0;
+ public double commitlog_sync_batch_window_in_ms = Double.NaN;
+ public int commitlog_sync_period_in_ms;
++ public int commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7f3c9f8,0000000..2c324aa
mode 100755,000000..100755
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,552 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
+
+ private final WaitQueue segmentPrepared = new WaitQueue();
+
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ private Thread managerThread;
+ protected final CommitLog commitLog;
+ private volatile boolean shutdown;
+ private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown;
+ private final WaitQueue managerThreadWaitQueue = new WaitQueue();
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (!shutdown)
+ {
+ try
+ {
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
+ {
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
+ }
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
+ }
+
+ WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
+ }
+ }
+ };
+
+ shutdown = false;
+ managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
+
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as task submit
+ * to segment manager so it's performed on segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
+ * manager so it's performed on segment management thread, or perform while segment management thread is shutdown
+ * during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+ /**
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ synchronized (this)
+ {
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
+ if (allocatingFrom != old)
+ return;
+
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
+ {
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
+ }
+ }
+
+ awaitAvailableSegment(old);
+ }
+
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ }
+
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
+ {
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ archiveAndDiscard(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be discarded.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void archiveAndDiscard(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.debug("CLSM closing and clearing existing commit log segments...");
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
+ void awaitManagementTasksCompletion()
+ {
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this)
+ {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ managerThread = null;
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom.getCurrentCommitLogPosition();
+ }
+
+ /**
- * Forces a disk flush on the commit log files that need it. Blocking.
++ * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk.
++ *
++ * @param flush Request that the sync operation flush the file to disk.
+ */
- public void sync() throws IOException
++ public void sync(boolean flush) throws IOException
+ {
+ CommitLogSegment current = allocatingFrom;
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
+ return;
- segment.sync();
++ segment.sync(flush);
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ void wakeManager()
+ {
+ managerThreadWaitQueue.signalAll();
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 71100a3,8a03b2f..0410650
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,16 -17,9 +17,18 @@@
*/
package org.apache.cassandra.db.commitlog;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@@ -48,7 -47,25 +50,24 @@@ public abstract class AbstractCommitLog
final CommitLog commitLog;
private final String name;
- private final long pollIntervalNanos;
+
+ /**
+ * The duration between syncs to disk.
+ */
- private final long syncIntervalMillis;
++ private final long syncIntervalNanos;
+
+ /**
+ * The duration between updating the chained markers in the commit log file. This value should be
- * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
++ * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}.
+ */
- private final long markerIntervalMillis;
++ private final long markerIntervalNanos;
+
+ /**
+ * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+ * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+ * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+ */
+ private volatile boolean syncRequested;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@@ -62,15 -90,30 +92,30 @@@
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
- this.syncIntervalMillis = syncIntervalMillis;
++ this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
+
- // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
++ // if we are not using periodic mode, or we are using compression/encryption, we shouldn't update the chained markers
+ // faster than the sync interval
- if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
++ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption())
+ markerIntervalMillis = syncIntervalMillis;
+
+ // apply basic bounds checking on the marker interval
+ if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ {
+ logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+ markerIntervalMillis, syncIntervalMillis);
+ markerIntervalMillis = syncIntervalMillis;
+ }
+
- this.markerIntervalMillis = markerIntervalMillis;
++ this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS);
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalNanos < 1)
- if (syncIntervalMillis < 1)
++ if (syncIntervalNanos < 1)
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
- pollIntervalNanos * 1e-6));
- syncIntervalMillis * 1e-6));
++ syncIntervalNanos * 1e-6));
Runnable runnable = new Runnable()
{
@@@ -82,25 -125,34 +127,33 @@@
int lagCount = 0;
int syncCount = 0;
- boolean run = true;
- while (run)
+ while (true)
{
+ // always run once after shutdown signalled
+ boolean shutdownRequested = shutdown;
+
try
{
- // always run once after shutdown signalled
- run = !shutdown;
-
// sync and signal
- long syncStarted = System.nanoTime();
- // This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync();
- lastSyncedAt = syncStarted;
- syncComplete.signalAll();
-
- long pollStarted = System.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
++ long pollStarted = System.nanoTime();
++ if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
+ {
+ // in this branch, we want to flush the commit log to disk
- commitLog.sync(shutdown, true);
++ commitLog.sync(true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
- commitLog.sync(false, false);
++ commitLog.sync(false);
+ }
// sleep any time we have left before the next one is due
- long now = System.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
+ long now = System.nanoTime();
- long wakeUpAt = syncStarted + pollIntervalNanos;
++ long wakeUpAt = pollStarted + markerIntervalNanos;
+ if (wakeUpAt < now)
{
// if we have lagged noticeably, update our lag counter
if (firstLagAt == 0)
@@@ -143,7 -200,14 +196,7 @@@
break;
// sleep for full poll-interval after an error, so we don't spam the log file
- LockSupport.parkNanos(pollIntervalNanos);
- try
- {
- haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
++ LockSupport.parkNanos(markerIntervalNanos);
}
}
}
@@@ -166,11 -229,14 +219,12 @@@
protected abstract void maybeWaitForSync(Allocation alloc);
/**
- * Sync immediately, but don't block for the sync to cmplete
+ * Request an additional sync cycle without blocking.
*/
- public void requestExtraSync()
- public WaitQueue.Signal requestExtraSync()
++ void requestExtraSync()
{
+ syncRequested = true;
- WaitQueue.Signal signal = syncComplete.register();
- haveWork.release(1);
- return signal;
+ LockSupport.unpark(thread);
}
public void shutdown()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 750fabc,ff1b712..da29258
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -221,9 -220,15 +221,9 @@@ public class CommitLog implements Commi
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync() throws IOException
- public void sync(boolean syncAllSegments, boolean flush)
++ public void sync(boolean flush) throws IOException
{
- segmentManager.sync();
- CommitLogSegment current = allocator.allocatingFrom();
- for (CommitLogSegment segment : allocator.getActiveSegments())
- {
- if (!syncAllSegments && segment.id > current.id)
- return;
- segment.sync(flush);
- }
++ segmentManager.sync(flush);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a618d0b,8834c8c..7c02892
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -169,26 -170,12 +175,27 @@@ public abstract class CommitLogSegmen
}
buffer = createBuffer(commitLog);
- // write the header
- CommitLogDescriptor.writeHeader(buffer, descriptor);
+ }
+
+ /**
+ * Deferred writing of the commit log header until subclasses have had a chance to initialize
+ */
+ void writeLogHeader()
+ {
+ CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
endOfBuffer = buffer.capacity();
- lastSyncedOffset = buffer.position();
+
+ lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
+ headerWritten = true;
+ }
+
+ /**
+ * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
+ */
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ return Collections.<String, String>emptyMap();
}
abstract ByteBuffer createBuffer(CommitLog commitLog);
@@@ -291,15 -278,18 +298,20 @@@
}
/**
- * Forces a disk flush for this segment file.
+ * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+ *
+ * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
*/
- synchronized void sync()
+ synchronized void sync(boolean flush)
{
+ if (!headerWritten)
+ throw new IllegalStateException("commit log header has not been written");
- boolean close = false;
+ assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+ lastMarkerOffset, lastSyncedOffset);
// check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+ final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+ if (!(needToMarkData || hasDataToFlush))
return;
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 967db15,8e05112..d5e6113
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -17,27 -17,51 +17,26 @@@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.SyncUtil;
-/*
+/**
* Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
* section of the buffer and writes it to the destination channel.
+ *
+ * The format of the compressed commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
- * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a block of compressed data
*/
-public class CompressedSegment extends CommitLogSegment
+public class CompressedSegment extends FileDirectSegment
{
- private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
- protected ByteBuffer initialValue()
- {
- return ByteBuffer.allocate(0);
- }
- };
- static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
- /**
- * The number of buffers in use
- */
- private static AtomicInteger usedBuffers = new AtomicInteger(0);
-
-
- /**
- * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
- static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
- final Runnable onClose;
-
- volatile long lastWrittenPos = 0;
/**
* Constructs a new segment file.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 87825ab,0000000..21b7c11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,159 -1,0 +1,156 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.Hex;
- import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into
+ * the encryption algorithms.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+ private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+ private final EncryptionContext encryptionContext;
+ private final Cipher cipher;
+
+ public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+ this.encryptionContext = commitLog.configuration.getEncryptionContext();
+
+ try
+ {
+ cipher = encryptionContext.getEncryptor();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, logFile);
+ }
+ logger.debug("created a new encrypted commit log segment: {}", logFile);
+ // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption
+ manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
+ }
+
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ Map<String, String> map = encryptionContext.toHeaderParameters();
+ map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+ return map;
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
+ }
+
+ void write(int startMarker, int nextMarker)
+ {
+ int contentStart = startMarker + SYNC_MARKER_SIZE;
+ final int length = nextMarker - contentStart;
+ // The length may be 0 when the segment is being closed.
+ assert length > 0 || length == 0 && !isStillAllocating();
+
+ final ICompressor compressor = encryptionContext.getCompressor();
+ final int blockSize = encryptionContext.getChunkLength();
+ try
+ {
+ ByteBuffer inputBuffer = buffer.duplicate();
+ inputBuffer.limit(contentStart + length).position(contentStart);
+ ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
+
+ // save space for the sync marker at the beginning of this section
+ final long syncMarkerPosition = lastWrittenPos;
+ channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+ // loop over the segment data in encryption buffer sized chunks
+ while (contentStart < nextMarker)
+ {
+ int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+ ByteBuffer slice = inputBuffer.duplicate();
+ slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+ buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+ // reuse the same buffer for the input and output of the encryption operation
+ buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+ contentStart += nextBlockSize;
+ manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+
+ lastWrittenPos = channel.position();
+
+ // rewind to the beginning of the section and write out the sync marker
+ buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+ writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+ buffer.putInt(SYNC_MARKER_SIZE, length);
+ buffer.rewind();
+ manager.addSize(buffer.limit());
+
+ channel.position(syncMarkerPosition);
+ channel.write(buffer);
-
- SyncUtil.force(channel, true);
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ public long onDiskSize()
+ {
+ return lastWrittenPos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 55084be,0000000..d5431f8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@@ -1,66 -1,0 +1,80 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSWriteError;
++import org.apache.cassandra.utils.SyncUtil;
+
+/**
+ * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
+ * such as compression or encryption, before writing out to disk.
+ */
+public abstract class FileDirectSegment extends CommitLogSegment
+{
+ volatile long lastWrittenPos = 0;
+
+ FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+ }
+
+ @Override
+ void writeLogHeader()
+ {
+ super.writeLogHeader();
+ try
+ {
+ channel.write((ByteBuffer) buffer.duplicate().flip());
+ manager.addSize(lastWrittenPos = buffer.position());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ protected void internalClose()
+ {
+ try
+ {
+ manager.getBufferPool().releaseBuffer(buffer);
+ super.internalClose();
+ }
+ finally
+ {
+ manager.notifyBufferFreed();
+ }
+ }
++
++ @Override
++ protected void flush(int startMarker, int nextMarker)
++ {
++ try
++ {
++ SyncUtil.force(channel, true);
++ }
++ catch (Exception e)
++ {
++ throw new FSWriteError(e, getPath());
++ }
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,e2b9f72..663e7af
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,128 +1,98 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
++import java.util.ArrayList;
+ import java.util.Random;
+
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.RebufferingInputStream;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+ /**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
++ // this method is blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
++ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
- CommitLog.instance.sync(false, true);
++ CommitLog.instance.sync(false);
+
- Replayer replayer = new Replayer(cfs1.metadata);
- File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
- replayer.recover(commitLogDir.listFiles());
- Assert.assertEquals(samples, replayer.count);
- }
-
- private static class Replayer extends CommitLogReplayer
- {
- private final CFMetaData cfm;
- private int count;
-
- Replayer(CFMetaData cfm)
- {
- super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
- this.cfm = cfm;
- }
-
- @Override
- void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
- {
- RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
- try
- {
- Mutation mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
++ ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
++ CommitLogReader reader = new CommitLogReader();
++ CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++ for (File f : toCheck)
++ reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+
- if (cfm == null || mutation.get(cfm) != null)
- count++;
- }
- catch (IOException e)
- {
- // Test fails.
- throw new AssertionError(e);
- }
- }
++ Assert.assertEquals(samples, testHandler.seenMutationCount());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 3956de5,a1999ef..46a3fb0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -67,12 -66,12 +67,12 @@@ public class CommitLogSegmentBackpressu
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 267813e,b8f68ed..215ad6c
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -360,10 -339,10 +360,10 @@@ public class CommitLogTes
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
- CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++ CommitLog.instance.sync(true);
+ CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+ assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
// Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -678,114 -612,6 +678,114 @@@
}
@Test
+ public void replaySimple() throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm2);
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ @Test
+ public void replayWithDiscard() throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ CommitLogPosition commitLogPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ CommitLogPosition position = CommitLog.instance.add(rm1);
+
+ if (i == discardPosition)
+ commitLogPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class SimpleCountingReplayer extends CommitLogReplayer
+ {
+ private final CommitLogPosition filterPosition;
+ private final CFMetaData metadata;
+ int cells;
+ int skipped;
+
+ SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
+ {
+ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ this.metadata = cfm;
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
+ {
+ // Filter out system writes that could flake the test.
+ if (!KEYSPACE1.equals(m.getKeyspaceName()))
+ return;
+
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+ for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
+ {
+ // Only process mutations for the CF's we're testing against, since we can't deterministically predict
+ // whether or not system keyspaces will be mutated during a test.
+ if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
+ {
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
+ }
+ }
+ }
+ }
+
public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
{
CommitLog.instance.resetUnsafe(true);
@@@ -826,7 -652,7 +826,7 @@@
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
}
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
++ CommitLog.instance.sync(true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
@@@ -859,7 -685,7 +859,7 @@@
for (SSTableReader reader : cfs.getLiveSSTables())
reader.reloadSSTableMetadata();
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
++ CommitLog.instance.sync(true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 7f43378,36973f2..9a22b04
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@@ -35,44 -36,44 +35,44 @@@ import org.apache.cassandra.io.util.Reb
*/
public class CommitLogTestReplayer extends CommitLogReplayer
{
- public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
- {
- CommitLog.instance.sync(true, true);
-
- CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
- File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
- replayer.recover(commitLogDir.listFiles());
- }
-
- final private Predicate<Mutation> processor;
+ private final Predicate<Mutation> processor;
- public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
+ public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
{
- this(log, ReplayPosition.NONE, processor);
+ super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ this.processor = processor;
+ commitLogReader = new CommitLogTestReader();
}
- public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor)
+ public void examineCommitLog() throws IOException
{
- super(log, discardedPos, null, ReplayFilter.create());
- this.processor = processor;
+ replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
}
- @Override
- void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ private class CommitLogTestReader extends CommitLogReader
{
- RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
- Mutation mutation;
- try
- {
- mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
- Assert.assertTrue(processor.apply(mutation));
- }
- catch (IOException e)
+ @Override
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
{
- // Test fails.
- throw new AssertionError(e);
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ Assert.assertTrue(processor.apply(mutation));
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/6] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by ja...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c3a1a4fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c3a1a4fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c3a1a4fa
Branch: refs/heads/trunk
Commit: c3a1a4fa8083b2c4c6a5454551979954a0a71339
Parents: d577918 05cb556
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:07:18 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:08:22 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../AbstractCommitLogSegmentManager.java | 8 +-
.../db/commitlog/AbstractCommitLogService.java | 84 ++++++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 85 +++++++++++------
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 3 -
.../db/commitlog/FileDirectSegment.java | 14 +++
.../db/commitlog/MemoryMappedSegment.java | 4 +
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 98 ++++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../cassandra/db/commitlog/CommitLogTest.java | 10 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
17 files changed, 281 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e26aeb8,2683dc2..7c0af91
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,5 +1,13 @@@
-3.0.16
+3.11.2
+ * Remove OpenJDK log warning (CASSANDRA-13916)
+ * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
+ * Cache disk boundaries (CASSANDRA-13215)
+ * Add asm jar to build.xml for maven builds (CASSANDRA-11193)
+ * Round buffer size to powers of 2 for the chunk cache (CASSANDRA-13897)
+ * Update jackson JSON jars (CASSANDRA-13949)
+ * Avoid locks when checking LCS fanout and if we should defrag (CASSANDRA-13930)
+Merged from 3.0:
+ * More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
* Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index a01203c,0796183..5fe752e
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -198,8 -190,9 +198,9 @@@ public class Confi
public String commitlog_directory;
public Integer commitlog_total_space_in_mb;
public CommitLogSync commitlog_sync;
- public Double commitlog_sync_batch_window_in_ms;
- public Integer commitlog_sync_period_in_ms;
- public Integer commitlog_marker_period_in_ms = 0;
+ public double commitlog_sync_batch_window_in_ms = Double.NaN;
+ public int commitlog_sync_period_in_ms;
++ public int commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 7f3c9f8,0000000..2c324aa
mode 100755,000000..100755
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,552 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
+
+ private final WaitQueue segmentPrepared = new WaitQueue();
+
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ private Thread managerThread;
+ protected final CommitLog commitLog;
+ private volatile boolean shutdown;
+ private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown;
+ private final WaitQueue managerThreadWaitQueue = new WaitQueue();
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (!shutdown)
+ {
+ try
+ {
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
+ {
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
+ }
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
+ }
+
+ WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
+ }
+ }
+ };
+
+ shutdown = false;
+ managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
+
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit
+ * to segment manager so it's performed on segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
+ * manager so it's performend on segment management thread, or perform while segment management thread is shutdown
+ * during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+ /**
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ synchronized (this)
+ {
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
+ if (allocatingFrom != old)
+ return;
+
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
+ {
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
+ }
+ }
+
+ awaitAvailableSegment(old);
+ }
+
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ }
+
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
+ {
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ archiveAndDiscard(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be discarded.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void archiveAndDiscard(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.debug("CLSM closing and clearing existing commit log segments...");
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
+ void awaitManagementTasksCompletion()
+ {
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this)
+ {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ managerThread = null;
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom.getCurrentCommitLogPosition();
+ }
+
+ /**
- * Forces a disk flush on the commit log files that need it. Blocking.
++ * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk.
++ *
++ * @param flush Request that the sync operation flush the file to disk.
+ */
- public void sync() throws IOException
++ public void sync(boolean flush) throws IOException
+ {
+ CommitLogSegment current = allocatingFrom;
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
+ return;
- segment.sync();
++ segment.sync(flush);
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ void wakeManager()
+ {
+ managerThreadWaitQueue.signalAll();
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 71100a3,8a03b2f..0410650
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,16 -17,9 +17,18 @@@
*/
package org.apache.cassandra.db.commitlog;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
import org.apache.cassandra.concurrent.NamedThreadFactory;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@@ -48,7 -47,25 +50,24 @@@ public abstract class AbstractCommitLog
final CommitLog commitLog;
private final String name;
- private final long pollIntervalNanos;
+
+ /**
+ * The duration between syncs to disk.
+ */
- private final long syncIntervalMillis;
++ private final long syncIntervalNanos;
+
+ /**
+ * The duration between updating the chained markers in the the commit log file. This value should be
- * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
++ * 0 < {@link #markerIntervalNanos} <= {@link #syncIntervalNanos}.
+ */
- private final long markerIntervalMillis;
++ private final long markerIntervalNanos;
+
+ /**
+ * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+ * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+ * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+ */
+ private volatile boolean syncRequested;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@@ -62,15 -90,30 +92,30 @@@
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalNanos = TimeUnit.NANOSECONDS.convert(pollIntervalMillis, TimeUnit.MILLISECONDS);
- this.syncIntervalMillis = syncIntervalMillis;
++ this.syncIntervalNanos = TimeUnit.NANOSECONDS.convert(syncIntervalMillis, TimeUnit.MILLISECONDS);
+
- // if we are not using periodic mode, or we using compression, we shouldn't update the chained markers
++ // if we are not using periodic mode, or we using compression/encryption, we shouldn't update the chained markers
+ // faster than the sync interval
- if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
++ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression() || commitLog.configuration.useEncryption())
+ markerIntervalMillis = syncIntervalMillis;
+
+ // apply basic bounds checking on the marker interval
+ if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ {
+ logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+ markerIntervalMillis, syncIntervalMillis);
+ markerIntervalMillis = syncIntervalMillis;
+ }
+
- this.markerIntervalMillis = markerIntervalMillis;
++ this.markerIntervalNanos = TimeUnit.NANOSECONDS.convert(markerIntervalMillis, TimeUnit.MILLISECONDS);
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalNanos < 1)
- if (syncIntervalMillis < 1)
++ if (syncIntervalNanos < 1)
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
- pollIntervalNanos * 1e-6));
- syncIntervalMillis * 1e-6));
++ syncIntervalNanos * 1e-6));
Runnable runnable = new Runnable()
{
@@@ -82,25 -125,34 +127,33 @@@
int lagCount = 0;
int syncCount = 0;
- boolean run = true;
- while (run)
+ while (true)
{
+ // always run once after shutdown signalled
+ boolean shutdownRequested = shutdown;
+
try
{
- // always run once after shutdown signalled
- run = !shutdown;
-
// sync and signal
- long syncStarted = System.nanoTime();
- // This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync();
- lastSyncedAt = syncStarted;
- syncComplete.signalAll();
-
- long pollStarted = System.currentTimeMillis();
- if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
++ long pollStarted = System.nanoTime();
++ if (lastSyncedAt + syncIntervalNanos <= pollStarted || shutdownRequested || syncRequested)
+ {
+ // in this branch, we want to flush the commit log to disk
- commitLog.sync(shutdown, true);
++ commitLog.sync(true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
- commitLog.sync(false, false);
++ commitLog.sync(false);
+ }
// sleep any time we have left before the next one is due
- long now = System.currentTimeMillis();
- long sleep = pollStarted + markerIntervalMillis - now;
- if (sleep < 0)
+ long now = System.nanoTime();
- long wakeUpAt = syncStarted + pollIntervalNanos;
++ long wakeUpAt = pollStarted + markerIntervalNanos;
+ if (wakeUpAt < now)
{
// if we have lagged noticeably, update our lag counter
if (firstLagAt == 0)
@@@ -143,7 -200,14 +196,7 @@@
break;
// sleep for full poll-interval after an error, so we don't spam the log file
- LockSupport.parkNanos(pollIntervalNanos);
- try
- {
- haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
++ LockSupport.parkNanos(markerIntervalNanos);
}
}
}
@@@ -166,11 -229,14 +219,12 @@@
protected abstract void maybeWaitForSync(Allocation alloc);
/**
- * Sync immediately, but don't block for the sync to cmplete
+ * Request an additional sync cycle without blocking.
*/
- public void requestExtraSync()
- public WaitQueue.Signal requestExtraSync()
++ void requestExtraSync()
{
+ syncRequested = true;
- WaitQueue.Signal signal = syncComplete.register();
- haveWork.release(1);
- return signal;
+ LockSupport.unpark(thread);
}
public void shutdown()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 750fabc,ff1b712..da29258
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -221,9 -220,15 +221,9 @@@ public class CommitLog implements Commi
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync() throws IOException
- public void sync(boolean syncAllSegments, boolean flush)
++ public void sync(boolean flush) throws IOException
{
- segmentManager.sync();
- CommitLogSegment current = allocator.allocatingFrom();
- for (CommitLogSegment segment : allocator.getActiveSegments())
- {
- if (!syncAllSegments && segment.id > current.id)
- return;
- segment.sync(flush);
- }
++ segmentManager.sync(flush);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index a618d0b,8834c8c..7c02892
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -169,26 -170,12 +175,27 @@@ public abstract class CommitLogSegmen
}
buffer = createBuffer(commitLog);
- // write the header
- CommitLogDescriptor.writeHeader(buffer, descriptor);
+ }
+
+ /**
+ * Deferred writing of the commit log header until subclasses have had a chance to initialize
+ */
+ void writeLogHeader()
+ {
+ CommitLogDescriptor.writeHeader(buffer, descriptor, additionalHeaderParameters());
endOfBuffer = buffer.capacity();
- lastSyncedOffset = buffer.position();
+
+ lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
+ headerWritten = true;
+ }
+
+ /**
+ * Provide any additional header data that should be stored in the {@link CommitLogDescriptor}.
+ */
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ return Collections.<String, String>emptyMap();
}
abstract ByteBuffer createBuffer(CommitLog commitLog);
@@@ -291,15 -278,18 +298,20 @@@
}
/**
- * Forces a disk flush for this segment file.
+ * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+ *
+ * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
*/
- synchronized void sync()
+ synchronized void sync(boolean flush)
{
+ if (!headerWritten)
+ throw new IllegalStateException("commit log header has not been written");
- boolean close = false;
+ assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+ lastMarkerOffset, lastSyncedOffset);
// check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+ final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+ if (!(needToMarkData || hasDataToFlush))
return;
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 967db15,8e05112..d5e6113
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -17,27 -17,51 +17,26 @@@
*/
package org.apache.cassandra.db.commitlog;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.SyncUtil;
-/*
+/**
* Compressed commit log segment. Provides an in-memory buffer for the mutation threads. On sync compresses the written
* section of the buffer and writes it to the destination channel.
+ *
+ * The format of the compressed commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
- * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a block of compressed data
*/
-public class CompressedSegment extends CommitLogSegment
+public class CompressedSegment extends FileDirectSegment
{
- private static final ThreadLocal<ByteBuffer> compressedBufferHolder = new ThreadLocal<ByteBuffer>() {
- protected ByteBuffer initialValue()
- {
- return ByteBuffer.allocate(0);
- }
- };
- static Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
-
- /**
- * The number of buffers in use
- */
- private static AtomicInteger usedBuffers = new AtomicInteger(0);
-
-
- /**
- * Maximum number of buffers in the compression pool. The default value is 3, it should not be set lower than that
- * (one segment in compression, one written to, one in reserve); delays in compression may cause the log to use
- * more, depending on how soon the sync policy stops all writing threads.
- */
- static final int MAX_BUFFERPOOL_SIZE = DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool();
-
static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
final ICompressor compressor;
- final Runnable onClose;
-
- volatile long lastWrittenPos = 0;
/**
* Constructs a new segment file.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 87825ab,0000000..21b7c11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,159 -1,0 +1,156 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.Hex;
- import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data into
+ * the encryption algorithms.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of the resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+ private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+ private final EncryptionContext encryptionContext;
+ private final Cipher cipher;
+
+ public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+ this.encryptionContext = commitLog.configuration.getEncryptionContext();
+
+ try
+ {
+ cipher = encryptionContext.getEncryptor();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, logFile);
+ }
+ logger.debug("created a new encrypted commit log segment: {}", logFile);
+ // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption
+ manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
+ }
+
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ Map<String, String> map = encryptionContext.toHeaderParameters();
+ map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+ return map;
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
+ }
+
+ void write(int startMarker, int nextMarker)
+ {
+ int contentStart = startMarker + SYNC_MARKER_SIZE;
+ final int length = nextMarker - contentStart;
+ // The length may be 0 when the segment is being closed.
+ assert length > 0 || length == 0 && !isStillAllocating();
+
+ final ICompressor compressor = encryptionContext.getCompressor();
+ final int blockSize = encryptionContext.getChunkLength();
+ try
+ {
+ ByteBuffer inputBuffer = buffer.duplicate();
+ inputBuffer.limit(contentStart + length).position(contentStart);
+ ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
+
+ // save space for the sync marker at the beginning of this section
+ final long syncMarkerPosition = lastWrittenPos;
+ channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+ // loop over the segment data in encryption buffer sized chunks
+ while (contentStart < nextMarker)
+ {
+ int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+ ByteBuffer slice = inputBuffer.duplicate();
+ slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+ buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+ // reuse the same buffer for the input and output of the encryption operation
+ buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+ contentStart += nextBlockSize;
+ manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+
+ lastWrittenPos = channel.position();
+
+ // rewind to the beginning of the section and write out the sync marker
+ buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+ writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+ buffer.putInt(SYNC_MARKER_SIZE, length);
+ buffer.rewind();
+ manager.addSize(buffer.limit());
+
+ channel.position(syncMarkerPosition);
+ channel.write(buffer);
-
- SyncUtil.force(channel, true);
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ public long onDiskSize()
+ {
+ return lastWrittenPos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index 55084be,0000000..d5431f8
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@@ -1,66 -1,0 +1,80 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.io.FSWriteError;
++import org.apache.cassandra.utils.SyncUtil;
+
+/**
+ * Writes to the backing commit log file only on sync, allowing transformations of the mutations,
+ * such as compression or encryption, before writing out to disk.
+ */
+public abstract class FileDirectSegment extends CommitLogSegment
+{
+ volatile long lastWrittenPos = 0;
+
+ FileDirectSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
+ {
+ super(commitLog, manager);
+ }
+
+ @Override
+ void writeLogHeader()
+ {
+ super.writeLogHeader();
+ try
+ {
+ channel.write((ByteBuffer) buffer.duplicate().flip());
+ manager.addSize(lastWrittenPos = buffer.position());
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ protected void internalClose()
+ {
+ try
+ {
+ manager.getBufferPool().releaseBuffer(buffer);
+ super.internalClose();
+ }
+ finally
+ {
+ manager.notifyBufferFreed();
+ }
+ }
++
++ @Override
++ protected void flush(int startMarker, int nextMarker)
++ {
++ try
++ {
++ SyncUtil.force(channel, true);
++ }
++ catch (Exception e)
++ {
++ throw new FSWriteError(e, getPath());
++ }
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,e2b9f72..663e7af
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,128 +1,98 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
++import java.util.ArrayList;
+ import java.util.Random;
+
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+
+ import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.RebufferingInputStream;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+ /**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
++        // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
++ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
- CommitLog.instance.sync(false, true);
++ CommitLog.instance.sync(false);
+
- Replayer replayer = new Replayer(cfs1.metadata);
- File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
- replayer.recover(commitLogDir.listFiles());
- Assert.assertEquals(samples, replayer.count);
- }
-
- private static class Replayer extends CommitLogReplayer
- {
- private final CFMetaData cfm;
- private int count;
-
- Replayer(CFMetaData cfm)
- {
- super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
- this.cfm = cfm;
- }
-
- @Override
- void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
- {
- RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
- try
- {
- Mutation mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
++ ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
++ CommitLogReader reader = new CommitLogReader();
++ CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++ for (File f : toCheck)
++ reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+
- if (cfm == null || mutation.get(cfm) != null)
- count++;
- }
- catch (IOException e)
- {
- // Test fails.
- throw new AssertionError(e);
- }
- }
++ Assert.assertEquals(samples, testHandler.seenMutationCount());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index 3956de5,a1999ef..46a3fb0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@@ -67,12 -66,12 +67,12 @@@ public class CommitLogSegmentBackpressu
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
++ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 267813e,b8f68ed..215ad6c
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -360,10 -339,10 +360,10 @@@ public class CommitLogTes
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
- CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
++ CommitLog.instance.sync(true);
+ CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
- assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+ assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
// Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
@@@ -678,114 -612,6 +678,114 @@@
}
@Test
+ public void replaySimple() throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
+ CommitLog.instance.add(rm2);
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ @Test
+ public void replayWithDiscard() throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ CommitLogPosition commitLogPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ CommitLogPosition position = CommitLog.instance.add(rm1);
+
+ if (i == discardPosition)
+ commitLogPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
+ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
+ File[] files = new File(CommitLog.instance.segmentManager.storageDirectory).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.replayFiles(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class SimpleCountingReplayer extends CommitLogReplayer
+ {
+ private final CommitLogPosition filterPosition;
+ private final CFMetaData metadata;
+ int cells;
+ int skipped;
+
+ SimpleCountingReplayer(CommitLog commitLog, CommitLogPosition filterPosition, CFMetaData cfm)
+ {
+ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ this.metadata = cfm;
+ }
+
+ @SuppressWarnings("resource")
+ @Override
+ public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
+ {
+ // Filter out system writes that could flake the test.
+ if (!KEYSPACE1.equals(m.getKeyspaceName()))
+ return;
+
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+ for (PartitionUpdate partitionUpdate : m.getPartitionUpdates())
+ {
+            // Only process mutations for the CFs we're testing against, since we can't deterministically predict
+ // whether or not system keyspaces will be mutated during a test.
+ if (partitionUpdate.metadata().cfName.equals(metadata.cfName))
+ {
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
+ }
+ }
+ }
+ }
+
public void testUnwriteableFlushRecovery() throws ExecutionException, InterruptedException, IOException
{
CommitLog.instance.resetUnsafe(true);
@@@ -826,7 -652,7 +826,7 @@@
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
}
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
++ CommitLog.instance.sync(true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
@@@ -859,7 -685,7 +859,7 @@@
for (SSTableReader reader : cfs.getLiveSSTables())
reader.reloadSSTableMetadata();
- CommitLog.instance.sync();
- CommitLog.instance.sync(true, true);
++ CommitLog.instance.sync(true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c3a1a4fa/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 7f43378,36973f2..9a22b04
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@@ -35,44 -36,44 +35,44 @@@ import org.apache.cassandra.io.util.Reb
*/
public class CommitLogTestReplayer extends CommitLogReplayer
{
- public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
- {
- CommitLog.instance.sync(true, true);
-
- CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
- File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
- replayer.recover(commitLogDir.listFiles());
- }
-
- final private Predicate<Mutation> processor;
+ private final Predicate<Mutation> processor;
- public CommitLogTestReplayer(CommitLog log, Predicate<Mutation> processor)
+ public CommitLogTestReplayer(Predicate<Mutation> processor) throws IOException
{
- this(log, ReplayPosition.NONE, processor);
+ super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+
+ this.processor = processor;
+ commitLogReader = new CommitLogTestReader();
}
- public CommitLogTestReplayer(CommitLog log, ReplayPosition discardedPos, Predicate<Mutation> processor)
+ public void examineCommitLog() throws IOException
{
- super(log, discardedPos, null, ReplayFilter.create());
- this.processor = processor;
+ replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
}
- @Override
- void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ private class CommitLogTestReader extends CommitLogReader
{
- RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
- Mutation mutation;
- try
- {
- mutation = Mutation.serializer.deserialize(bufIn,
- desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
- Assert.assertTrue(processor.apply(mutation));
- }
- catch (IOException e)
+ @Override
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
{
- // Test fails.
- throw new AssertionError(e);
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ Assert.assertTrue(processor.apply(mutation));
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/6] cassandra git commit: More frequent commitlog chained markers
Posted by ja...@apache.org.
More frequent commitlog chained markers
patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f
Branch: refs/heads/trunk
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../db/commitlog/AbstractCommitLogService.java | 86 ++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 85 ++++++++----
.../db/commitlog/CompressedSegment.java | 12 ++
.../db/commitlog/MemoryMappedSegment.java | 7 +-
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 128 +++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../cassandra/db/commitlog/CommitLogTest.java | 6 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
* Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
* Fix SSTableLoader logger message (CASSANDRA-14003)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; it does not improve durability if the host itself fails.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
public CommitLogSync commitlog_sync;
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
+ public Integer commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
conf.commitlog_sync_period_in_ms = periodMillis;
}
+ public static void setCommitLogMarkerPeriod(int markerPeriod)
+ {
+ conf.commitlog_marker_period_in_ms = markerPeriod;
+ }
+
+ public static int getCommitLogMarkerPeriod()
+ {
+ return conf.commitlog_marker_period_in_ms;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
package org.apache.cassandra.db.commitlog;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
public abstract class AbstractCommitLogService
{
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
final CommitLog commitLog;
private final String name;
- private final long pollIntervalMillis;
+
+ /**
+ * The duration between syncs to disk.
+ */
+ private final long syncIntervalMillis;
+
+ /**
+ * The duration between updating the chained markers in the commit log file. This value should be
+ * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+ */
+ private final long markerIntervalMillis;
+
+ /**
+ * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+ * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+ * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+ */
+ private volatile boolean syncRequested;
private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
*
* Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
*/
- AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+ {
+ this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+ }
+
+ /**
+ * CommitLogService provides a fsync service for Allocations, fulfilling either the
+ * Batch or Periodic contract.
+ *
+ * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+ */
+ AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
{
this.commitLog = commitLog;
this.name = name;
- this.pollIntervalMillis = pollIntervalMillis;
+ this.syncIntervalMillis = syncIntervalMillis;
+
+ // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+ // faster than the sync interval
+ if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+ markerIntervalMillis = syncIntervalMillis;
+
+ // apply basic bounds checking on the marker interval
+ if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+ {
+ logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+ markerIntervalMillis, syncIntervalMillis);
+ markerIntervalMillis = syncIntervalMillis;
+ }
+
+ this.markerIntervalMillis = markerIntervalMillis;
}
// Separated into individual method to ensure relevant objects are constructed before this is started.
void start()
{
- if (pollIntervalMillis < 1)
- throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+ if (syncIntervalMillis < 1)
+ throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+ syncIntervalMillis * 1e-6));
Runnable runnable = new Runnable()
{
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
run = !shutdown;
// sync and signal
- long syncStarted = System.currentTimeMillis();
- //This is a target for Byteman in CommitLogSegmentManagerTest
- commitLog.sync(shutdown);
- lastSyncedAt = syncStarted;
- syncComplete.signalAll();
-
+ long pollStarted = System.currentTimeMillis();
+ if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+ {
+ // in this branch, we want to flush the commit log to disk
+ commitLog.sync(shutdown, true);
+ syncRequested = false;
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
+ commitLog.sync(false, false);
+ }
// sleep any time we have left before the next one is due
long now = System.currentTimeMillis();
- long sleep = syncStarted + pollIntervalMillis - now;
+ long sleep = pollStarted + markerIntervalMillis - now;
if (sleep < 0)
{
// if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
lagCount++;
}
syncCount++;
- totalSyncDuration += now - syncStarted;
+ totalSyncDuration += now - pollStarted;
if (firstLagAt > 0)
{
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
- haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+ haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
*/
public WaitQueue.Signal requestExtraSync()
{
+ syncRequested = true;
WaitQueue.Signal signal = syncComplete.register();
haveWork.release(1);
return signal;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
/**
* Forces a disk flush on the commit log files that need it. Blocking.
*/
- public void sync(boolean syncAllSegments)
+ public void sync(boolean syncAllSegments, boolean flush)
{
CommitLogSegment current = allocator.allocatingFrom();
for (CommitLogSegment segment : allocator.getActiveSegments())
{
if (!syncAllSegments && segment.id > current.id)
return;
- segment.sync();
+ segment.sync(flush);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
// sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
private volatile int lastSyncedOffset;
+ /**
+ * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+ * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+ */
+ private volatile int lastMarkerOffset;
+
// The end position of the buffer. Initially set to its capacity and updated to point to the last written position
// as the segment is being closed.
// No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
// write the header
CommitLogDescriptor.writeHeader(buffer, descriptor);
endOfBuffer = buffer.capacity();
- lastSyncedOffset = buffer.position();
+
+ lastSyncedOffset = lastMarkerOffset = buffer.position();
allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
}
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
// ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
void discardUnusedTail()
{
- // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+ // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
// Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
// This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
// running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
}
/**
- * Forces a disk flush for this segment file.
+ * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+ *
+ * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
*/
- synchronized void sync()
+ synchronized void sync(boolean flush)
{
- boolean close = false;
+ assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+ lastMarkerOffset, lastSyncedOffset);
// check we have more work to do
- if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+ final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+ final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+ if (!(needToMarkData || hasDataToFlush))
return;
// Note: Even if the very first allocation of this sync section failed, we still want to enter this
// to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
// succeeded in the previous sync.
assert buffer != null; // Only close once.
- int startMarker = lastSyncedOffset;
- // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to.
- int nextMarker = allocate(SYNC_MARKER_SIZE);
- if (nextMarker < 0)
+ boolean close = false;
+ int startMarker = lastMarkerOffset;
+ int nextMarker, sectionEnd;
+ if (needToMarkData)
{
- // Ensure no more of this CLS is writeable, and mark ourselves for closing.
- discardUnusedTail();
- close = true;
-
- // We use the buffer size as the synced position after a close instead of the end of the actual data
- // to make sure we only close the buffer once.
- // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
- nextMarker = buffer.capacity();
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
+ {
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ sectionEnd = close ? endOfBuffer : nextMarker;
+
+ // Possibly perform compression or encryption and update the chained markers
+ write(startMarker, sectionEnd);
+ lastMarkerOffset = sectionEnd;
+ }
+ else
+ {
+ // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+ // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+ nextMarker = lastMarkerOffset;
+ sectionEnd = nextMarker;
}
- // Wait for mutations to complete as well as endOfBuffer to have been written.
- waitForModifications();
- int sectionEnd = close ? endOfBuffer : nextMarker;
- // Perform compression, writing to file and flush.
- write(startMarker, sectionEnd);
+ if (flush || close)
+ {
+ flush(startMarker, sectionEnd);
+ lastSyncedOffset = lastMarkerOffset = nextMarker;
+ }
- // Signal the sync as complete.
- lastSyncedOffset = nextMarker;
if (close)
internalClose();
syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
abstract void write(int lastSyncedOffset, int nextMarker);
+ abstract void flush(int startMarker, int nextMarker);
+
public boolean isStillAllocating()
{
return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
synchronized void close()
{
discardUnusedTail();
- sync();
+ sync(true);
assert buffer == null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
channel.write(compressedBuffer);
assert channel.position() - lastWrittenPos == compressedBuffer.limit();
lastWrittenPos = channel.position();
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force(channel, true);
}
catch (Exception e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
// write previous sync marker to point to next sync marker
// we don't chain the crcs here to ensure this method is idempotent if it fails
writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+ }
- try {
+ @Override
+ protected void flush(int startMarker, int nextMarker)
+ {
+ try
+ {
SyncUtil.force((MappedByteBuffer) buffer);
}
catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
public PeriodicCommitLogService(final CommitLog commitLog)
{
- super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+ super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
}
protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
+ final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
+ CommitLog.instance.sync(false, true);
+
+ Replayer replayer = new Replayer(cfs1.metadata);
+ File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+ replayer.recover(commitLogDir.listFiles());
+ Assert.assertEquals(samples, replayer.count);
+ }
+
+ private static class Replayer extends CommitLogReplayer
+ {
+ private final CFMetaData cfm;
+ private int count;
+
+ Replayer(CFMetaData cfm)
+ {
+ super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+ this.cfm = cfm;
+ }
+
+ @Override
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ {
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ try
+ {
+ Mutation mutation = Mutation.serializer.deserialize(bufIn,
+ desc.getMessagingVersion(),
+ SerializationHelper.Flag.LOCAL);
+
+ if (cfm == null || mutation.get(cfm) != null)
+ count++;
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
@BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
@BMRule(name = "Release Semaphore after sync",
targetClass = "AbstractCommitLogService$1",
targetMethod = "run",
- targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+ targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
public void testCompressedCommitLogBackpressure() throws Throwable
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
// "Flush": this won't delete anything
UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
}
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
// If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
for (SSTableReader reader : cfs.getLiveSSTables())
reader.reloadSSTableMetadata();
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
// In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
// persisted all data in the commit log. Because we know there was an error, there must be something left to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
{
public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
{
- CommitLog.instance.sync(true);
+ CommitLog.instance.sync(true, true);
CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2402acd4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2402acd4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2402acd4
Branch: refs/heads/trunk
Commit: 2402acd47e3bb514981cde742b7330666c564869
Parents: d274c6a c3a1a4f
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:10:08 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:11:21 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../AbstractCommitLogSegmentManager.java | 8 +-
.../db/commitlog/AbstractCommitLogService.java | 84 ++++++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 90 ++++++++++++------
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 3 -
.../db/commitlog/FileDirectSegment.java | 14 +++
.../db/commitlog/MemoryMappedSegment.java | 4 +
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../cassandra/db/commitlog/CDCTestReplayer.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 98 ++++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../CommitLogSegmentManagerCDCTest.java | 8 +-
.../cassandra/db/commitlog/CommitLogTest.java | 10 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
19 files changed, 288 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index ba478e7,3569d36..02cec12
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -358,24 -367,21 +358,24 @@@ counter_cache_save_period: 720
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
# saved_caches_directory: /var/lib/cassandra/saved_caches
-# commitlog_sync may be either "periodic" or "batch."
+# commitlog_sync may be either "periodic", "group", or "batch."
#
# When in batch mode, Cassandra won't ack writes until the commit log
-# has been fsynced to disk. It will wait
-# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
-# This window should be kept short because the writer threads will
-# be unable to do extra work while waiting. (You may need to increase
-# concurrent_writes for the same reason.)
+# has been flushed to disk. Each incoming write will trigger the flush task.
+# commitlog_sync_batch_window_in_ms is deprecated: it previously had
+# little practical effect, and it is being removed.
#
-# commitlog_sync: batch
# commitlog_sync_batch_window_in_ms: 2
#
-# the other option is "periodic" where writes may be acked immediately
+# group mode is similar to batch mode in that Cassandra will not ack writes
+# until the commit log has been flushed to disk. The difference is that group
+# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
+#
+# commitlog_sync_group_window_in_ms: 1000
+#
+# the default option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
- # milliseconds.
+ # milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1db8217,5fe752e..4fa3bed
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -185,13 -198,9 +185,14 @@@ public class Confi
public String commitlog_directory;
public Integer commitlog_total_space_in_mb;
public CommitLogSync commitlog_sync;
+
+ /**
+ * @deprecated since 4.0; this value had little practical effect and is no longer used
+ */
public double commitlog_sync_batch_window_in_ms = Double.NaN;
+ public double commitlog_sync_group_window_in_ms = Double.NaN;
public int commitlog_sync_period_in_ms;
+ public int commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3a57139,7c02892..016dbc1
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -94,9 -89,14 +94,15 @@@ public abstract class CommitLogSegmen
// Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after
// each sync are reserved, and point forwards to the next such offset. The final
// sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
- private volatile int lastSyncedOffset;
+ @VisibleForTesting
+ volatile int lastSyncedOffset;
+ /**
+ * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+ * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+ */
+ private volatile int lastMarkerOffset;
+
// The end position of the buffer. Initially set to its capacity and updated to point to the last written position
// as the segment is being closed.
// No need to be volatile as writes are protected by appendOrder barrier.
@@@ -316,34 -319,48 +328,50 @@@
// succeeded in the previous sync.
assert buffer != null; // Only close once.
- int startMarker = lastSyncedOffset;
- // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to.
- int nextMarker = allocate(SYNC_MARKER_SIZE);
- if (nextMarker < 0)
+ boolean close = false;
+ int startMarker = lastMarkerOffset;
+ int nextMarker, sectionEnd;
+ if (needToMarkData)
{
- // Ensure no more of this CLS is writeable, and mark ourselves for closing.
- discardUnusedTail();
- close = true;
-
- // We use the buffer size as the synced position after a close instead of the end of the actual data
- // to make sure we only close the buffer once.
- // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
- nextMarker = buffer.capacity();
- }
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
+ {
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ sectionEnd = close ? endOfBuffer : nextMarker;
- // Wait for mutations to complete as well as endOfBuffer to have been written.
- waitForModifications();
- int sectionEnd = close ? endOfBuffer : nextMarker;
+ // Possibly perform compression or encryption and update the chained markers
+ write(startMarker, sectionEnd);
+ lastMarkerOffset = sectionEnd;
+ }
+ else
+ {
+ // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+ // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+ nextMarker = lastMarkerOffset;
+ sectionEnd = nextMarker;
+ }
- // Possibly perform compression or encryption, writing to file and flush.
- write(startMarker, sectionEnd);
- if (cdcState == CDCState.CONTAINS)
- writeCDCIndexFile(descriptor, sectionEnd, close);
+ if (flush || close)
+ {
+ flush(startMarker, sectionEnd);
++ if (cdcState == CDCState.CONTAINS)
++ writeCDCIndexFile(descriptor, sectionEnd, close);
+ lastSyncedOffset = lastMarkerOffset = nextMarker;
+ }
- // Signal the sync as complete.
- lastSyncedOffset = nextMarker;
if (close)
internalClose();
syncComplete.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 3695da8,0000000..18bc6e0
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@@ -1,76 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+
+/**
+ * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations
+ */
+public class CDCTestReplayer extends CommitLogReplayer
+{
+ private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class);
+
+ public CDCTestReplayer() throws IOException
+ {
+ super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ commitLogReader = new CommitLogTestReader();
+ }
+
+ public void examineCommitLog() throws IOException
+ {
+ replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
+ }
+
+ private class CommitLogTestReader extends CommitLogReader
+ {
+ @Override
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
+ {
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ if (mutation.trackedByCDC())
+ sawCDCMutation = true;
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,663e7af..be44ec3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,98 +1,98 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Random;
+
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+ /**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
+ // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
- final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
++ final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
+ CommitLog.instance.sync(false);
+
+ ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
+ CommitLogReader reader = new CommitLogReader();
- CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++ CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata.get());
+ for (File f : toCheck)
+ reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+
+ Assert.assertEquals(samples, testHandler.seenMutationCount());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 80dfd01,68ce57d..7417cd3
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@@ -159,259 -199,6 +159,259 @@@ public class CommitLogSegmentManagerCDC
}
}
+ @Test
+ public void testCDCIndexFileWriteOnSync() throws IOException
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ int syncOffset = currentSegment.lastSyncedOffset;
+
+ // Confirm index file is written
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Read index value and confirm it's == end from last sync
+ BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+ String input = in.readLine();
+ Integer offset = Integer.parseInt(input);
+ Assert.assertEquals(syncOffset, (long)offset);
+ in.close();
+ }
+
+ @Test
+ public void testCompletedFlag() throws IOException
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ DatabaseDescriptor.setCDCSpaceInMB(8);
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ }
+ }
+ catch (CDCWriteException ce)
+ {
+ // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
+ }
+
+ CommitLog.instance.forceRecycleAllSegments();
+
+ // Confirm index file is written
+ File cdcIndexFile = initialSegment.getCDCIndexFile();
+ Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Read index file and confirm second line is COMPLETED
+ BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+ String input = in.readLine();
+ input = in.readLine();
+ Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED"));
+ in.close();
+ }
+
+ @Test
+ public void testDeleteLinkOnDiscardNoCDC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+
+ // Confirm that, with no CDC data present, we've hard-linked but have no index file
+ Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Sync and confirm no index written as index is written on flush
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Force a full recycle and confirm hard-link is deleted
+ CommitLog.instance.forceRecycleAllSegments();
+ CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+ Assert.assertFalse("Expected hard link to CLS to be deleted on non-cdc segment: " + linked, Files.exists(linked));
+ }
+
+ @Test
+ public void testRetainLinkOnDiscardCDC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+
+ Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+ // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+
+ // Sync and confirm index written as index is written on flush
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Force a full recycle and confirm all files remain
+ CommitLog.instance.forceRecycleAllSegments();
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists());
+ }
+
+ @Test
+ public void testReplayLogic() throws IOException
+ {
+ // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+ String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+
+ DatabaseDescriptor.setCDCSpaceInMB(8);
+ TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ new RowUpdateBuilder(ccfm, 0, i)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ }
+ Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
+ }
+ catch (CDCWriteException e)
+ {
+ // pass
+ }
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ CommitLog.instance.stopUnsafe(false);
+
+ // Build up a list of expected index files after replay and then clear out cdc_raw
+ List<CDCIndexData> oldData = parseCDCIndexData();
+ for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+ FileUtils.deleteWithConfirm(f.getAbsolutePath());
+
+ try
+ {
+ Assert.assertEquals("Expected 0 files in CDC folder after deletion. ",
+ 0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+ }
+ finally
+ {
+ // If we don't have a started commitlog, assertions will cause the test to hang. Presumably this is a
+ // hang in CQLTester's shutdown, which tries to clean up / drop keyspaces / tables and hangs applying
+ // mutations.
+ CommitLog.instance.start();
+ CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+ }
+ CDCTestReplayer replayer = new CDCTestReplayer();
+ replayer.examineCommitLog();
+
+ // Rough sanity check -> should be files there now.
+ Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.",
+ new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0);
+
+ // Confirm all the indexes in oldData are present with an offset >= the original one, as we flag the entire
+ // segment as CDC-written on replay.
+ List<CDCIndexData> newData = parseCDCIndexData();
+ for (CDCIndexData cid : oldData)
+ {
+ boolean found = false;
+ for (CDCIndexData ncid : newData)
+ {
+ if (cid.fileName.equals(ncid.fileName))
+ {
+ Assert.assertTrue("New CDC index file expected to have >= offset in old.", ncid.offset >= cid.offset);
+ found = true;
+ }
+ }
+ if (!found)
+ {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid));
+ errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n");
+ for (CDCIndexData ncid : newData)
+ errorMessage.append(String.format(" %s\n", ncid));
+ Assert.fail(errorMessage.toString());
+ }
+ }
+
+ // And make sure we don't have new CDC Indexes we don't expect
+ for (CDCIndexData ncid : newData)
+ {
+ boolean found = false;
+ for (CDCIndexData cid : oldData)
+ {
+ if (cid.fileName.equals(ncid.fileName))
+ found = true;
+ }
+ if (!found)
+ Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid));
+ }
+ }
+
+ private List<CDCIndexData> parseCDCIndexData()
+ {
+ List<CDCIndexData> results = new ArrayList<>();
+ try
+ {
+ for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+ {
+ if (f.getName().contains("_cdc.idx"))
+ results.add(new CDCIndexData(f));
+ }
+ }
+ catch (IOException e)
+ {
+ Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage()));
+ }
+ return results;
+ }
+
+ private static class CDCIndexData
+ {
+ private final String fileName;
+ private final int offset;
+
+ CDCIndexData(File f) throws IOException
+ {
+ String line = "";
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f))))
+ {
+ line = br.readLine();
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ fileName = f.getName();
+ offset = Integer.parseInt(line);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s,%d", fileName, offset);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ CDCIndexData cid = (CDCIndexData)other;
+ return fileName.equals(cid.fileName) && offset == cid.offset;
+ }
+ }
+
private ByteBuffer randomizeBuffer(int size)
{
byte[] toWrap = new byte[size];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 8d04ecc,215ad6c..da895a0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -340,9 -359,9 +340,9 @@@ public abstract class CommitLogTes
assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
// "Flush": this won't delete anything
- UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+ TableId id1 = rm.getTableIds().iterator().next();
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
+ CommitLog.instance.discardCompletedSegments(id1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
@@@ -676,9 -696,9 +676,9 @@@
cellCount += 1;
CommitLog.instance.add(rm2);
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
Assert.assertFalse(activeSegments.isEmpty());
@@@ -713,9 -733,9 +713,9 @@@
}
}
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
Assert.assertFalse(activeSegments.isEmpty());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org