You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/05 13:12:03 UTC

[2/6] cassandra git commit: More frequent commitlog chained markers

More frequent commitlog chained markers

patch by jasobrown; reviewed by Sam Tunnecliffe for CASSANDRA-13987


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/05cb556f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/05cb556f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/05cb556f

Branch: refs/heads/cassandra-3.11
Commit: 05cb556f90dbd1929a180254809e05620265419b
Parents: eb05025
Author: Jason Brown <ja...@apple.com>
Authored: Tue Oct 17 15:37:13 2017 -0700
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:06:38 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |  10 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 .../db/commitlog/AbstractCommitLogService.java  |  86 ++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogSegment.java          |  85 ++++++++----
 .../db/commitlog/CompressedSegment.java         |  12 ++
 .../db/commitlog/MemoryMappedSegment.java       |   7 +-
 .../db/commitlog/PeriodicCommitLogService.java  |   2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 128 +++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |   4 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   6 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   2 +-
 14 files changed, 304 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6c50a3f..2683dc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * More frequent commitlog chained markers (CASSANDRA-13987)
  * Fix serialized size of DataLimits (CASSANDRA-14057)
  * Add flag to allow dropping oversized read repair mutations (CASSANDRA-13975)
  * Fix SSTableLoader logger message (CASSANDRA-14003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index b783090..71e0b2a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -300,10 +300,18 @@ counter_cache_save_period: 7200
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds. 
+# milliseconds.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
 
+# Time interval in millis at which we should update the chained markers in the commitlog.
+# This allows more of the commitlog to be replayed from the mmapped file
+# if the cassandra process crashes; this does not improve durability against a host failure.
+# This value only makes sense if it is significantly less than commitlog_sync_period_in_ms,
+# and only applies to periodic mode when not using commitlog compression or encryption.
+# commitlog_marker_period_in_ms: 100
+
+
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data
 # in it (potentially from each columnfamily in the system) has been

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64d41bb..0796183 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
     public CommitLogSync commitlog_sync;
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
+    public Integer commitlog_marker_period_in_ms = 0;
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index efc71ef..169ed3d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1519,6 +1519,16 @@ public class DatabaseDescriptor
         conf.commitlog_sync_period_in_ms = periodMillis;
     }
 
+    public static void setCommitLogMarkerPeriod(int markerPeriod)
+    {
+        conf.commitlog_marker_period_in_ms = markerPeriod;
+    }
+
+    public static int getCommitLogMarkerPeriod()
+    {
+        return conf.commitlog_marker_period_in_ms;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index e5a5887..8a03b2f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.db.commitlog;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.slf4j.*;
@@ -26,8 +29,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
-
 public abstract class AbstractCommitLogService
 {
 
@@ -47,7 +48,24 @@ public abstract class AbstractCommitLogService
 
     final CommitLog commitLog;
     private final String name;
-    private final long pollIntervalMillis;
+
+    /**
+     * The duration between syncs to disk.
+     */
+    private final long syncIntervalMillis;
+
+    /**
+     * The duration between updating the chained markers in the commit log file. This value should be
+     * 0 < {@link #markerIntervalMillis} <= {@link #syncIntervalMillis}.
+     */
+    private final long markerIntervalMillis;
+
+    /**
+     * A flag that callers outside of the sync thread can use to signal they want the commitlog segments
+     * to be flushed to disk. Note: this flag is primarily to support commit log's batch mode, which requires
+     * an immediate flush to disk on every mutation; see {@link BatchCommitLogService#maybeWaitForSync(Allocation)}.
+     */
+    private volatile boolean syncRequested;
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class);
 
@@ -57,18 +75,45 @@ public abstract class AbstractCommitLogService
      *
      * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
      */
-    AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis)
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis)
+    {
+        this(commitLog, name, syncIntervalMillis, syncIntervalMillis);
+    }
+
+    /**
+     * CommitLogService provides a fsync service for Allocations, fulfilling either the
+     * Batch or Periodic contract.
+     *
+     * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue.
+     */
+    AbstractCommitLogService(final CommitLog commitLog, final String name, final long syncIntervalMillis, long markerIntervalMillis)
     {
         this.commitLog = commitLog;
         this.name = name;
-        this.pollIntervalMillis = pollIntervalMillis;
+        this.syncIntervalMillis = syncIntervalMillis;
+
+        // if we are not using periodic mode, or we are using compression, we shouldn't update the chained markers
+        // faster than the sync interval
+        if (DatabaseDescriptor.getCommitLogSync() != Config.CommitLogSync.periodic || commitLog.configuration.useCompression())
+            markerIntervalMillis = syncIntervalMillis;
+
+        // apply basic bounds checking on the marker interval
+        if (markerIntervalMillis <= 0 || markerIntervalMillis > syncIntervalMillis)
+        {
+            logger.debug("commit log marker interval {} is less than zero or above the sync interval {}; setting value to sync interval",
+                        markerIntervalMillis, syncIntervalMillis);
+            markerIntervalMillis = syncIntervalMillis;
+        }
+
+        this.markerIntervalMillis = markerIntervalMillis;
     }
 
     // Separated into individual method to ensure relevant objects are constructed before this is started.
     void start()
     {
-        if (pollIntervalMillis < 1)
-            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis));
+        if (syncIntervalMillis < 1)
+            throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
+                                                             syncIntervalMillis * 1e-6));
 
         Runnable runnable = new Runnable()
         {
@@ -89,16 +134,24 @@ public abstract class AbstractCommitLogService
                         run = !shutdown;
 
                         // sync and signal
-                        long syncStarted = System.currentTimeMillis();
-                        //This is a target for Byteman in CommitLogSegmentManagerTest
-                        commitLog.sync(shutdown);
-                        lastSyncedAt = syncStarted;
-                        syncComplete.signalAll();
-
+                        long pollStarted = System.currentTimeMillis();
+                        if (lastSyncedAt + syncIntervalMillis <= pollStarted || shutdown || syncRequested)
+                        {
+                            // in this branch, we want to flush the commit log to disk
+                            commitLog.sync(shutdown, true);
+                            syncRequested = false;
+                            lastSyncedAt = pollStarted;
+                            syncComplete.signalAll();
+                        }
+                        else
+                        {
+                            // in this branch, just update the commit log sync headers
+                            commitLog.sync(false, false);
+                        }
 
                         // sleep any time we have left before the next one is due
                         long now = System.currentTimeMillis();
-                        long sleep = syncStarted + pollIntervalMillis - now;
+                        long sleep = pollStarted + markerIntervalMillis - now;
                         if (sleep < 0)
                         {
                             // if we have lagged noticeably, update our lag counter
@@ -111,7 +164,7 @@ public abstract class AbstractCommitLogService
                             lagCount++;
                         }
                         syncCount++;
-                        totalSyncDuration += now - syncStarted;
+                        totalSyncDuration += now - pollStarted;
 
                         if (firstLagAt > 0)
                         {
@@ -149,7 +202,7 @@ public abstract class AbstractCommitLogService
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {
-                            haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
+                            haveWork.tryAcquire(markerIntervalMillis, TimeUnit.MILLISECONDS);
                         }
                         catch (InterruptedException e)
                         {
@@ -180,6 +233,7 @@ public abstract class AbstractCommitLogService
      */
     public WaitQueue.Signal requestExtraSync()
     {
+        syncRequested = true;
         WaitQueue.Signal signal = syncComplete.register();
         haveWork.release(1);
         return signal;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 40040ed..ff1b712 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -220,14 +220,14 @@ public class CommitLog implements CommitLogMBean
     /**
      * Forces a disk flush on the commit log files that need it.  Blocking.
      */
-    public void sync(boolean syncAllSegments)
+    public void sync(boolean syncAllSegments, boolean flush)
     {
         CommitLogSegment current = allocator.allocatingFrom();
         for (CommitLogSegment segment : allocator.getActiveSegments())
         {
             if (!syncAllSegments && segment.id > current.id)
                 return;
-            segment.sync();
+            segment.sync(flush);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 236a1b1..8834c8c 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -94,6 +94,12 @@ public abstract class CommitLogSegment
     // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
     private volatile int lastSyncedOffset;
 
+    /**
+     * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+     * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+     */
+    private volatile int lastMarkerOffset;
+
     // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
     // as the segment is being closed.
     // No need to be volatile as writes are protected by appendOrder barrier.
@@ -167,7 +173,8 @@ public abstract class CommitLogSegment
         // write the header
         CommitLogDescriptor.writeHeader(buffer, descriptor);
         endOfBuffer = buffer.capacity();
-        lastSyncedOffset = buffer.position();
+
+        lastSyncedOffset = lastMarkerOffset = buffer.position();
         allocatePosition.set(lastSyncedOffset + SYNC_MARKER_SIZE);
     }
 
@@ -232,7 +239,7 @@ public abstract class CommitLogSegment
     // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded
     void discardUnusedTail()
     {
-        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom()
+        // We guard this with the OpOrdering instead of synchronised due to potential dead-lock with ACLSM.advanceAllocatingFrom()
         // Ensures endOfBuffer update is reflected in the buffer end position picked up by sync().
         // This actually isn't strictly necessary, as currently all calls to discardUnusedTail are executed either by the thread
         // running sync or within a mutation already protected by this OpOrdering, but to prevent future potential mistakes,
@@ -271,13 +278,18 @@ public abstract class CommitLogSegment
     }
 
     /**
-     * Forces a disk flush for this segment file.
+     * Update the chained markers in the commit log buffer and possibly force a disk flush for this segment file.
+     *
+     * @param flush true if the segment should flush to disk; else, false for just updating the chained markers.
      */
-    synchronized void sync()
+    synchronized void sync(boolean flush)
     {
-        boolean close = false;
+        assert lastMarkerOffset >= lastSyncedOffset : String.format("commit log segment positions are incorrect: last marked = %d, last synced = %d",
+                                                                    lastMarkerOffset, lastSyncedOffset);
         // check we have more work to do
-        if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE)
+        final boolean needToMarkData = allocatePosition.get() > lastMarkerOffset + SYNC_MARKER_SIZE;
+        final boolean hasDataToFlush = lastSyncedOffset != lastMarkerOffset;
+        if (!(needToMarkData || hasDataToFlush))
             return;
         // Note: Even if the very first allocation of this sync section failed, we still want to enter this
         // to ensure the segment is closed. As allocatePosition is set to 1 beyond the capacity of the buffer,
@@ -285,31 +297,48 @@ public abstract class CommitLogSegment
         // succeeded in the previous sync.
         assert buffer != null;  // Only close once.
 
-        int startMarker = lastSyncedOffset;
-        // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-        // the point at which we can safely consider records to have been completely written to.
-        int nextMarker = allocate(SYNC_MARKER_SIZE);
-        if (nextMarker < 0)
+        boolean close = false;
+        int startMarker = lastMarkerOffset;
+        int nextMarker, sectionEnd;
+        if (needToMarkData)
         {
-            // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-            discardUnusedTail();
-            close = true;
-
-            // We use the buffer size as the synced position after a close instead of the end of the actual data
-            // to make sure we only close the buffer once.
-            // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-            nextMarker = buffer.capacity();
+            // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+            // the point at which we can safely consider records to have been completely written to.
+            nextMarker = allocate(SYNC_MARKER_SIZE);
+            if (nextMarker < 0)
+            {
+                // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                discardUnusedTail();
+                close = true;
+
+                // We use the buffer size as the synced position after a close instead of the end of the actual data
+                // to make sure we only close the buffer once.
+                // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                nextMarker = buffer.capacity();
+            }
+            // Wait for mutations to complete as well as endOfBuffer to have been written.
+            waitForModifications();
+            sectionEnd = close ? endOfBuffer : nextMarker;
+
+            // Possibly perform compression or encryption and update the chained markers
+            write(startMarker, sectionEnd);
+            lastMarkerOffset = sectionEnd;
+        }
+        else
+        {
+            // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+            // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+            nextMarker = lastMarkerOffset;
+            sectionEnd = nextMarker;
         }
 
-        // Wait for mutations to complete as well as endOfBuffer to have been written.
-        waitForModifications();
-        int sectionEnd = close ? endOfBuffer : nextMarker;
 
-        // Perform compression, writing to file and flush.
-        write(startMarker, sectionEnd);
+        if (flush || close)
+        {
+            flush(startMarker, sectionEnd);
+            lastSyncedOffset = lastMarkerOffset = nextMarker;
+        }
 
-        // Signal the sync as complete.
-        lastSyncedOffset = nextMarker;
         if (close)
             internalClose();
         syncComplete.signalAll();
@@ -327,6 +356,8 @@ public abstract class CommitLogSegment
 
     abstract void write(int lastSyncedOffset, int nextMarker);
 
+    abstract void flush(int startMarker, int nextMarker);
+
     public boolean isStillAllocating()
     {
         return allocatePosition.get() < endOfBuffer;
@@ -404,7 +435,7 @@ public abstract class CommitLogSegment
     synchronized void close()
     {
         discardUnusedTail();
-        sync();
+        sync(true);
         assert buffer == null;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c00ce18..8e05112 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -139,6 +139,18 @@ public class CompressedSegment extends CommitLogSegment
             channel.write(compressedBuffer);
             assert channel.position() - lastWrittenPos == compressedBuffer.limit();
             lastWrittenPos = channel.position();
+        }
+        catch (Exception e)
+        {
+            throw new FSWriteError(e, getPath());
+        }
+    }
+
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force(channel, true);
         }
         catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 3a16d91..8259f04 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -78,8 +78,13 @@ public class MemoryMappedSegment extends CommitLogSegment
         // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
         writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
+    }
 
-        try {
+    @Override
+    protected void flush(int startMarker, int nextMarker)
+    {
+        try
+        {
             SyncUtil.force((MappedByteBuffer) buffer);
         }
         catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
index 86a248b..76419b7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogService.java
@@ -26,7 +26,7 @@ class PeriodicCommitLogService extends AbstractCommitLogService
 
     public PeriodicCommitLogService(final CommitLog commitLog)
     {
-        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod());
+        super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod(), DatabaseDescriptor.getCommitLogMarkerPeriod());
     }
 
     protected void maybeWaitForSync(CommitLogSegment.Allocation alloc)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
new file mode 100644
index 0000000..e2b9f72
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+/**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+@RunWith(BMUnitRunner.class)
+public class CommitLogChainedMarkersTest
+{
+    private static final String KEYSPACE1 = "CommitLogTest";
+    private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+    @Test
+    @BMRule(name = "force all calls to sync() to not flush to disk",
+    targetClass = "CommitLogSegment",
+    targetMethod = "sync(boolean)",
+    action = "$flush = false")
+    public void replayCommitLogWithoutFlushing() throws IOException
+    {
+        DatabaseDescriptor.setCommitLogSegmentSize(5);
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+        DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+        CompactionManager.instance.disableAutoCompaction();
+
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+        byte[] entropy = new byte[1024];
+        new Random().nextBytes(entropy);
+        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", ByteBuffer.wrap(entropy))
+                           .build();
+
+        int samples = 10000;
+        for (int i = 0; i < samples; i++)
+            CommitLog.instance.add(m);
+
+        CommitLog.instance.sync(false, true);
+
+        Replayer replayer = new Replayer(cfs1.metadata);
+        File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());
+        replayer.recover(commitLogDir.listFiles());
+        Assert.assertEquals(samples, replayer.count);
+    }
+
+    private static class Replayer extends CommitLogReplayer
+    {
+        private final CFMetaData cfm;
+        private int count;
+
+        Replayer(CFMetaData cfm)
+        {
+            super(CommitLog.instance, ReplayPosition.NONE, null, ReplayFilter.create());
+            this.cfm = cfm;
+        }
+
+        @Override
+        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+        {
+            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+            try
+            {
+                Mutation mutation = Mutation.serializer.deserialize(bufIn,
+                                                           desc.getMessagingVersion(),
+                                                           SerializationHelper.Flag.LOCAL);
+
+                if (cfm == null || mutation.get(cfm) != null)
+                    count++;
+            }
+            catch (IOException e)
+            {
+                // Test fails.
+                throw new AssertionError(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
index b651098..a1999ef 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
@@ -66,12 +66,12 @@ public class CommitLogSegmentBackpressureTest
     @BMRules(rules = {@BMRule(name = "Acquire Semaphore before sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AT INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.acquire()"),
                       @BMRule(name = "Release Semaphore after sync",
                               targetClass = "AbstractCommitLogService$1",
                               targetMethod = "run",
-                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync",
+                              targetLocation = "AFTER INVOKE org.apache.cassandra.db.commitlog.CommitLog.sync(boolean, boolean)",
                               action = "org.apache.cassandra.db.commitlog.CommitLogSegmentBackpressureTest.allowSync.release()")})
     public void testCompressedCommitLogBackpressure() throws Throwable
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 9e9ee53..b8f68ed 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -339,7 +339,7 @@ public class CommitLogTest
 
         // "Flush": this won't delete anything
         UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         CommitLog.instance.discardCompletedSegments(cfid1, ReplayPosition.NONE, CommitLog.instance.getContext());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
@@ -652,7 +652,7 @@ public class CommitLogTest
             DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
         }
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // Currently we don't attempt to re-flush a memtable that failed, thus make sure data is replayed by commitlog.
         // If retries work subsequent flushes should clear up error and this should change to expect 0.
@@ -685,7 +685,7 @@ public class CommitLogTest
         for (SSTableReader reader : cfs.getLiveSSTables())
             reader.reloadSSTableMetadata();
 
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
         System.setProperty("cassandra.replayList", KEYSPACE1 + "." + STANDARD1);
         // In the absence of error, this should be 0 because forceBlockingFlush/forceRecycleAllSegments would have
         // persisted all data in the commit log. Because we know there was an error, there must be something left to

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05cb556f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index e690785..36973f2 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -38,7 +38,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
 {
     public static void examineCommitLog(Predicate<Mutation> processor) throws IOException
     {
-        CommitLog.instance.sync(true);
+        CommitLog.instance.sync(true, true);
 
         CommitLogTestReplayer replayer = new CommitLogTestReplayer(CommitLog.instance, processor);
         File commitLogDir = new File(DatabaseDescriptor.getCommitLogLocation());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org