You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/12/05 13:12:07 UTC
[6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2402acd4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2402acd4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2402acd4
Branch: refs/heads/trunk
Commit: 2402acd47e3bb514981cde742b7330666c564869
Parents: d274c6a c3a1a4f
Author: Jason Brown <ja...@gmail.com>
Authored: Tue Dec 5 05:10:08 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Tue Dec 5 05:11:21 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 10 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../AbstractCommitLogSegmentManager.java | 8 +-
.../db/commitlog/AbstractCommitLogService.java | 84 ++++++++++++++---
.../cassandra/db/commitlog/CommitLog.java | 4 +-
.../db/commitlog/CommitLogSegment.java | 90 ++++++++++++------
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 3 -
.../db/commitlog/FileDirectSegment.java | 14 +++
.../db/commitlog/MemoryMappedSegment.java | 4 +
.../db/commitlog/PeriodicCommitLogService.java | 2 +-
.../cassandra/db/commitlog/CDCTestReplayer.java | 2 +-
.../commitlog/CommitLogChainedMarkersTest.java | 98 ++++++++++++++++++++
.../CommitLogSegmentBackpressureTest.java | 4 +-
.../CommitLogSegmentManagerCDCTest.java | 8 +-
.../cassandra/db/commitlog/CommitLogTest.java | 10 +-
.../db/commitlog/CommitLogTestReplayer.java | 2 +-
19 files changed, 288 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/conf/cassandra.yaml
----------------------------------------------------------------------
diff --cc conf/cassandra.yaml
index ba478e7,3569d36..02cec12
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -358,24 -367,21 +358,24 @@@ counter_cache_save_period: 720
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
# saved_caches_directory: /var/lib/cassandra/saved_caches
-# commitlog_sync may be either "periodic" or "batch."
+# commitlog_sync may be either "periodic", "group", or "batch."
#
# When in batch mode, Cassandra won't ack writes until the commit log
-# has been fsynced to disk. It will wait
-# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
-# This window should be kept short because the writer threads will
-# be unable to do extra work while waiting. (You may need to increase
-# concurrent_writes for the same reason.)
+# has been flushed to disk. Each incoming write will trigger the flush task.
+# commitlog_sync_batch_window_in_ms is a deprecated value. Previously it had
+# almost no effect, and it is being removed.
#
-# commitlog_sync: batch
# commitlog_sync_batch_window_in_ms: 2
#
-# the other option is "periodic" where writes may be acked immediately
+# group mode is similar to batch mode, where Cassandra will not ack writes
+# until the commit log has been flushed to disk. The difference is group
+# mode will wait up to commitlog_sync_group_window_in_ms between flushes.
+#
+# commitlog_sync_group_window_in_ms: 1000
+#
+# the default option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
- # milliseconds.
+ # milliseconds.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1db8217,5fe752e..4fa3bed
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -185,13 -198,9 +185,14 @@@ public class Confi
public String commitlog_directory;
public Integer commitlog_total_space_in_mb;
public CommitLogSync commitlog_sync;
+
+ /**
+ * @deprecated since 4.0 This value was near useless, and we're not using it anymore
+ */
public double commitlog_sync_batch_window_in_ms = Double.NaN;
+ public double commitlog_sync_group_window_in_ms = Double.NaN;
public int commitlog_sync_period_in_ms;
+ public int commitlog_marker_period_in_ms = 0;
public int commitlog_segment_size_in_mb = 32;
public ParameterizedClass commitlog_compression;
public int commitlog_max_compression_buffers_in_pool = 3;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 3a57139,7c02892..016dbc1
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -94,9 -89,14 +94,15 @@@ public abstract class CommitLogSegmen
// Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after
// each sync are reserved, and point forwards to the next such offset. The final
// sync marker in a segment will be zeroed out, or point to a position too close to the EOF to fit a marker.
- private volatile int lastSyncedOffset;
+ @VisibleForTesting
+ volatile int lastSyncedOffset;
+ /**
+ * Everything before this offset has its markers written into the {@link #buffer}, but has not necessarily
+ * been flushed to disk. This value should be greater than or equal to {@link #lastSyncedOffset}.
+ */
+ private volatile int lastMarkerOffset;
+
// The end position of the buffer. Initially set to its capacity and updated to point to the last written position
// as the segment is being closed.
// No need to be volatile as writes are protected by appendOrder barrier.
@@@ -316,34 -319,48 +328,50 @@@
// succeeded in the previous sync.
assert buffer != null; // Only close once.
- int startMarker = lastSyncedOffset;
- // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
- // the point at which we can safely consider records to have been completely written to.
- int nextMarker = allocate(SYNC_MARKER_SIZE);
- if (nextMarker < 0)
+ boolean close = false;
+ int startMarker = lastMarkerOffset;
+ int nextMarker, sectionEnd;
+ if (needToMarkData)
{
- // Ensure no more of this CLS is writeable, and mark ourselves for closing.
- discardUnusedTail();
- close = true;
-
- // We use the buffer size as the synced position after a close instead of the end of the actual data
- // to make sure we only close the buffer once.
- // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
- nextMarker = buffer.capacity();
- }
+ // Allocate a new sync marker; this is both necessary in itself, but also serves to demarcate
+ // the point at which we can safely consider records to have been completely written to.
+ nextMarker = allocate(SYNC_MARKER_SIZE);
+ if (nextMarker < 0)
+ {
+ // Ensure no more of this CLS is writeable, and mark ourselves for closing.
+ discardUnusedTail();
+ close = true;
+
+ // We use the buffer size as the synced position after a close instead of the end of the actual data
+ // to make sure we only close the buffer once.
+ // The endOfBuffer position may be incorrect at this point (to be written by another stalled thread).
+ nextMarker = buffer.capacity();
+ }
+ // Wait for mutations to complete as well as endOfBuffer to have been written.
+ waitForModifications();
+ sectionEnd = close ? endOfBuffer : nextMarker;
- // Wait for mutations to complete as well as endOfBuffer to have been written.
- waitForModifications();
- int sectionEnd = close ? endOfBuffer : nextMarker;
+ // Possibly perform compression or encryption and update the chained markers
+ write(startMarker, sectionEnd);
+ lastMarkerOffset = sectionEnd;
+ }
+ else
+ {
+ // note: we don't need to waitForModifications() as, once we get to this block, we are only doing the flush
+ // and any mutations have already been fully written into the segment (as we wait for it in the previous block).
+ nextMarker = lastMarkerOffset;
+ sectionEnd = nextMarker;
+ }
- // Possibly perform compression or encryption, writing to file and flush.
- write(startMarker, sectionEnd);
- if (cdcState == CDCState.CONTAINS)
- writeCDCIndexFile(descriptor, sectionEnd, close);
+ if (flush || close)
+ {
+ flush(startMarker, sectionEnd);
++ if (cdcState == CDCState.CONTAINS)
++ writeCDCIndexFile(descriptor, sectionEnd, close);
+ lastSyncedOffset = lastMarkerOffset = nextMarker;
+ }
- // Signal the sync as complete.
- lastSyncedOffset = nextMarker;
if (close)
internalClose();
syncComplete.signalAll();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 3695da8,0000000..18bc6e0
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@@ -1,76 -1,0 +1,76 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+
+/**
+ * Utility class that flags the replayer as having seen a CDC mutation and calculates offset but doesn't apply mutations
+ */
+public class CDCTestReplayer extends CommitLogReplayer
+{
+ private static final Logger logger = LoggerFactory.getLogger(CDCTestReplayer.class);
+
+ public CDCTestReplayer() throws IOException
+ {
+ super(CommitLog.instance, CommitLogPosition.NONE, null, ReplayFilter.create());
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ commitLogReader = new CommitLogTestReader();
+ }
+
+ public void examineCommitLog() throws IOException
+ {
+ replayFiles(new File(DatabaseDescriptor.getCommitLogLocation()).listFiles());
+ }
+
+ private class CommitLogTestReader extends CommitLogReader
+ {
+ @Override
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
+ {
+ RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size);
+ Mutation mutation;
+ try
+ {
+ mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ if (mutation.trackedByCDC())
+ sawCDCMutation = true;
+ }
+ catch (IOException e)
+ {
+ // Test fails.
+ throw new AssertionError(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
index 0000000,663e7af..be44ec3
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogChainedMarkersTest.java
@@@ -1,0 -1,98 +1,98 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Random;
+
+ import org.junit.Assert;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.Mutation;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.db.compaction.CompactionManager;
+ import org.apache.cassandra.db.marshal.AsciiType;
+ import org.apache.cassandra.db.marshal.BytesType;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+ /**
+ * Tests the commitlog to make sure we can replay it - explicitly for the case where we update the chained markers
+ * in the commit log segment but do not flush the file to disk.
+ */
+ @RunWith(BMUnitRunner.class)
+ public class CommitLogChainedMarkersTest
+ {
+ private static final String KEYSPACE1 = "CommitLogTest";
+ private static final String STANDARD1 = "CommitLogChainedMarkersTest";
+
+ @Test
+ @BMRule(name = "force all calls to sync() to not flush to disk",
+ targetClass = "CommitLogSegment",
+ targetMethod = "sync(boolean)",
+ action = "$flush = false")
+ public void replayCommitLogWithoutFlushing() throws IOException
+ {
+ // this method is a blend of CommitLogSegmentBackpressureTest & CommitLogReaderTest methods
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setCommitLogSegmentSize(5);
+ DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+ DatabaseDescriptor.setCommitLogSyncPeriod(10000 * 1000);
+ DatabaseDescriptor.setCommitLogMarkerPeriod(1);
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance));
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ byte[] entropy = new byte[1024];
+ new Random().nextBytes(entropy);
- final Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
++ final Mutation m = new RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.wrap(entropy))
+ .build();
+
+ int samples = 10000;
+ for (int i = 0; i < samples; i++)
+ CommitLog.instance.add(m);
+
+ CommitLog.instance.sync(false);
+
+ ArrayList<File> toCheck = CommitLogReaderTest.getCommitLogs();
+ CommitLogReader reader = new CommitLogReader();
- CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata);
++ CommitLogReaderTest.TestCLRHandler testHandler = new CommitLogReaderTest.TestCLRHandler(cfs1.metadata.get());
+ for (File f : toCheck)
+ reader.readCommitLogSegment(testHandler, f, CommitLogReader.ALL_MUTATIONS, false);
+
+ Assert.assertEquals(samples, testHandler.seenMutationCount());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentBackpressureTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
index 80dfd01,68ce57d..7417cd3
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDCTest.java
@@@ -159,259 -199,6 +159,259 @@@ public class CommitLogSegmentManagerCDC
}
}
+ @Test
+ public void testCDCIndexFileWriteOnSync() throws IOException
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ int syncOffset = currentSegment.lastSyncedOffset;
+
+ // Confirm index file is written
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Read index value and confirm it's == end from last sync
+ BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+ String input = in.readLine();
+ Integer offset = Integer.parseInt(input);
+ Assert.assertEquals(syncOffset, (long)offset);
+ in.close();
+ }
+
+ @Test
+ public void testCompletedFlag() throws IOException
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ CommitLogSegment initialSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ DatabaseDescriptor.setCDCSpaceInMB(8);
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ }
+ }
+ catch (CDCWriteException ce)
+ {
+ // pass. Expected since we'll have a file or two linked on restart of CommitLog due to replay
+ }
+
+ CommitLog.instance.forceRecycleAllSegments();
+
+ // Confirm index file is written
+ File cdcIndexFile = initialSegment.getCDCIndexFile();
+ Assert.assertTrue("Index file not written: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Read index file and confirm second line is COMPLETED
+ BufferedReader in = new BufferedReader(new FileReader(cdcIndexFile));
+ String input = in.readLine();
+ input = in.readLine();
+ Assert.assertTrue("Expected COMPLETED in index file, got: " + input, input.equals("COMPLETED"));
+ in.close();
+ }
+
+ @Test
+ public void testDeleteLinkOnDiscardNoCDC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=false;");
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+
+ // Confirm that, with no CDC data present, we've hard-linked but have no index file
+ Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Sync and confirm no index written as index is written on flush
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertFalse("Expected index file to not be created but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Force a full recycle and confirm hard-link is deleted
+ CommitLog.instance.forceRecycleAllSegments();
+ CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+ Assert.assertFalse("Expected hard link to CLS to be deleted on non-cdc segment: " + linked, Files.exists(linked));
+ }
+
+ @Test
+ public void testRetainLinkOnDiscardCDC() throws Throwable
+ {
+ createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+ CommitLogSegment currentSegment = CommitLog.instance.segmentManager.allocatingFrom();
+ File cdcIndexFile = currentSegment.getCDCIndexFile();
+ Assert.assertFalse("Expected no index file before flush but found: " + cdcIndexFile, cdcIndexFile.exists());
+
+ new RowUpdateBuilder(currentTableMetadata(), 0, 1)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+
+ Path linked = new File(DatabaseDescriptor.getCDCLogLocation(), currentSegment.logFile.getName()).toPath();
+ // Confirm that, with CDC data present but not yet flushed, we've hard-linked but have no index file
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+
+ // Sync and confirm index written as index is written on flush
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertTrue("Expected cdc index file after flush but found none: " + cdcIndexFile, cdcIndexFile.exists());
+
+ // Force a full recycle and confirm all files remain
+ CommitLog.instance.forceRecycleAllSegments();
+ Assert.assertTrue("File does not exist: " + linked, Files.exists(linked));
+ Assert.assertTrue("Expected cdc index file after recycle but found none: " + cdcIndexFile, cdcIndexFile.exists());
+ }
+
+ @Test
+ public void testReplayLogic() throws IOException
+ {
+ // Assert.assertEquals(0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+ String table_name = createTable("CREATE TABLE %s (idx int, data text, primary key(idx)) WITH cdc=true;");
+
+ DatabaseDescriptor.setCDCSpaceInMB(8);
+ TableMetadata ccfm = Keyspace.open(keyspace()).getColumnFamilyStore(table_name).metadata();
+ try
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ new RowUpdateBuilder(ccfm, 0, i)
+ .add("data", randomizeBuffer(DatabaseDescriptor.getCommitLogSegmentSize() / 3))
+ .build().apply();
+ }
+ Assert.fail("Expected CDCWriteException from full CDC but did not receive it.");
+ }
+ catch (CDCWriteException e)
+ {
+ // pass
+ }
+
- CommitLog.instance.sync();
++ CommitLog.instance.sync(true);
+ CommitLog.instance.stopUnsafe(false);
+
+ // Build up a list of expected index files after replay and then clear out cdc_raw
+ List<CDCIndexData> oldData = parseCDCIndexData();
+ for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+ FileUtils.deleteWithConfirm(f.getAbsolutePath());
+
+ try
+ {
+ Assert.assertEquals("Expected 0 files in CDC folder after deletion. ",
+ 0, new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length);
+ }
+ finally
+ {
+ // If we don't have a started commitlog, a failed assertion will cause the test to hang. Presumably the
+ // hang occurs during CQLTester shutdown, which tries to clean up / drop keyspaces and tables and then
+ // blocks while applying mutations.
+ CommitLog.instance.start();
+ CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
+ }
+ CDCTestReplayer replayer = new CDCTestReplayer();
+ replayer.examineCommitLog();
+
+ // Rough sanity check -> should be files there now.
+ Assert.assertTrue("Expected non-zero number of files in CDC folder after restart.",
+ new File(DatabaseDescriptor.getCDCLogLocation()).listFiles().length > 0);
+
+ // Confirm all the index entries in oldData are present in newData with an offset >= the original offset,
+ // as we flag the entire segment as cdc written on a replay.
+ List<CDCIndexData> newData = parseCDCIndexData();
+ for (CDCIndexData cid : oldData)
+ {
+ boolean found = false;
+ for (CDCIndexData ncid : newData)
+ {
+ if (cid.fileName.equals(ncid.fileName))
+ {
+ Assert.assertTrue("New CDC index file expected to have >= offset in old.", ncid.offset >= cid.offset);
+ found = true;
+ }
+ }
+ if (!found)
+ {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append(String.format("Missing old CDCIndexData in new set after replay: %s\n", cid));
+ errorMessage.append("List of CDCIndexData in new set of indexes after replay:\n");
+ for (CDCIndexData ncid : newData)
+ errorMessage.append(String.format(" %s\n", ncid));
+ Assert.fail(errorMessage.toString());
+ }
+ }
+
+ // And make sure we don't have new CDC Indexes we don't expect
+ for (CDCIndexData ncid : newData)
+ {
+ boolean found = false;
+ for (CDCIndexData cid : oldData)
+ {
+ if (cid.fileName.equals(ncid.fileName))
+ found = true;
+ }
+ if (!found)
+ Assert.fail(String.format("Unexpected new CDCIndexData found after replay: %s\n", ncid));
+ }
+ }
+
+ private List<CDCIndexData> parseCDCIndexData()
+ {
+ List<CDCIndexData> results = new ArrayList<>();
+ try
+ {
+ for (File f : new File(DatabaseDescriptor.getCDCLogLocation()).listFiles())
+ {
+ if (f.getName().contains("_cdc.idx"))
+ results.add(new CDCIndexData(f));
+ }
+ }
+ catch (IOException e)
+ {
+ Assert.fail(String.format("Failed to parse CDCIndexData: %s", e.getMessage()));
+ }
+ return results;
+ }
+
+ private static class CDCIndexData
+ {
+ private final String fileName;
+ private final int offset;
+
+ CDCIndexData(File f) throws IOException
+ {
+ String line = "";
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(f))))
+ {
+ line = br.readLine();
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ fileName = f.getName();
+ offset = Integer.parseInt(line);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s,%d", fileName, offset);
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ CDCIndexData cid = (CDCIndexData)other;
+ return fileName.equals(cid.fileName) && offset == cid.offset;
+ }
+ }
+
private ByteBuffer randomizeBuffer(int size)
{
byte[] toWrap = new byte[size];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2402acd4/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 8d04ecc,215ad6c..da895a0
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -340,9 -359,9 +340,9 @@@ public abstract class CommitLogTes
assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
// "Flush": this won't delete anything
- UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+ TableId id1 = rm.getTableIds().iterator().next();
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- CommitLog.instance.discardCompletedSegments(cfid1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
+ CommitLog.instance.discardCompletedSegments(id1, CommitLogPosition.NONE, CommitLog.instance.getCurrentPosition());
assertEquals(1, CommitLog.instance.segmentManager.getActiveSegments().size());
@@@ -676,9 -696,9 +676,9 @@@
cellCount += 1;
CommitLog.instance.add(rm2);
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata);
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, CommitLogPosition.NONE, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
Assert.assertFalse(activeSegments.isEmpty());
@@@ -713,9 -733,9 +713,9 @@@
}
}
- CommitLog.instance.sync();
+ CommitLog.instance.sync(true);
- SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata);
+ SimpleCountingReplayer replayer = new SimpleCountingReplayer(CommitLog.instance, commitLogPosition, cfs.metadata());
List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
Assert.assertFalse(activeSegments.isEmpty());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org