You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:38 UTC
[2/5] cassandra git commit: Add Change Data Capture
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
deleted file mode 100644
index 17980de..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.zip.CRC32;
-import javax.crypto.Cipher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.security.EncryptionUtils;
-import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
-
-/**
- * Read each sync section of a commit log, iteratively.
- */
-public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
-{
- private final CommitLogDescriptor descriptor;
- private final RandomAccessReader reader;
- private final Segmenter segmenter;
- private final boolean tolerateTruncation;
-
- /**
- * ending position of the current sync section.
- */
- protected int end;
-
- protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
- {
- this.descriptor = descriptor;
- this.reader = reader;
- this.tolerateTruncation = tolerateTruncation;
-
- end = (int) reader.getFilePointer();
- if (descriptor.getEncryptionContext().isEnabled())
- segmenter = new EncryptedSegmenter(reader, descriptor);
- else if (descriptor.compression != null)
- segmenter = new CompressedSegmenter(descriptor, reader);
- else
- segmenter = new NoOpSegmenter(reader);
- }
-
- public Iterator<SyncSegment> iterator()
- {
- return new SegmentIterator();
- }
-
- protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
- {
- protected SyncSegment computeNext()
- {
- while (true)
- {
- try
- {
- final int currentStart = end;
- end = readSyncMarker(descriptor, currentStart, reader);
- if (end == -1)
- {
- return endOfData();
- }
- if (end > reader.length())
- {
- // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
- // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
- end = (int) reader.length();
- }
-
- return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
- }
- catch(SegmentReader.SegmentReadException e)
- {
- try
- {
- CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
- }
- catch (IOException ioe)
- {
- throw new RuntimeException(ioe);
- }
- }
- catch (IOException e)
- {
- try
- {
- boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
- // if no exception is thrown, the while loop will continue
- CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
- }
- catch (IOException ioe)
- {
- throw new RuntimeException(ioe);
- }
- }
- }
- }
- }
-
- private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
- {
- if (offset > reader.length() - SYNC_MARKER_SIZE)
- {
- // There was no room in the segment to write a final header. No data could be present here.
- return -1;
- }
- reader.seek(offset);
- CRC32 crc = new CRC32();
- updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
- updateChecksumInt(crc, (int) (descriptor.id >>> 32));
- updateChecksumInt(crc, (int) reader.getPosition());
- final int end = reader.readInt();
- long filecrc = reader.readInt() & 0xffffffffL;
- if (crc.getValue() != filecrc)
- {
- if (end != 0 || filecrc != 0)
- {
- String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
- "The end of segment marker should be zero.", offset, reader.getPath());
- throw new SegmentReadException(msg, true);
- }
- return -1;
- }
- else if (end < offset || end > reader.length())
- {
- String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
- throw new SegmentReadException(msg, false);
- }
- return end;
- }
-
- public static class SegmentReadException extends IOException
- {
- public final boolean invalidCrc;
-
- public SegmentReadException(String msg, boolean invalidCrc)
- {
- super(msg);
- this.invalidCrc = invalidCrc;
- }
- }
-
- public static class SyncSegment
- {
- /** the 'buffer' to replay commit log data from */
- public final FileDataInput input;
-
- /** offset in file where this section begins. */
- public final int fileStartPosition;
-
- /** offset in file where this section ends. */
- public final int fileEndPosition;
-
- /** the logical ending position of the buffer */
- public final int endPosition;
-
- public final boolean toleratesErrorsInSection;
-
- public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
- {
- this.input = input;
- this.fileStartPosition = fileStartPosition;
- this.fileEndPosition = fileEndPosition;
- this.endPosition = endPosition;
- this.toleratesErrorsInSection = toleratesErrorsInSection;
- }
- }
-
- /**
- * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
- */
- interface Segmenter
- {
- /**
- * Get the next section of the commit log to replay.
- *
- * @param startPosition the position in the file to begin reading at
- * @param nextSectionStartPosition the file position of the beginning of the next section
- * @return the buffer and it's logical end position
- * @throws IOException
- */
- SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
-
- /**
- * Determine if we tolerate errors in the current segment.
- */
- default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
- {
- return segmentEndPosition >= fileLength || segmentEndPosition < 0;
- }
- }
-
- static class NoOpSegmenter implements Segmenter
- {
- private final RandomAccessReader reader;
-
- public NoOpSegmenter(RandomAccessReader reader)
- {
- this.reader = reader;
- }
-
- public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
- {
- reader.seek(startPosition);
- return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
- }
-
- public boolean tolerateSegmentErrors(int end, long length)
- {
- return true;
- }
- }
-
- static class CompressedSegmenter implements Segmenter
- {
- private final ICompressor compressor;
- private final RandomAccessReader reader;
- private byte[] compressedBuffer;
- private byte[] uncompressedBuffer;
- private long nextLogicalStart;
-
- public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
- {
- this(CompressionParams.createCompressor(desc.compression), reader);
- }
-
- public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
- {
- this.compressor = compressor;
- this.reader = reader;
- compressedBuffer = new byte[0];
- uncompressedBuffer = new byte[0];
- nextLogicalStart = reader.getFilePointer();
- }
-
- public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
- {
- reader.seek(startPosition);
- int uncompressedLength = reader.readInt();
-
- int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
- if (compressedLength > compressedBuffer.length)
- compressedBuffer = new byte[(int) (1.2 * compressedLength)];
- reader.readFully(compressedBuffer, 0, compressedLength);
-
- if (uncompressedLength > uncompressedBuffer.length)
- uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
- int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
- nextLogicalStart += SYNC_MARKER_SIZE;
- FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
- nextLogicalStart += uncompressedLength;
- return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
- }
- }
-
- static class EncryptedSegmenter implements Segmenter
- {
- private final RandomAccessReader reader;
- private final ICompressor compressor;
- private final Cipher cipher;
-
- /**
- * the result of the decryption is written into this buffer.
- */
- private ByteBuffer decryptedBuffer;
-
- /**
- * the result of the decryption is written into this buffer.
- */
- private ByteBuffer uncompressedBuffer;
-
- private final ChunkProvider chunkProvider;
-
- private long currentSegmentEndPosition;
- private long nextLogicalStart;
-
- public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
- {
- this(reader, descriptor.getEncryptionContext());
- }
-
- @VisibleForTesting
- EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
- {
- this.reader = reader;
- decryptedBuffer = ByteBuffer.allocate(0);
- compressor = encryptionContext.getCompressor();
- nextLogicalStart = reader.getFilePointer();
-
- try
- {
- cipher = encryptionContext.getDecryptor();
- }
- catch (IOException ioe)
- {
- throw new FSReadError(ioe, reader.getPath());
- }
-
- chunkProvider = () -> {
- if (reader.getFilePointer() >= currentSegmentEndPosition)
- return ByteBufferUtil.EMPTY_BYTE_BUFFER;
- try
- {
- decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
- uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
- return uncompressedBuffer;
- }
- catch (IOException e)
- {
- throw new FSReadError(e, reader.getPath());
- }
- };
- }
-
- public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
- {
- int totalPlainTextLength = reader.readInt();
- currentSegmentEndPosition = nextSectionStartPosition - 1;
-
- nextLogicalStart += SYNC_MARKER_SIZE;
- FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
- nextLogicalStart += totalPlainTextLength;
- return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
new file mode 100644
index 0000000..1c10c25
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A very simple {@link ByteBuffer} pool with a fixed allocation size and a bounded cache of released buffers.
+ * Callers may go past the "max": {@link #createBuffer} never refuses a request, and any buffer released while
+ * the cache already holds the maximum number of buffers is cleaned instead of being kept for re-use.
+ *
+ * Has a reusable thread local ByteBuffer that users can make use of. The holder is static, so that buffer is
+ * shared by every pool instance used from the same thread.
+ */
+public class SimpleCachedBufferPool
+{
+    /** Per-thread scratch buffer; starts out as an empty heap buffer until a caller installs a larger one. */
+    protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    /** Released buffers waiting to be handed out again. */
+    private final Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+
+    /** Number of buffers handed out by {@link #createBuffer} and not yet returned via {@link #releaseBuffer}. */
+    private final AtomicInteger usedBuffers = new AtomicInteger(0);
+
+    /**
+     * Maximum number of buffers kept in the pool. Any buffers above this count that are allocated will be cleaned
+     * upon release rather than held and re-used.
+     */
+    private final int maxBufferPoolSize;
+
+    /**
+     * Size of individual buffer segments on allocation.
+     */
+    private final int bufferSize;
+
+    /**
+     * @param maxBufferPoolSize maximum number of released buffers to cache for re-use
+     * @param bufferSize capacity requested for every newly allocated buffer
+     */
+    public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize)
+    {
+        this.maxBufferPoolSize = maxBufferPoolSize;
+        this.bufferSize = bufferSize;
+    }
+
+    /**
+     * Hands out a buffer and counts it as in use. A cached buffer is preferred (and cleared before being
+     * returned); only when the cache is empty is a new buffer of {@code bufferSize} allocated.
+     *
+     * @param bufferType used only when a new buffer has to be allocated; a cached buffer is returned as-is,
+     *                   whatever type it was originally allocated with
+     * @return a buffer ready for writing
+     */
+    public ByteBuffer createBuffer(BufferType bufferType)
+    {
+        usedBuffers.incrementAndGet();
+        ByteBuffer buf = bufferPool.poll();
+        if (buf != null)
+        {
+            buf.clear();
+            return buf;
+        }
+        return bufferType.allocate(bufferSize);
+    }
+
+    /**
+     * @return the calling thread's reusable buffer (see {@link #setThreadLocalReusableBuffer})
+     */
+    public ByteBuffer getThreadLocalReusableBuffer()
+    {
+        return reusableBufferHolder.get();
+    }
+
+    /**
+     * Replaces the calling thread's reusable buffer, typically after the caller had to grow it.
+     */
+    public void setThreadLocalReusableBuffer(ByteBuffer buffer)
+    {
+        reusableBufferHolder.set(buffer);
+    }
+
+    /**
+     * Returns a buffer obtained from {@link #createBuffer}. The buffer is cached for re-use while the cache holds
+     * fewer than {@code maxBufferPoolSize} buffers; otherwise it is cleaned immediately.
+     */
+    public void releaseBuffer(ByteBuffer buffer)
+    {
+        usedBuffers.decrementAndGet();
+
+        if (bufferPool.size() < maxBufferPoolSize)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+    }
+
+    /**
+     * Empties the cache, cleaning every cached buffer on the way out.
+     *
+     * Merely clearing the queue would drop the references without cleaning them, unlike every other path on
+     * which this pool lets go of a buffer (see {@link #releaseBuffer}). Buffers currently handed out are not
+     * affected.
+     */
+    public void shutdown()
+    {
+        // poll() rather than iterate-then-clear, so a buffer released concurrently is either cleaned here
+        // or left in the queue, never dropped uncleaned.
+        ByteBuffer pooled;
+        while ((pooled = bufferPool.poll()) != null)
+            FileUtils.clean(pooled);
+    }
+
+    /**
+     * @return true when the number of buffers currently handed out has reached (or passed) the cache maximum
+     */
+    public boolean atLimit()
+    {
+        return usedBuffers.get() >= maxBufferPoolSize;
+    }
+
+    @Override
+    public String toString()
+    {
+        return new StringBuilder()
+               .append("SimpleBufferPool:")
+               .append(" bufferCount:").append(usedBuffers.get())
+               // was labelled "bufferSize:", which made the output show two different "buffer size" values
+               .append(", maxBufferPoolSize:").append(maxBufferPoolSize)
+               .append(", buffer size:").append(bufferSize)
+               .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index be1436c..f88877a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -199,7 +199,7 @@ public class Tracker
public void reset()
{
view.set(new View(
- !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+ !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfstore))
: ImmutableList.<Memtable>of(),
ImmutableList.<Memtable>of(),
Collections.<SSTableReader, SSTableReader>emptyMap(),
@@ -293,7 +293,7 @@ public class Tracker
/**
* get the Memtable that the ordered writeOp should be directed to
*/
- public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
+ public Memtable getMemtableFor(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
{
// since any new memtables appended to the list after we fetch it will be for operations started
// after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -304,7 +304,7 @@ public class Tracker
// assign operations to a memtable that was retired/queued before we started)
for (Memtable memtable : view.get().liveMemtables)
{
- if (memtable.accepts(opGroup, replayPosition))
+ if (memtable.accepts(opGroup, commitLogPosition))
return memtable;
}
throw new AssertionError(view.get().liveMemtables.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 15185f9..e4cdde3 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -28,7 +28,7 @@ import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
@@ -94,7 +94,7 @@ public class TableViews extends AbstractCollection<View>
viewCfs.dumpMemtable();
}
- public void truncateBlocking(ReplayPosition replayAfter, long truncatedAt)
+ public void truncateBlocking(CommitLogPosition replayAfter, long truncatedAt)
{
for (ColumnFamilyStore viewCfs : allViewsCfs())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index bd73733..14bcd58 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.view;
-import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,7 +30,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.repair.SystemDistributedKeyspace;
import org.apache.cassandra.service.StorageService;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index db69f2f..d11e057 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -45,7 +45,6 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4561520..505de49 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -24,7 +24,7 @@ import java.util.*;
import com.google.common.collect.Maps;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.Version;
@@ -55,7 +55,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
+ CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out);
out.writeLong(stats.minTimestamp);
out.writeLong(stats.maxTimestamp);
out.writeInt(stats.maxLocalDeletionTime);
@@ -72,7 +72,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
for (ByteBuffer value : stats.maxClusteringValues)
ByteBufferUtil.writeWithShortLength(value, out);
if (version.hasCommitLogLowerBound())
- ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
+ CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out);
}
/**
@@ -94,8 +94,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
{
EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
- ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
+ CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
+ CommitLogPosition commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
int maxLocalDeletionTime = in.readInt();
@@ -120,7 +120,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
if (descriptor.version.hasCommitLogLowerBound())
- commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+ commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
if (types.contains(MetadataType.VALIDATION))
components.put(MetadataType.VALIDATION,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index ca50a44..299bc87 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Ordering;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.db.rows.Cell;
@@ -66,8 +66,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
return new StatsMetadata(defaultPartitionSizeHistogram(),
defaultCellPerPartitionCountHistogram(),
- ReplayPosition.NONE,
- ReplayPosition.NONE,
+ CommitLogPosition.NONE,
+ CommitLogPosition.NONE,
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
@@ -88,8 +88,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
// TODO: cound the number of row per partition (either with the number of cells, or instead)
protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
- protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
- protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
+ protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
+ protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@ -123,7 +123,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
{
this(comparator);
- ReplayPosition min = null, max = null;
+ CommitLogPosition min = null, max = null;
for (SSTableReader sstable : sstables)
{
if (min == null)
@@ -226,13 +226,13 @@ public class MetadataCollector implements PartitionStatisticsCollector
ttlTracker.update(newTTL);
}
- public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+ public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound)
{
this.commitLogLowerBound = commitLogLowerBound;
return this;
}
- public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
+ public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound)
{
this.commitLogUpperBound = commitLogUpperBound;
return this;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 07e35bb..e765235 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.io.sstable.format.Version;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -42,8 +42,8 @@ public class StatsMetadata extends MetadataComponent
public final EstimatedHistogram estimatedPartitionSize;
public final EstimatedHistogram estimatedColumnCount;
- public final ReplayPosition commitLogLowerBound;
- public final ReplayPosition commitLogUpperBound;
+ public final CommitLogPosition commitLogLowerBound;
+ public final CommitLogPosition commitLogUpperBound;
public final long minTimestamp;
public final long maxTimestamp;
public final int minLocalDeletionTime;
@@ -62,8 +62,8 @@ public class StatsMetadata extends MetadataComponent
public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
EstimatedHistogram estimatedColumnCount,
- ReplayPosition commitLogLowerBound,
- ReplayPosition commitLogUpperBound,
+ CommitLogPosition commitLogLowerBound,
+ CommitLogPosition commitLogUpperBound,
long minTimestamp,
long maxTimestamp,
int minLocalDeletionTime,
@@ -239,7 +239,7 @@ public class StatsMetadata extends MetadataComponent
int size = 0;
size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
- size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
+ size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound);
if (version.storeRows())
size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
else
@@ -258,7 +258,7 @@ public class StatsMetadata extends MetadataComponent
if (version.storeRows())
size += 8 + 8; // totalColumnsSet, totalRows
if (version.hasCommitLogLowerBound())
- size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
+ size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound);
return size;
}
@@ -266,7 +266,7 @@ public class StatsMetadata extends MetadataComponent
{
EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
+ CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out);
out.writeLong(component.minTimestamp);
out.writeLong(component.maxTimestamp);
if (version.storeRows())
@@ -296,15 +296,15 @@ public class StatsMetadata extends MetadataComponent
}
if (version.hasCommitLogLowerBound())
- ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
+ CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out);
}
public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
{
EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
- commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
+ CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE, commitLogUpperBound;
+ commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
long minTimestamp = in.readLong();
long maxTimestamp = in.readLong();
// We use MAX_VALUE as that's the default value for "no deletion time"
@@ -337,7 +337,7 @@ public class StatsMetadata extends MetadataComponent
long totalRows = version.storeRows() ? in.readLong() : -1L;
if (version.hasCommitLogLowerBound())
- commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+ commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
return new StatsMetadata(partitionSizes,
columnCounts,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index 1da6ed0..08c1c8e 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -17,11 +17,10 @@
*/
package org.apache.cassandra.metrics;
-
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import org.apache.cassandra.db.commitlog.AbstractCommitLogService;
-import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -42,14 +41,14 @@ public class CommitLogMetrics
public final Timer waitingOnSegmentAllocation;
/** The time spent waiting on CL sync; for Periodic this only occurs when the sync is lagging its sync interval */
public final Timer waitingOnCommit;
-
+
public CommitLogMetrics()
{
waitingOnSegmentAllocation = Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation"));
waitingOnCommit = Metrics.timer(factory.createMetricName("WaitingOnCommit"));
}
- public void attach(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
+ public void attach(final AbstractCommitLogService service, final AbstractCommitLogSegmentManager segmentManager)
{
completedTasks = Metrics.register(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
{
@@ -69,7 +68,7 @@ public class CommitLogMetrics
{
public Long getValue()
{
- return allocator.onDiskSize();
+ return segmentManager.onDiskSize();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4dc273a..dd0bb46 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -116,6 +116,7 @@ public final class SchemaKeyspace
+ "min_index_interval int,"
+ "read_repair_chance double,"
+ "speculative_retry text,"
+ + "cdc boolean,"
+ "PRIMARY KEY ((keyspace_name), table_name))");
private static final CFMetaData Columns =
@@ -179,6 +180,7 @@ public final class SchemaKeyspace
+ "min_index_interval int,"
+ "read_repair_chance double,"
+ "speculative_retry text,"
+ + "cdc boolean,"
+ "PRIMARY KEY ((keyspace_name), view_name))");
private static final CFMetaData Indexes =
@@ -508,7 +510,8 @@ public final class SchemaKeyspace
.frozenMap("caching", params.caching.asMap())
.frozenMap("compaction", params.compaction.asMap())
.frozenMap("compression", params.compression.asMap())
- .frozenMap("extensions", params.extensions);
+ .frozenMap("extensions", params.extensions)
+ .add("cdc", params.cdc);
}
public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace,
@@ -986,6 +989,7 @@ public final class SchemaKeyspace
.readRepairChance(row.getDouble("read_repair_chance"))
.crcCheckChance(row.getDouble("crc_check_chance"))
.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+ .cdc(row.has("cdc") ? row.getBoolean("cdc") : false)
.build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 7e44e73..16b4427 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.exceptions.ConfigurationException;
+
import static java.lang.String.format;
public final class TableParams
@@ -47,7 +48,8 @@ public final class TableParams
MIN_INDEX_INTERVAL,
READ_REPAIR_CHANCE,
SPECULATIVE_RETRY,
- CRC_CHECK_CHANCE;
+ CRC_CHECK_CHANCE,
+ CDC;
@Override
public String toString()
@@ -81,6 +83,7 @@ public final class TableParams
public final CompactionParams compaction;
public final CompressionParams compression;
public final ImmutableMap<String, ByteBuffer> extensions;
+ public final boolean cdc;
private TableParams(Builder builder)
{
@@ -101,6 +104,7 @@ public final class TableParams
compaction = builder.compaction;
compression = builder.compression;
extensions = builder.extensions;
+ cdc = builder.cdc;
}
public static Builder builder()
@@ -124,7 +128,8 @@ public final class TableParams
.minIndexInterval(params.minIndexInterval)
.readRepairChance(params.readRepairChance)
.speculativeRetry(params.speculativeRetry)
- .extensions(params.extensions);
+ .extensions(params.extensions)
+ .cdc(params.cdc);
}
public void validate()
@@ -212,7 +217,8 @@ public final class TableParams
&& caching.equals(p.caching)
&& compaction.equals(p.compaction)
&& compression.equals(p.compression)
- && extensions.equals(p.extensions);
+ && extensions.equals(p.extensions)
+ && cdc == p.cdc;
}
@Override
@@ -232,7 +238,8 @@ public final class TableParams
caching,
compaction,
compression,
- extensions);
+ extensions,
+ cdc);
}
@Override
@@ -254,6 +261,7 @@ public final class TableParams
.add(Option.COMPACTION.toString(), compaction)
.add(Option.COMPRESSION.toString(), compression)
.add(Option.EXTENSIONS.toString(), extensions)
+ .add(Option.CDC.toString(), cdc)
.toString();
}
@@ -274,6 +282,7 @@ public final class TableParams
private CompactionParams compaction = CompactionParams.DEFAULT;
private CompressionParams compression = CompressionParams.DEFAULT;
private ImmutableMap<String, ByteBuffer> extensions = ImmutableMap.of();
+ private boolean cdc;
public Builder()
{
@@ -368,6 +377,12 @@ public final class TableParams
return this;
}
+ public Builder cdc(boolean val)
+ {
+ cdc = val;
+ return this;
+ }
+
public Builder extensions(Map<String, ByteBuffer> val)
{
extensions = ImmutableMap.copyOf(val);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f1d6bb9..2d21bff 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -310,10 +310,10 @@ public class CassandraDaemon
logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
}
- // replay the log if necessary
+ // Replay any CommitLogSegments found on disk
try
{
- CommitLog.instance.recover();
+ CommitLog.instance.recoverSegmentsOnDisk();
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 081030d..66990c9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -133,6 +133,7 @@ public class StreamReceiveTask extends StreamTask
public void run()
{
boolean hasViews = false;
+ boolean hasCDC = false;
ColumnFamilyStore cfs = null;
try
{
@@ -147,16 +148,22 @@ public class StreamReceiveTask extends StreamTask
}
cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+ hasCDC = cfs.metadata.params.cdc;
Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
- //We have a special path for views.
- //Since the view requires cleaning up any pre-existing state, we must put
- //all partitions through the same write path as normal mutations.
- //This also ensures any 2is are also updated
- if (hasViews)
+ /*
+ * We have a special path for views and for CDC.
+ *
+ * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
+ * through the same write path as normal mutations. This also ensures any 2is are also updated.
+ *
+ * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
+ * can be archived by the CDC process on discard.
+ */
+ if (hasViews || hasCDC)
{
for (SSTableReader reader : readers)
{
@@ -166,8 +173,17 @@ public class StreamReceiveTask extends StreamTask
{
try (UnfilteredRowIterator rowIterator = scanner.next())
{
- //Apply unsafe (we will flush below before transaction is done)
- new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe();
+ Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+
+ // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
+ // before transaction is done.
+ //
+ // If the CFS has CDC, however, these updates need to be written to the CommitLog
+ // so they get archived into the cdc_raw folder
+ if (hasCDC)
+ m.apply();
+ else
+ m.applyUnsafe();
}
}
}
@@ -218,9 +234,9 @@ public class StreamReceiveTask extends StreamTask
}
finally
{
- //We don't keep the streamed sstables since we've applied them manually
- //So we abort the txn and delete the streamed sstables
- if (hasViews)
+ // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
+ // the streamed sstables.
+ if (hasViews || hasCDC)
{
if (cfs != null)
cfs.forceBlockingFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
new file mode 100644
index 0000000..aa7898c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableSet;
+
+import static com.google.common.collect.Sets.newHashSet;
+
+/**
+ * Walks directory recursively, summing up total contents of files within.
+ */
+public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
+{
+ protected final AtomicLong size = new AtomicLong(0);
+ protected Set<String> visited = newHashSet(); //count each file only once
+ protected Set<String> alive = newHashSet();
+ protected final File path;
+
+ public DirectorySizeCalculator(File path)
+ {
+ super();
+ this.path = path;
+ rebuildFileList();
+ }
+
+ public DirectorySizeCalculator(List<File> files)
+ {
+ super();
+ this.path = null;
+ ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ for (File file : files)
+ builder.add(file.getName());
+ alive = builder.build();
+ }
+
+ public boolean isAcceptable(Path file)
+ {
+ return true;
+ }
+
+ public void rebuildFileList()
+ {
+ assert path != null;
+ ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ for (File file : path.listFiles())
+ builder.add(file.getName());
+ size.set(0);
+ alive = builder.build();
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
+ {
+ if (isAcceptable(file))
+ {
+ size.addAndGet(attrs.size());
+ visited.add(file.toFile().getName());
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException
+ {
+ return FileVisitResult.CONTINUE;
+ }
+
+ public long getAllocatedSize()
+ {
+ return size.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 7474a5e..e1a109a 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -81,7 +81,8 @@ public final class JVMStabilityInspector
{
logger.error("Exiting due to error while processing commit log during initialization.", t);
killer.killCurrentJVM(t, true);
- } else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die)
+ }
+ else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die)
killer.killCurrentJVM(t);
else
inspectThrowable(t);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index e7b299b..3458c62 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -226,7 +226,7 @@ public class BufferPool
if (DISABLED)
logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
else
- logger.info("Global buffer pool is enabled, when pool is exahusted (max is {}) it will allocate {}",
+ logger.info("Global buffer pool is enabled, when pool is exhausted (max is {}) it will allocate {}",
FBUtilities.prettyPrintMemory(MEMORY_USAGE_THRESHOLD),
ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cassandra-murmur.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra-murmur.yaml b/test/conf/cassandra-murmur.yaml
index 00f8b4c..a4b25ba 100644
--- a/test/conf/cassandra-murmur.yaml
+++ b/test/conf/cassandra-murmur.yaml
@@ -8,6 +8,8 @@ commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
hints_directory: build/test/cassandra/hints
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
listen_address: 127.0.0.1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index eb03d17..cf02634 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -9,6 +9,8 @@ commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
hints_directory: build/test/cassandra/hints
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cdc.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cdc.yaml b/test/conf/cdc.yaml
new file mode 100644
index 0000000..f79930a
--- /dev/null
+++ b/test/conf/cdc.yaml
@@ -0,0 +1 @@
+cdc_enabled: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/data/bloom-filter/ka/foo.cql
----------------------------------------------------------------------
diff --git a/test/data/bloom-filter/ka/foo.cql b/test/data/bloom-filter/ka/foo.cql
index c4aed6a..4926e3a 100644
--- a/test/data/bloom-filter/ka/foo.cql
+++ b/test/data/bloom-filter/ka/foo.cql
@@ -59,6 +59,6 @@ Compression ratio: 0.4
Estimated droppable tombstones: 0.0
SSTable Level: 0
Repaired at: 0
-ReplayPosition(segmentId=1428529465658, position=6481)
+CommitLogPosition(segmentId=1428529465658, position=6481)
Estimated tombstone drop times:%n
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 0474b32..04682fd 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -21,41 +21,23 @@ package org.apache.cassandra.db.commitlog;
*
*/
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.util.concurrent.RateLimiter;
+import org.junit.*;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.*;
import org.apache.cassandra.config.Config.CommitLogSync;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.security.EncryptionContext;
@@ -131,7 +113,7 @@ public class CommitLogStressTest
volatile boolean stop = false;
boolean randomSize = false;
boolean discardedRun = false;
- ReplayPosition discardedPos;
+ CommitLogPosition discardedPos;
@BeforeClass
static public void initialize() throws IOException
@@ -151,7 +133,7 @@ public class CommitLogStressTest
}
@Before
- public void cleanDir()
+ public void cleanDir() throws IOException
{
File dir = new File(location);
if (dir.isDirectory())
@@ -217,12 +199,22 @@ public class CommitLogStressTest
{
DatabaseDescriptor.setCommitLogCompression(compression);
DatabaseDescriptor.setEncryptionContext(encryptionContext);
- for (CommitLogSync sync : CommitLogSync.values())
+
+ String originalDir = DatabaseDescriptor.getCommitLogLocation();
+ try
+ {
+ DatabaseDescriptor.setCommitLogLocation(location);
+ for (CommitLogSync sync : CommitLogSync.values())
+ {
+ DatabaseDescriptor.setCommitLogSync(sync);
+ CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
+ testLog(commitLog);
+ assert !failed;
+ }
+ }
+ finally
{
- DatabaseDescriptor.setCommitLogSync(sync);
- CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
- testLog(commitLog);
- assert !failed;
+ DatabaseDescriptor.setCommitLogLocation(originalDir);
}
}
@@ -234,12 +226,12 @@ public class CommitLogStressTest
commitLog.executor.getClass().getSimpleName(),
randomSize ? " random size" : "",
discardedRun ? " with discarded run" : "");
- commitLog.allocator.enableReserveSegmentCreation();
+ CommitLog.instance.segmentManager.enableReserveSegmentCreation();
final List<CommitlogThread> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
- discardedPos = ReplayPosition.NONE;
+ discardedPos = CommitLogPosition.NONE;
if (discardedRun)
{
// Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
@@ -251,13 +243,12 @@ public class CommitLogStressTest
for (CommitlogThread t: threads)
{
t.join();
- if (t.rp.compareTo(discardedPos) > 0)
- discardedPos = t.rp;
+ if (t.clsp.compareTo(discardedPos) > 0)
+ discardedPos = t.clsp;
}
verifySizes(commitLog);
- commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
- discardedPos);
+ commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
threads.clear();
System.out.format("Discarded at %s\n", discardedPos);
@@ -285,26 +276,28 @@ public class CommitLogStressTest
System.out.println("Stopped. Replaying... ");
System.out.flush();
- Replayer repl = new Replayer(commitLog);
+ Reader reader = new Reader();
File[] files = new File(location).listFiles();
- repl.recover(files);
+
+ DummyHandler handler = new DummyHandler();
+ reader.readAllFiles(handler, files);
for (File f : files)
if (!f.delete())
Assert.fail("Failed to delete " + f);
- if (hash == repl.hash && cells == repl.cells)
+ if (hash == reader.hash && cells == reader.cells)
System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
commitLog.configuration.getCompressorName(),
commitLog.configuration.useEncryption(),
- repl.discarded, repl.skipped);
+ reader.discarded, reader.skipped);
else
{
System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n",
commitLog.configuration.getCompressorName(),
commitLog.configuration.useEncryption(),
- repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
- repl.hash, hash);
+ reader.cells, cells, cells - reader.cells, reader.discarded, reader.skipped,
+ reader.hash, hash);
failed = true;
}
}
@@ -318,16 +311,16 @@ public class CommitLogStressTest
// FIXME: The executor should give us a chance to await completion of the sync we requested.
commitLog.executor.requestExtraSync().awaitUninterruptibly();
// Wait for any pending deletes or segment allocations to complete.
- commitLog.allocator.awaitManagementTasksCompletion();
+ CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
long combinedSize = 0;
- for (File f : new File(commitLog.location).listFiles())
+ for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
combinedSize += f.length();
Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
List<String> logFileNames = commitLog.getActiveSegmentNames();
Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
- Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments();
+ Collection<CommitLogSegment> segments = CommitLog.instance.segmentManager.getActiveSegments();
for (CommitLogSegment segment : segments)
{
@@ -419,7 +412,7 @@ public class CommitLogStressTest
final CommitLog commitLog;
final Random random;
- volatile ReplayPosition rp;
+ volatile CommitLogPosition clsp;
public CommitlogThread(CommitLog commitLog, Random rand)
{
@@ -448,34 +441,35 @@ public class CommitLogStressTest
dataSize += sz;
}
- rp = commitLog.add(new Mutation(builder.build()));
+ Keyspace ks = Keyspace.open("Keyspace1");
+ clsp = commitLog.add(new Mutation(builder.build()));
counter.incrementAndGet();
}
}
}
- class Replayer extends CommitLogReplayer
+ class Reader extends CommitLogReader
{
- Replayer(CommitLog log)
- {
- super(log, discardedPos, null, ReplayFilter.create());
- }
-
int hash;
int cells;
int discarded;
int skipped;
@Override
- void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+ protected void readMutation(CommitLogReadHandler handler,
+ byte[] inputBuffer,
+ int size,
+ CommitLogPosition minPosition,
+ final int entryLocation,
+ final CommitLogDescriptor desc) throws IOException
{
- if (desc.id < discardedPos.segment)
+ if (desc.id < discardedPos.segmentId)
{
System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
discarded++;
return;
}
- else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+ else if (desc.id == discardedPos.segmentId && entryLocation <= discardedPos.position)
{
// Skip over this mutation.
skipped++;
@@ -516,4 +510,13 @@ public class CommitLogStressTest
}
}
}
+
+ class DummyHandler implements CommitLogReadHandler
+ {
+ public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException { return false; }
+
+ public void handleUnrecoverableError(CommitLogReadException exception) throws IOException { }
+
+ public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
new file mode 100644
index 0000000..a653c81
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.DirectorySizeCalculator;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 1)
+@Measurement(iterations = 30)
+@Fork(value = 1,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class DirectorySizerBench
+{
+ private File tempDir;
+ private DirectorySizeCalculator sizer;
+
+ @Setup(Level.Trial)
+ public void setUp() throws IOException
+ {
+ tempDir = Files.createTempDirectory(randString()).toFile();
+
+ // Since #'s on laptops and commodity desktops are so useful in considering enterprise virtualized server environments...
+
+ // Spinning disk 7200rpm 1TB, win10, ntfs, i6600 skylake, 256 files:
+ // [java] Result: 0.581 ±(99.9%) 0.003 ms/op [Average]
+ // [java] Statistics: (min, avg, max) = (0.577, 0.581, 0.599), stdev = 0.005
+ // [java] Confidence interval (99.9%): [0.577, 0.584]
+
+ // Same hardware, 25600 files:
+ // [java] Result: 56.990 ±(99.9%) 0.374 ms/op [Average]
+ // [java] Statistics: (min, avg, max) = (56.631, 56.990, 59.829), stdev = 0.560
+ // [java] Confidence interval (99.9%): [56.616, 57.364]
+
+ // #'s on a rmbp, 2014, SSD, ubuntu 15.10, ext4, i7-4850HQ @ 2.3, 25600 samples
+ // [java] Result: 74.714 ±(99.9%) 0.558 ms/op [Average]
+ // [java] Statistics: (min, avg, max) = (73.687, 74.714, 76.872), stdev = 0.835
+ // [java] Confidence interval (99.9%): [74.156, 75.272]
+
+ // Throttle CPU on the Windows box to .87GHZ from 4.3GHZ turbo single-core, and #'s for 25600:
+ // [java] Result: 298.628 ±(99.9%) 14.755 ms/op [Average]
+ // [java] Statistics: (min, avg, max) = (291.245, 298.628, 412.881), stdev = 22.085
+ // [java] Confidence interval (99.9%): [283.873, 313.383]
+
+ // Test w/25,600 files, 100x the load of a full default CommitLog (8192) divided by size (32 per)
+ populateRandomFiles(tempDir, 25600);
+ sizer = new DirectorySizeCalculator(tempDir);
+ }
+
+ @TearDown
+ public void tearDown()
+ {
+ FileUtils.deleteRecursive(tempDir);
+ }
+
+ private void populateRandomFiles(File dir, int count) throws IOException
+ {
+ for (int i = 0; i < count; i++)
+ {
+ PrintWriter pw = new PrintWriter(dir + File.separator + randString(), "UTF-8");
+ pw.write(randString());
+ pw.close();
+ }
+ }
+
+ private String randString()
+ {
+ return UUID.randomUUID().toString();
+ }
+
+ @Benchmark
+ public void countFiles(final Blackhole bh) throws IOException
+ {
+ sizer.rebuildFileList();
+ Files.walkFileTree(tempDir.toPath(), sizer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 3bdb192..0047f48 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -47,17 +47,20 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
{
Config config = super.loadConfig();
+ String sep = File.pathSeparator;
config.rpc_port += offset;
config.native_transport_port += offset;
config.storage_port += offset;
- config.commitlog_directory += File.pathSeparator + offset;
- config.saved_caches_directory += File.pathSeparator + offset;
- config.hints_directory += File.pathSeparator + offset;
- for (int i = 0; i < config.data_file_directories.length; i++)
- config.data_file_directories[i] += File.pathSeparator + offset;
+ config.commitlog_directory += sep + offset;
+ config.saved_caches_directory += sep + offset;
+ config.hints_directory += sep + offset;
+
+ config.cdc_raw_directory += sep + offset;
+ for (int i = 0; i < config.data_file_directories.length; i++)
+ config.data_file_directories[i] += sep + offset;
return config;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index dd5444f..d4e621f 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -248,7 +248,7 @@ public class BatchlogManagerTest
if (i == 500)
SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
timestamp,
- ReplayPosition.NONE);
+ CommitLogPosition.NONE);
// Adjust the timestamp (slightly) to make the test deterministic.
if (i >= 500)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
new file mode 100644
index 0000000..632c290
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CDCStatementTest extends CQLTester
+{
+ @Test
+ public void testEnableOnCreate() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;");
+ Assert.assertTrue(currentTableMetadata().params.cdc);
+ }
+
+ @Test
+ public void testEnableOnAlter() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key text, val int, primary key(key));");
+ Assert.assertFalse(currentTableMetadata().params.cdc);
+ execute("ALTER TABLE %s WITH cdc = true;");
+ Assert.assertTrue(currentTableMetadata().params.cdc);
+ }
+
+ @Test
+ public void testDisableOnAlter() throws Throwable
+ {
+ createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;");
+ Assert.assertTrue(currentTableMetadata().params.cdc);
+ execute("ALTER TABLE %s WITH cdc = false;");
+ Assert.assertFalse(currentTableMetadata().params.cdc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index bca9e7b..8dac72e 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -193,6 +193,10 @@ public abstract class CQLTester
FileUtils.deleteRecursive(dir);
}
+ File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
+ if (cdcDir.exists())
+ FileUtils.deleteRecursive(cdcDir);
+
cleanupSavedCaches();
// clean up data directory which are stored as data directory/keyspace/data files
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
index 1527b1e..fd7afd9 100644
--- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
@@ -149,7 +149,7 @@ public class OutOfSpaceTest extends CQLTester
// Make sure commit log wasn't discarded.
UUID cfid = currentTableMetadata().cfId;
- for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments())
+ for (CommitLogSegment segment : CommitLog.instance.segmentManager.getActiveSegments())
if (segment.getDirtyCFIDs().contains(cfid))
return;
fail("Expected commit log to remain dirty for the affected table.");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 33a41d8..8f92403 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.junit.Test;
-
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.CQLTester;
@@ -36,10 +35,10 @@ import org.apache.cassandra.triggers.ITrigger;
import org.apache.cassandra.utils.ByteBufferUtil;
import static java.lang.String.format;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.fail;
import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
public class CreateTest extends CQLTester
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 6dafa37..4047cc9 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
public class ReadMessageTest
@@ -56,6 +58,10 @@ public class ReadMessageTest
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
+ CassandraDaemon daemon = new CassandraDaemon();
+ daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+ StorageService.instance.registerDaemon(daemon);
+
CFMetaData cfForReadMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_FOR_READ_TEST)
.addPartitionKey("key", BytesType.instance)
.addClusteringColumn("col1", AsciiType.instance)
@@ -195,7 +201,9 @@ public class ReadMessageTest
Checker checker = new Checker(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit1")),
cfsnocommit.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit2")));
- CommitLogTestReplayer.examineCommitLog(checker);
+
+ CommitLogTestReplayer replayer = new CommitLogTestReplayer(checker);
+ replayer.examineCommitLog();
assertTrue(checker.commitLogMessageFound);
assertFalse(checker.noCommitLogMessageFound);