Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/05 13:12:07 UTC

[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2402acd4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2402acd4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2402acd4

Branch: refs/heads/trunk
Commit: 2402acd47e3bb514981cde742b7330666c564869
Parents: d274c6a c3a1a4f
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:10:08 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:11:21 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 conf/cassandra.yaml                             | 10 +-
 .../org/apache/cassandra/config/Config.java     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++
 .../AbstractCommitLogSegmentManager.java        |  8 +-
 .../db/commitlog/AbstractCommitLogService.java  | 84 ++++++++++++++---
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 .../db/commitlog/CommitLogSegment.java          | 90 ++++++++++++------
 .../db/commitlog/CompressedSegment.java         |  4 +-
 .../db/commitlog/EncryptedSegment.java          |  3 -
 .../db/commitlog/FileDirectSegment.java         | 14 +++
 .../db/commitlog/MemoryMappedSegment.java       |  4 +
 .../db/commitlog/PeriodicCommitLogService.java  |  2 +-
 .../cassandra/db/commitlog/CDCTestReplayer.java |  2 +-
 .../commitlog/CommitLogChainedMarkersTest.java  | 98 ++++++++++++++++++++
 .../CommitLogSegmentBackpressureTest.java       |  4 +-
 .../CommitLogSegmentManagerCDCTest.java         |  8 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 10 +-
 .../db/commitlog/CommitLogTestReplayer.java     |  2 +-
 19 files changed, 288 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index ba478e7,3569d36..02cec12
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -358,24 -367,21 +358,24 @@@ counter_cache_save_period: 720
  # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
  # saved_caches_directory: /var/lib/cassandra/saved_caches
  
 -# commitlog_sync may be either "periodic" or "batch." 
 +# commitlog_sync may be either "periodic", "group", or "batch." 
  # 
  # When in batch mode, Cassandra won't ack writes until the commit log
 -# has been fsynced to disk.  It will wait
 -# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
 -# This window should be kept short because the writer threads will
 -# be unable to do extra work while waiting.  (You may need to increase
 -# concurrent_writes for the same reason.)
 +# has been flushed to disk.  Each incoming write will trigger the flush task.
 +# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
 +# almost no value, and is being removed.
  #
 -# commitlog_sync: batch
  # commitlog_sync_batch_window_in_ms: 2
  #
 -# the other option is "periodic" where writes may be acked immediately
 +# group mode is similar to batch mode, where Cassandra will not ack writes
 +# until the commit log has been flushed to disk. The difference is group
 +# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
 +#
 +# commitlog_sync_group_window_in_ms: 1000
 +#
 +# the default option is "periodic" where writes may be acked immediately
  # and the CommitLog is simply synced every commitlog_sync_period_in_ms
- # milliseconds. 
+ # milliseconds.
  commitlog_sync: periodic
  commitlog_sync_period_in_ms: 10000
  

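For illustration, a minimal cassandra.yaml fragment that opts into the new group mode described in the hunk above; the 1000 ms default comes from the comment block, while the 10 ms value here is purely an example, not a recommendation:

    # Ack writes only after the commit log has been flushed to disk,
    # batching concurrent writes into one flush at most every 10 ms.
    commitlog_sync: group
    commitlog_sync_group_window_in_ms: 10
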
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1db8217,5fe752e..4fa3bed
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -185,13 -198,9 +185,14 @@@ public class Confi
      public String commitlog_directory;
      public Integer commitlog_total_space_in_mb;
      public CommitLogSync commitlog_sync;
 +
 +    /**
 +     * @deprecated since 4.0 This value was near useless, and we're not using it anymore
 +     */
      public double commitlog_sync_batch_window_in_ms = Double.NaN;
 +    public double commitlog_sync_group_window_in_ms = Double.NaN;
      public int commitlog_sync_period_in_ms;
+     public int commitlog_marker_period_in_ms = 0;
      public int commitlog_segment_size_in_mb = 32;
      public ParameterizedClass commitlog_compression;
      public int commitlog_max_compression_buffers_in_pool = 3;

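To make the new Config fields concrete, here is a small self-contained sketch (hypothetical names; this is not Cassandra's actual DatabaseDescriptor logic) of how each sync mode maps onto the window the sync thread waits between flushes:

    // Hypothetical illustration of the mode/window mapping implied by the
    // Config fields above; not the project's actual DatabaseDescriptor code.
    public class CommitLogSyncWindows
    {
        enum SyncMode { periodic, batch, group }

        // How long the sync thread may wait between flushes for a given mode.
        static double resolveWindowMillis(SyncMode mode, double groupWindowMs, int periodMs)
        {
            switch (mode)
            {
                case batch:    return 0;           // every incoming write triggers the flush task
                case group:    return Double.isNaN(groupWindowMs) ? 1000 : groupWindowMs;
                case periodic: return periodMs;    // e.g. the 10000 ms default above
                default:       throw new AssertionError("unknown mode: " + mode);
            }
        }

        public static void main(String[] args)
        {
            // An unset group window (Double.NaN, as in Config) falls back to 1000 ms.
            System.out.println(resolveWindowMillis(SyncMode.group, Double.NaN, 10000));
        }
    }
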
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3a57139,7c02892..016dbc1
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -94,9 -89,14 +94,15 @@@ public abstract class CommitLogSegmen
      // Everything before this offset has been synced and written.  The SYNC_MARKER_SIZE bytes after
      // each sync are reserved, and point forwards to the next such offset.  The final
      // sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
 -    private volatile int lastSyncedOffset;
 +    @VisibleForTesting
 +    volatile int lastSyncedOffset;
  
+     /**
+      * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+      * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+      */
+     private volatile int lastMarkerOffset;
+ 
      // The end position of the buffer. Initially set to its capacity and updated to point to the last written position
      // as the segment is being closed.
      // No need to be volatile as writes are protected by appendOrder barrier.
@@@ -316,34 -319,48 +328,50 @@@
          // succeeded in the previous sync.
          assert buffer != null;  // Only close once.
  
-         int startMarker = lastSyncedOffset;
-         // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
-         // the point at which we can safely consider records to have been completely written to.
-         int nextMarker = allocate(SYNC_MARKER_SIZE);
-         if (nextMarker < 0)
+         boolean close = false;
+         int startMarker = lastMarkerOffset;
+         int nextMarker, sectionEnd;
+         if (needToMarkData)
          {
-             // Ensure no more of this CLS is writeable, and mark ourselves for closing.
-             discardUnusedTail();
-             close = true;
- 
-             // We use the buffer size as the synced position after a close instead of the end of the actual data
-             // to make sure we only close the buffer once.
-             // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
-             nextMarker = buffer.capacity();
-         }
+             // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+             // the point at which we can safely consider records to have been completely written to.
+             nextMarker = allocate(SYNC_MARKER_SIZE);
+             if (nextMarker < 0)
+             {
+                 // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+                 discardUnusedTail();
+                 close = true;
+ 
+                 // We use the buffer size as the synced position after a close instead of the end of the actual data
+                 // to make sure we only close the buffer once.
+                 // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+                 nextMarker = buffer.capacity();
+             }
+             // Wait for mutations to complete as well as endOfBuffer to have been written.
+             waitForModifications();
+             sectionEnd = close ? endOfBuffer : nextMarker;
  
-         // Wait for mutations to complete as well as endOfBuffer to have been written.
-         waitForModifications();
-         int sectionEnd = close ? endOfBuffer : nextMarker;
+             // Possibly perform compression or encryption and update the chained markers
+             write(startMarker, sectionEnd);
+             lastMarkerOffset = sectionEnd;
+         }
+         else
+         {
+             // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+             // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+             nextMarker = lastMarkerOffset;
+             sectionEnd = nextMarker;
+         }
  
-         // Possibly perform compression or encryption, writing to file and flush.
-         write(startMarker, sectionEnd);
  
-         if (cdcState == CDCState.CONTAINS)
-             writeCDCIndexFile(descriptor, sectionEnd, close);
+         if (flush || close)
+         {
+             flush(startMarker, sectionEnd);
++            if (cdcState == CDCState.CONTAINS)
++                writeCDCIndexFile(descriptor, sectionEnd, close);
+             lastSyncedOffset = lastMarkerOffset = nextMarker;
+         }
  
-         // Signal the sync as complete.
-         lastSyncedOffset = nextMarker;
          if (close)
              internalClose();
          syncComplete.signalAll();

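The hunk above is easier to follow with the two offsets in mind: chained markers are written into the buffer ahead of the fsync point, and lastSyncedOffset only catches up when a real flush happens. A toy single-threaded sketch of that invariant (not the real CommitLogSegment, which guards these fields with volatile reads and OpOrder barriers):

    // Toy model of the chained-marker invariant: lastMarkerOffset >= lastSyncedOffset.
    public class MarkerBookkeeping
    {
        private int lastSyncedOffset;    // everything before this has been fsynced
        private int lastMarkerOffset;    // everything before this has markers in the buffer

        // Simplified stand-in for sync(flush): newMarker is where the next
        // chained marker was allocated in the buffer.
        void sync(boolean flush, int newMarker)
        {
            if (newMarker > lastMarkerOffset)
                lastMarkerOffset = newMarker;          // marker written, no fsync yet
            if (flush)
                lastSyncedOffset = lastMarkerOffset;   // disk catches up to the markers
            assert lastMarkerOffset >= lastSyncedOffset;
        }

        public static void main(String[] args)
        {
            MarkerBookkeeping seg = new MarkerBookkeeping();
            seg.sync(false, 4096);    // marker-only pass
            seg.sync(true, 8192);     // flush pass: both offsets now 8192
            System.out.println(seg.lastSyncedOffset + " <= " + seg.lastMarkerOffset);
        }
    }
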
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 3695da8,0000000..18bc6e0
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@@ -1,76 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +
 +/**
 + * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations
 + */
 +public class CDCTestReplayer extends CommitLogReplayer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class);
 +
 +    public CDCTestReplayer() throws IOException
 +    {
 +        super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        commitLogReader = new CommitLogTestReader();
 +    }
 +
 +    public void examineCommitLog() throws IOException
 +    {
 +        replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
 +    }
 +
 +    private class CommitLogTestReader extends CommitLogReader
 +    {
 +        @Override
 +        protected void readMutation(CommitLogReadHandler handler,
 +                                    byte[] inputBuffer,
 +                                    int size,
 +                                    CommitLogPosition minPosition,
 +                                    final int entryLocation,
 +                                    final CommitLogDescriptor desc) throws IOException
 +        {
 +            RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
 +            Mutation mutation;
 +            try
 +            {
 +                mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
 +                if (mutation.trackedByCDC())
 +                    sawCDCMutation = true;
 +            }
 +            catch (IOException e)
 +            {
 +                // Test fails.
 +                throw new AssertionError(e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,663e7af..be44ec3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,98 +1,98 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.commitlog;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Random;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+ 
+ /**
+  * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+  * in the commit log segment but do not flush the file to disk.
+  */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+     private static final String KEYSPACE1 = "CommitLogTest";
+     private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+ 
+     @Test
+     @BMRule(name = "force all calls to sync() to not flush to disk",
+     targetClass = "CommitLogSegment",
+     targetMethod = "sync(boolean)",
+     action = "$flush = false")
+     public void replayCommitLogWithoutFlushing() throws IOException
+     {
+         // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
+         DatabaseDescriptor.daemonInitialization();
+         DatabaseDescriptor.setCommitLogSegmentSize(5);
+         DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+         DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+         DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE1,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+ 
+         CompactionManager.instance.disableAutoCompaction();
+ 
+         ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ 
+         byte[] entropy = new byte[1024];
+         new Random().nextBytes(entropy);
 -        final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
++        final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.wrap(entropy))
+                            .build();
+ 
+         int samples = 10000;
+         for (int i = 0; i < samples; i++)
+             CommitLog.instance.add(m);
+ 
+         CommitLog.instance.sync(false);
+ 
+         ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
+         CommitLogReader reader = new CommitLogReader();
 -        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++        CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata.get());
+         for (File f : toCheck)
+             reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+ 
+         Assert.assertEquals(samples, testHandler.seenMutationCount());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 80dfd01,68ce57d..7417cd3
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@@ -159,259 -199,6 +159,259 @@@ public class CommitLogSegmentManagerCDC
          }
      }
  
 +    @Test
 +    public void testCDCIndexFileWriteOnSync() throws IOException
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        int syncOffset = currentSegment.lastSyncedOffset;
 +
 +        // Confirm index file is written
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Read index value and confirm it's == end from last sync
 +        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
 +        String input = in.readLine();
 +        Integer offset = Integer.parseInt(input);
 +        Assert.assertEquals(syncOffset, (long)offset);
 +        in.close();
 +    }
 +
 +    @Test
 +    public void testCompletedFlag() throws IOException
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        DatabaseDescriptor.setCDCSpaceInMB(8);
 +        try
 +        {
 +            for (int i = 0; i < 1000; i++)
 +            {
 +                new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +                .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +                .build().apply();
 +            }
 +        }
 +        catch (CDCWriteException ce)
 +        {
 +            // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
 +        }
 +
 +        CommitLog.instance.forceRecycleAllSegments();
 +
 +        // Confirm index file is written
 +        File cdcIndexFile = initialSegment.getCDCIndexFile();
 +        Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Read index file and confirm second line is COMPLETED
 +        BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
 +        String input = in.readLine();
 +        input = in.readLine();
 +        Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED"));
 +        in.close();
 +    }
 +
 +    @Test
 +    public void testDeleteLinkOnDiscardNoCDC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +
 +        // Confirm that, with no CDC data present, we've hard-linked but have no index file
 +        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Sync and confirm no index written as index is written on flush
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Force a full recycle and confirm hard-link is deleted
 +        CommitLog.instance.forceRecycleAllSegments();
 +        CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
 +        Assert.assertFalse("Expected hard link to CLS to be deleted on non-cdc segment: " + linked, Files.exists(linked));
 +    }
 +
 +    @Test
 +    public void testRetainLinkOnDiscardCDC() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +        CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
 +        File cdcIndexFile = currentSegment.getCDCIndexFile();
 +        Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        new RowUpdateBuilder(currentTableMetadata(), 0, 1)
 +            .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +            .build().apply();
 +
 +        Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
 +        // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +
 +        // Sync and confirm index written as index is written on flush
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists());
 +
 +        // Force a full recycle and confirm all files remain
 +        CommitLog.instance.forceRecycleAllSegments();
 +        Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
 +        Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists());
 +    }
 +
 +    @Test
 +    public void testReplayLogic() throws IOException
 +    {
 +        // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
 +        String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
 +
 +        DatabaseDescriptor.setCDCSpaceInMB(8);
 +        TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
 +        try
 +        {
 +            for (int i = 0; i < 1000; i++)
 +            {
 +                new RowUpdateBuilder(ccfm, 0, i)
 +                    .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
 +                    .build().apply();
 +            }
 +            Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
 +        }
 +        catch (CDCWriteException e)
 +        {
 +            // pass
 +        }
 +
-         CommitLog.instance.sync();
++        CommitLog.instance.sync(true);
 +        CommitLog.instance.stopUnsafe(false);
 +
 +        // Build up a list of expected index files after replay and then clear out cdc_raw
 +        List<CDCIndexData> oldData = parseCDCIndexData();
 +        for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
 +            FileUtils.deleteWithConfirm(f.getAbsolutePath());
 +
 +        try
 +        {
 +            Assert.assertEquals("Expected 0 files in CDC folder after deletion. ",
 +                                0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
 +        }
 +        finally
 +        {
 +            // If we don't have a started commitlog, assertions will cause the test to hang, presumably because
 +            // CQLTester's shutdown hangs while trying to clean up / drop keyspaces / tables and apply the
 +            // outstanding mutations.
 +            CommitLog.instance.start();
 +            CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
 +        }
 +        CDCTestReplayer replayer = new CDCTestReplayer();
 +        replayer.examineCommitLog();
 +
 +        // Rough sanity check -> should be files there now.
 +        Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.",
 +                          new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0);
 +
 +        // Confirm all the old indexes in old are present and >= the original offset, as we flag the entire segment
 +        // as cdc written on a replay.
 +        List<CDCIndexData> newData = parseCDCIndexData();
 +        for (CDCIndexData cid : oldData)
 +        {
 +            boolean found = false;
 +            for (CDCIndexData ncid : newData)
 +            {
 +                if (cid.fileName.equals(ncid.fileName))
 +                {
 +                    Assert.assertTrue("New CDC index file expected to have >= offset in old.", ncid.offset >= cid.offset);
 +                    found = true;
 +                }
 +            }
 +            if (!found)
 +            {
 +                StringBuilder errorMessage = new StringBuilder();
 +                errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid));
 +                errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n");
 +                for (CDCIndexData ncid : newData)
 +                    errorMessage.append(String.format("   %s\n", ncid));
 +                Assert.fail(errorMessage.toString());
 +            }
 +        }
 +
 +        // And make sure we don't have new CDC Indexes we don't expect
 +        for (CDCIndexData ncid : newData)
 +        {
 +            boolean found = false;
 +            for (CDCIndexData cid : oldData)
 +            {
 +                if (cid.fileName.equals(ncid.fileName))
 +                    found = true;
 +            }
 +            if (!found)
 +                Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid));
 +        }
 +    }
 +
 +    private List<CDCIndexData> parseCDCIndexData()
 +    {
 +        List<CDCIndexData> results = new ArrayList<>();
 +        try
 +        {
 +            for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
 +            {
 +                if (f.getName().contains("_cdc.idx"))
 +                    results.add(new CDCIndexData(f));
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage()));
 +        }
 +        return results;
 +    }
 +
 +    private static class CDCIndexData
 +    {
 +        private final String fileName;
 +        private final int offset;
 +
 +        CDCIndexData(File f) throws IOException
 +        {
 +            String line = "";
 +            try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f))))
 +            {
 +                line = br.readLine();
 +            }
 +            catch (Exception e)
 +            {
 +                throw e;
 +            }
 +            fileName = f.getName();
 +            offset = Integer.parseInt(line);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("%s,%d", fileName, offset);
 +        }
 +
 +        @Override
 +        public boolean equals(Object other)
 +        {
 +            CDCIndexData cid = (CDCIndexData)other;
 +            return fileName.equals(cid.fileName) && offset == cid.offset;
 +        }
 +    }
 +
      private ByteBuffer randomizeBuffer(int size)
      {
          byte[] toWrap = new byte[size];

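Several of the tests above read the _cdc.idx file directly: line one holds the last flushed offset, and line two, once the segment is finished, holds the literal string COMPLETED. A hedged sketch of a standalone parser for that two-line format (hypothetical helper, not part of the Cassandra codebase):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    // Hypothetical reader for the two-line _cdc.idx format exercised by the
    // tests above; not project code.
    public class CdcIndexParser
    {
        public static void main(String[] args) throws IOException
        {
            try (BufferedReader in = new BufferedReader(new FileReader(args[0])))
            {
                int offset = Integer.parseInt(in.readLine().trim());      // line 1: offset
                boolean completed = "COMPLETED".equals(in.readLine());    // line 2: optional flag
                System.out.println("offset=" + offset + " completed=" + completed);
            }
        }
    }
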
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 8d04ecc,215ad6c..da895a0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -340,9 -359,9 +340,9 @@@ public abstract class CommitLogTes
          assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
          // "Flush": this won't delete anything
 -        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
 +        TableId id1 = rm.getTableIds().iterator().next();
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
 -        CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
 +        CommitLog.instance.discardCompletedSegments(id1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
  
          assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
  
@@@ -676,9 -696,9 +676,9 @@@
          cellCount += 1;
          CommitLog.instance.add(rm2);
  
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
  
 -        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
          List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
          Assert.assertFalse(activeSegments.isEmpty());
  
@@@ -713,9 -733,9 +713,9 @@@
              }
          }
  
-         CommitLog.instance.sync();
+         CommitLog.instance.sync(true);
  
 -        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
 +        SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
          List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
          Assert.assertFalse(activeSegments.isEmpty());
  

