Posted to commits@cassandra.apache.org by jm...@apache.org on 2016/06/16 13:55:38 UTC

[2/5] cassandra git commit: Add Change Data Capture

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java b/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
deleted file mode 100644
index 17980de..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/SegmentReader.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.zip.CRC32;
-import javax.crypto.Cipher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.commitlog.EncryptedFileSegmentInputStream.ChunkProvider;
-import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileSegmentInputStream;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.security.EncryptionUtils;
-import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.SYNC_MARKER_SIZE;
-import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
-
-/**
- * Read each sync section of a commit log, iteratively.
- */
-public class SegmentReader implements Iterable<SegmentReader.SyncSegment>
-{
-    private final CommitLogDescriptor descriptor;
-    private final RandomAccessReader reader;
-    private final Segmenter segmenter;
-    private final boolean tolerateTruncation;
-
-    /**
-     * ending position of the current sync section.
-     */
-    protected int end;
-
-    protected SegmentReader(CommitLogDescriptor descriptor, RandomAccessReader reader, boolean tolerateTruncation)
-    {
-        this.descriptor = descriptor;
-        this.reader = reader;
-        this.tolerateTruncation = tolerateTruncation;
-
-        end = (int) reader.getFilePointer();
-        if (descriptor.getEncryptionContext().isEnabled())
-            segmenter = new EncryptedSegmenter(reader, descriptor);
-        else if (descriptor.compression != null)
-            segmenter = new CompressedSegmenter(descriptor, reader);
-        else
-            segmenter = new NoOpSegmenter(reader);
-    }
-
-    public Iterator<SyncSegment> iterator()
-    {
-        return new SegmentIterator();
-    }
-
-    protected class SegmentIterator extends AbstractIterator<SegmentReader.SyncSegment>
-    {
-        protected SyncSegment computeNext()
-        {
-            while (true)
-            {
-                try
-                {
-                    final int currentStart = end;
-                    end = readSyncMarker(descriptor, currentStart, reader);
-                    if (end == -1)
-                    {
-                        return endOfData();
-                    }
-                    if (end > reader.length())
-                    {
-                        // the CRC was good (meaning it was good when it was written and still looks legit), but the file is truncated now.
-                        // try to grab and use as much of the file as possible, which might be nothing if the end of the file truly is corrupt
-                        end = (int) reader.length();
-                    }
-
-                    return segmenter.nextSegment(currentStart + SYNC_MARKER_SIZE, end);
-                }
-                catch(SegmentReader.SegmentReadException e)
-                {
-                    try
-                    {
-                        CommitLogReplayer.handleReplayError(!e.invalidCrc && tolerateTruncation, e.getMessage());
-                    }
-                    catch (IOException ioe)
-                    {
-                        throw new RuntimeException(ioe);
-                    }
-                }
-                catch (IOException e)
-                {
-                    try
-                    {
-                        boolean tolerateErrorsInSection = tolerateTruncation & segmenter.tolerateSegmentErrors(end, reader.length());
-                        // if no exception is thrown, the while loop will continue
-                        CommitLogReplayer.handleReplayError(tolerateErrorsInSection, e.getMessage());
-                    }
-                    catch (IOException ioe)
-                    {
-                        throw new RuntimeException(ioe);
-                    }
-                }
-            }
-        }
-    }
-
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
-    {
-        if (offset > reader.length() - SYNC_MARKER_SIZE)
-        {
-            // There was no room in the segment to write a final header. No data could be present here.
-            return -1;
-        }
-        reader.seek(offset);
-        CRC32 crc = new CRC32();
-        updateChecksumInt(crc, (int) (descriptor.id & 0xFFFFFFFFL));
-        updateChecksumInt(crc, (int) (descriptor.id >>> 32));
-        updateChecksumInt(crc, (int) reader.getPosition());
-        final int end = reader.readInt();
-        long filecrc = reader.readInt() & 0xffffffffL;
-        if (crc.getValue() != filecrc)
-        {
-            if (end != 0 || filecrc != 0)
-            {
-                String msg = String.format("Encountered bad header at position %d of commit log %s, with invalid CRC. " +
-                             "The end of segment marker should be zero.", offset, reader.getPath());
-                throw new SegmentReadException(msg, true);
-            }
-            return -1;
-        }
-        else if (end < offset || end > reader.length())
-        {
-            String msg = String.format("Encountered bad header at position %d of commit log %s, with bad position but valid CRC", offset, reader.getPath());
-            throw new SegmentReadException(msg, false);
-        }
-        return end;
-    }
-
-    public static class SegmentReadException extends IOException
-    {
-        public final boolean invalidCrc;
-
-        public SegmentReadException(String msg, boolean invalidCrc)
-        {
-            super(msg);
-            this.invalidCrc = invalidCrc;
-        }
-    }
-
-    public static class SyncSegment
-    {
-        /** the 'buffer' to replay commit log data from */
-        public final FileDataInput input;
-
-        /** offset in file where this section begins. */
-        public final int fileStartPosition;
-
-        /** offset in file where this section ends. */
-        public final int fileEndPosition;
-
-        /** the logical ending position of the buffer */
-        public final int endPosition;
-
-        public final boolean toleratesErrorsInSection;
-
-        public SyncSegment(FileDataInput input, int fileStartPosition, int fileEndPosition, int endPosition, boolean toleratesErrorsInSection)
-        {
-            this.input = input;
-            this.fileStartPosition = fileStartPosition;
-            this.fileEndPosition = fileEndPosition;
-            this.endPosition = endPosition;
-            this.toleratesErrorsInSection = toleratesErrorsInSection;
-        }
-    }
-
-    /**
-     * Derives the next section of the commit log to be replayed. Section boundaries are derived from the commit log sync markers.
-     */
-    interface Segmenter
-    {
-        /**
-         * Get the next section of the commit log to replay.
-         *
-         * @param startPosition the position in the file to begin reading at
-         * @param nextSectionStartPosition the file position of the beginning of the next section
-         * @return the buffer and its logical end position
-         * @throws IOException
-         */
-        SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException;
-
-        /**
-         * Determine if we tolerate errors in the current segment.
-         */
-        default boolean tolerateSegmentErrors(int segmentEndPosition, long fileLength)
-        {
-            return segmentEndPosition >= fileLength || segmentEndPosition < 0;
-        }
-    }
-
-    static class NoOpSegmenter implements Segmenter
-    {
-        private final RandomAccessReader reader;
-
-        public NoOpSegmenter(RandomAccessReader reader)
-        {
-            this.reader = reader;
-        }
-
-        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition)
-        {
-            reader.seek(startPosition);
-            return new SyncSegment(reader, startPosition, nextSectionStartPosition, nextSectionStartPosition, true);
-        }
-
-        public boolean tolerateSegmentErrors(int end, long length)
-        {
-            return true;
-        }
-    }
-
-    static class CompressedSegmenter implements Segmenter
-    {
-        private final ICompressor compressor;
-        private final RandomAccessReader reader;
-        private byte[] compressedBuffer;
-        private byte[] uncompressedBuffer;
-        private long nextLogicalStart;
-
-        public CompressedSegmenter(CommitLogDescriptor desc, RandomAccessReader reader)
-        {
-            this(CompressionParams.createCompressor(desc.compression), reader);
-        }
-
-        public CompressedSegmenter(ICompressor compressor, RandomAccessReader reader)
-        {
-            this.compressor = compressor;
-            this.reader = reader;
-            compressedBuffer = new byte[0];
-            uncompressedBuffer = new byte[0];
-            nextLogicalStart = reader.getFilePointer();
-        }
-
-        public SyncSegment nextSegment(final int startPosition, final int nextSectionStartPosition) throws IOException
-        {
-            reader.seek(startPosition);
-            int uncompressedLength = reader.readInt();
-
-            int compressedLength = nextSectionStartPosition - (int)reader.getPosition();
-            if (compressedLength > compressedBuffer.length)
-                compressedBuffer = new byte[(int) (1.2 * compressedLength)];
-            reader.readFully(compressedBuffer, 0, compressedLength);
-
-            if (uncompressedLength > uncompressedBuffer.length)
-               uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
-            int count = compressor.uncompress(compressedBuffer, 0, compressedLength, uncompressedBuffer, 0);
-            nextLogicalStart += SYNC_MARKER_SIZE;
-            FileDataInput input = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer, 0, count), reader.getPath(), nextLogicalStart);
-            nextLogicalStart += uncompressedLength;
-            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
-        }
-    }
-
-    static class EncryptedSegmenter implements Segmenter
-    {
-        private final RandomAccessReader reader;
-        private final ICompressor compressor;
-        private final Cipher cipher;
-
-        /**
-         * the result of the decryption is written into this buffer.
-         */
-        private ByteBuffer decryptedBuffer;
-
-        /**
-         * the result of the uncompression is written into this buffer.
-         */
-        private ByteBuffer uncompressedBuffer;
-
-        private final ChunkProvider chunkProvider;
-
-        private long currentSegmentEndPosition;
-        private long nextLogicalStart;
-
-        public EncryptedSegmenter(RandomAccessReader reader, CommitLogDescriptor descriptor)
-        {
-            this(reader, descriptor.getEncryptionContext());
-        }
-
-        @VisibleForTesting
-        EncryptedSegmenter(final RandomAccessReader reader, EncryptionContext encryptionContext)
-        {
-            this.reader = reader;
-            decryptedBuffer = ByteBuffer.allocate(0);
-            compressor = encryptionContext.getCompressor();
-            nextLogicalStart = reader.getFilePointer();
-
-            try
-            {
-                cipher = encryptionContext.getDecryptor();
-            }
-            catch (IOException ioe)
-            {
-                throw new FSReadError(ioe, reader.getPath());
-            }
-
-            chunkProvider = () -> {
-                if (reader.getFilePointer() >= currentSegmentEndPosition)
-                    return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                try
-                {
-                    decryptedBuffer = EncryptionUtils.decrypt(reader, decryptedBuffer, true, cipher);
-                    uncompressedBuffer = EncryptionUtils.uncompress(decryptedBuffer, uncompressedBuffer, true, compressor);
-                    return uncompressedBuffer;
-                }
-                catch (IOException e)
-                {
-                    throw new FSReadError(e, reader.getPath());
-                }
-            };
-        }
-
-        public SyncSegment nextSegment(int startPosition, int nextSectionStartPosition) throws IOException
-        {
-            int totalPlainTextLength = reader.readInt();
-            currentSegmentEndPosition = nextSectionStartPosition - 1;
-
-            nextLogicalStart += SYNC_MARKER_SIZE;
-            FileDataInput input = new EncryptedFileSegmentInputStream(reader.getPath(), nextLogicalStart, 0, totalPlainTextLength, chunkProvider);
-            nextLogicalStart += totalPlainTextLength;
-            return new SyncSegment(input, startPosition, nextSectionStartPosition, (int)nextLogicalStart, tolerateSegmentErrors(nextSectionStartPosition, reader.length()));
-        }
-    }
-}
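
A sync-marker header, as validated by readSyncMarker() above, is just two ints: the file position where the section ends, and a CRC computed over the segment id and the marker's own position. A minimal standalone sketch of that checksum, assuming FBUtilities.updateChecksumInt feeds the int into the CRC most-significant byte first:

    import java.util.zip.CRC32;

    public class SyncMarkerCrcSketch
    {
        // Stand-in for FBUtilities.updateChecksumInt: feed one int into the
        // CRC, most significant byte first (an assumption of this sketch).
        static void updateChecksumInt(CRC32 crc, int v)
        {
            crc.update((v >>> 24) & 0xFF);
            crc.update((v >>> 16) & 0xFF);
            crc.update((v >>> 8) & 0xFF);
            crc.update(v & 0xFF);
        }

        public static void main(String[] args)
        {
            long segmentId = 1428529465658L; // hypothetical descriptor.id
            int markerPosition = 6481;       // hypothetical marker file offset

            CRC32 crc = new CRC32();
            updateChecksumInt(crc, (int) (segmentId & 0xFFFFFFFFL));
            updateChecksumInt(crc, (int) (segmentId >>> 32));
            updateChecksumInt(crc, markerPosition);

            System.out.printf("expected sync marker CRC: 0x%08x%n", crc.getValue());
        }
    }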

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
new file mode 100644
index 0000000..1c10c25
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.commitlog;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * A very simple ByteBuffer pool with a fixed buffer size and a maximum cached buffer count. Allocation
+ * may go past the "max": any buffers allocated beyond the max buffer count are freed on release rather
+ * than re-pooled.
+ *
+ * Also holds a reusable thread-local ByteBuffer that users can make use of.
+ */
+public class SimpleCachedBufferPool
+{
+    protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
+    {
+        protected ByteBuffer initialValue()
+        {
+            return ByteBuffer.allocate(0);
+        }
+    };
+
+    private Queue<ByteBuffer> bufferPool = new ConcurrentLinkedQueue<>();
+    private AtomicInteger usedBuffers = new AtomicInteger(0);
+
+    /**
+     * Maximum number of buffers in the compression pool. Any buffers above this count that are allocated will be cleaned
+     * upon release rather than held and re-used.
+     */
+    private final int maxBufferPoolSize;
+
+    /**
+     * Size of individual buffer segments on allocation.
+     */
+    private final int bufferSize;
+
+    public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize)
+    {
+        this.maxBufferPoolSize = maxBufferPoolSize;
+        this.bufferSize = bufferSize;
+    }
+
+    public ByteBuffer createBuffer(BufferType bufferType)
+    {
+        usedBuffers.incrementAndGet();
+        ByteBuffer buf = bufferPool.poll();
+        if (buf != null)
+        {
+            buf.clear();
+            return buf;
+        }
+        return bufferType.allocate(bufferSize);
+    }
+
+    public ByteBuffer getThreadLocalReusableBuffer()
+    {
+        return reusableBufferHolder.get();
+    }
+
+    public void setThreadLocalReusableBuffer(ByteBuffer buffer)
+    {
+        reusableBufferHolder.set(buffer);
+    }
+
+    public void releaseBuffer(ByteBuffer buffer)
+    {
+        usedBuffers.decrementAndGet();
+
+        if (bufferPool.size() < maxBufferPoolSize)
+            bufferPool.add(buffer);
+        else
+            FileUtils.clean(buffer);
+    }
+
+    public void shutdown()
+    {
+        bufferPool.clear();
+    }
+
+    public boolean atLimit()
+    {
+        return usedBuffers.get() >= maxBufferPoolSize;
+    }
+
+    @Override
+    public String toString()
+    {
+        return new StringBuilder()
+               .append("SimpleBufferPool:")
+               .append(" bufferCount:").append(usedBuffers.get())
+               .append(", maxBufferCount:").append(maxBufferPoolSize)
+               .append(", bufferSize:").append(bufferSize)
+               .toString();
+    }
+}
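
A minimal usage sketch for the pool above; BufferType comes from the existing compression code, and the try/finally pairing of createBuffer/releaseBuffer is this sketch's assumption about intended use:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.commitlog.SimpleCachedBufferPool;
    import org.apache.cassandra.io.compress.BufferType;

    public class BufferPoolUsageSketch
    {
        public static void main(String[] args)
        {
            // Cache at most 3 buffers of 1 MiB each; buffers released while
            // the cache is full are cleaned instead of re-pooled.
            SimpleCachedBufferPool pool = new SimpleCachedBufferPool(3, 1 << 20);

            ByteBuffer buf = pool.createBuffer(BufferType.OFF_HEAP);
            try
            {
                // ... fill and drain the buffer ...
            }
            finally
            {
                pool.releaseBuffer(buf); // re-pooled if under the cache limit
            }

            pool.shutdown(); // drops any cached buffers
        }
    }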

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index be1436c..f88877a 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -199,7 +199,7 @@ public class Tracker
     public void reset()
     {
         view.set(new View(
-                         !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getContext()), cfstore))
+                         !isDummy() ? ImmutableList.of(new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), cfstore))
                                     : ImmutableList.<Memtable>of(),
                          ImmutableList.<Memtable>of(),
                          Collections.<SSTableReader, SSTableReader>emptyMap(),
@@ -293,7 +293,7 @@ public class Tracker
     /**
      * get the Memtable that the ordered writeOp should be directed to
      */
-    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
+    public Memtable getMemtableFor(OpOrder.Group opGroup, CommitLogPosition commitLogPosition)
     {
         // since any new memtables appended to the list after we fetch it will be for operations started
         // after us, we can safely assume that we will always find the memtable that 'accepts' us;
@@ -304,7 +304,7 @@ public class Tracker
         // assign operations to a memtable that was retired/queued before we started)
         for (Memtable memtable : view.get().liveMemtables)
         {
-            if (memtable.accepts(opGroup, replayPosition))
+            if (memtable.accepts(opGroup, commitLogPosition))
                 return memtable;
         }
         throw new AssertionError(view.get().liveMemtables.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index 15185f9..e4cdde3 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -28,7 +28,7 @@ import com.google.common.collect.PeekingIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
@@ -94,7 +94,7 @@ public class TableViews extends AbstractCollection<View>
             viewCfs.dumpMemtable();
     }
 
-    public void truncateBlocking(ReplayPosition replayAfter, long truncatedAt)
+    public void truncateBlocking(CommitLogPosition replayAfter, long truncatedAt)
     {
         for (ColumnFamilyStore viewCfs : allViewsCfs())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index bd73733..14bcd58 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.view;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,7 +30,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index db69f2f..d11e057 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -45,7 +45,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 4561520..505de49 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -24,7 +24,7 @@ import java.util.*;
 import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.Version;
@@ -55,7 +55,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
 
         EstimatedHistogram.serializer.serialize(stats.estimatedPartitionSize, out);
         EstimatedHistogram.serializer.serialize(stats.estimatedColumnCount, out);
-        ReplayPosition.serializer.serialize(stats.commitLogUpperBound, out);
+        CommitLogPosition.serializer.serialize(stats.commitLogUpperBound, out);
         out.writeLong(stats.minTimestamp);
         out.writeLong(stats.maxTimestamp);
         out.writeInt(stats.maxLocalDeletionTime);
@@ -72,7 +72,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
         for (ByteBuffer value : stats.maxClusteringValues)
             ByteBufferUtil.writeWithShortLength(value, out);
         if (version.hasCommitLogLowerBound())
-            ReplayPosition.serializer.serialize(stats.commitLogLowerBound, out);
+            CommitLogPosition.serializer.serialize(stats.commitLogLowerBound, out);
     }
 
     /**
@@ -94,8 +94,8 @@ public class LegacyMetadataSerializer extends MetadataSerializer
             {
                 EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
                 EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-                ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
-                ReplayPosition commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
+                CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
+                CommitLogPosition commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
                 long minTimestamp = in.readLong();
                 long maxTimestamp = in.readLong();
                 int maxLocalDeletionTime = in.readInt();
@@ -120,7 +120,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                     maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
 
                 if (descriptor.version.hasCommitLogLowerBound())
-                    commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+                    commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
 
                 if (types.contains(MetadataType.VALIDATION))
                     components.put(MetadataType.VALIDATION,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index ca50a44..299bc87 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Ordering;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.db.rows.Cell;
@@ -66,8 +66,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         return new StatsMetadata(defaultPartitionSizeHistogram(),
                                  defaultCellPerPartitionCountHistogram(),
-                                 ReplayPosition.NONE,
-                                 ReplayPosition.NONE,
+                                 CommitLogPosition.NONE,
+                                 CommitLogPosition.NONE,
                                  Long.MIN_VALUE,
                                  Long.MAX_VALUE,
                                  Integer.MAX_VALUE,
@@ -88,8 +88,8 @@ public class MetadataCollector implements PartitionStatisticsCollector
     protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
     // TODO: count the number of rows per partition (either with the number of cells, or instead)
     protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
-    protected ReplayPosition commitLogLowerBound = ReplayPosition.NONE;
-    protected ReplayPosition commitLogUpperBound = ReplayPosition.NONE;
+    protected CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE;
+    protected CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE;
     protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
     protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
     protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
@@ -123,7 +123,7 @@ public class MetadataCollector implements PartitionStatisticsCollector
     {
         this(comparator);
 
-        ReplayPosition min = null, max = null;
+        CommitLogPosition min = null, max = null;
         for (SSTableReader sstable : sstables)
         {
             if (min == null)
@@ -226,13 +226,13 @@ public class MetadataCollector implements PartitionStatisticsCollector
         ttlTracker.update(newTTL);
     }
 
-    public MetadataCollector commitLogLowerBound(ReplayPosition commitLogLowerBound)
+    public MetadataCollector commitLogLowerBound(CommitLogPosition commitLogLowerBound)
     {
         this.commitLogLowerBound = commitLogLowerBound;
         return this;
     }
 
-    public MetadataCollector commitLogUpperBound(ReplayPosition commitLogUpperBound)
+    public MetadataCollector commitLogUpperBound(CommitLogPosition commitLogUpperBound)
     {
         this.commitLogUpperBound = commitLogUpperBound;
         return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 07e35bb..e765235 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -42,8 +42,8 @@ public class StatsMetadata extends MetadataComponent
 
     public final EstimatedHistogram estimatedPartitionSize;
     public final EstimatedHistogram estimatedColumnCount;
-    public final ReplayPosition commitLogLowerBound;
-    public final ReplayPosition commitLogUpperBound;
+    public final CommitLogPosition commitLogLowerBound;
+    public final CommitLogPosition commitLogUpperBound;
     public final long minTimestamp;
     public final long maxTimestamp;
     public final int minLocalDeletionTime;
@@ -62,8 +62,8 @@ public class StatsMetadata extends MetadataComponent
 
     public StatsMetadata(EstimatedHistogram estimatedPartitionSize,
                          EstimatedHistogram estimatedColumnCount,
-                         ReplayPosition commitLogLowerBound,
-                         ReplayPosition commitLogUpperBound,
+                         CommitLogPosition commitLogLowerBound,
+                         CommitLogPosition commitLogUpperBound,
                          long minTimestamp,
                          long maxTimestamp,
                          int minLocalDeletionTime,
@@ -239,7 +239,7 @@ public class StatsMetadata extends MetadataComponent
             int size = 0;
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedPartitionSize);
             size += EstimatedHistogram.serializer.serializedSize(component.estimatedColumnCount);
-            size += ReplayPosition.serializer.serializedSize(component.commitLogUpperBound);
+            size += CommitLogPosition.serializer.serializedSize(component.commitLogUpperBound);
             if (version.storeRows())
                 size += 8 + 8 + 4 + 4 + 4 + 4 + 8 + 8; // mix/max timestamp(long), min/maxLocalDeletionTime(int), min/max TTL, compressionRatio(double), repairedAt (long)
             else
@@ -258,7 +258,7 @@ public class StatsMetadata extends MetadataComponent
             if (version.storeRows())
                 size += 8 + 8; // totalColumnsSet, totalRows
             if (version.hasCommitLogLowerBound())
-                size += ReplayPosition.serializer.serializedSize(component.commitLogLowerBound);
+                size += CommitLogPosition.serializer.serializedSize(component.commitLogLowerBound);
             return size;
         }
 
@@ -266,7 +266,7 @@ public class StatsMetadata extends MetadataComponent
         {
             EstimatedHistogram.serializer.serialize(component.estimatedPartitionSize, out);
             EstimatedHistogram.serializer.serialize(component.estimatedColumnCount, out);
-            ReplayPosition.serializer.serialize(component.commitLogUpperBound, out);
+            CommitLogPosition.serializer.serialize(component.commitLogUpperBound, out);
             out.writeLong(component.minTimestamp);
             out.writeLong(component.maxTimestamp);
             if (version.storeRows())
@@ -296,15 +296,15 @@ public class StatsMetadata extends MetadataComponent
             }
 
             if (version.hasCommitLogLowerBound())
-                ReplayPosition.serializer.serialize(component.commitLogLowerBound, out);
+                CommitLogPosition.serializer.serialize(component.commitLogLowerBound, out);
         }
 
         public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException
         {
             EstimatedHistogram partitionSizes = EstimatedHistogram.serializer.deserialize(in);
             EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
-            ReplayPosition commitLogLowerBound = ReplayPosition.NONE, commitLogUpperBound;
-            commitLogUpperBound = ReplayPosition.serializer.deserialize(in);
+            CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE, commitLogUpperBound;
+            commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
             long maxTimestamp = in.readLong();
             // We use MAX_VALUE as that's the default value for "no deletion time"
@@ -337,7 +337,7 @@ public class StatsMetadata extends MetadataComponent
             long totalRows = version.storeRows() ? in.readLong() : -1L;
 
             if (version.hasCommitLogLowerBound())
-                commitLogLowerBound = ReplayPosition.serializer.deserialize(in);
+                commitLogLowerBound = CommitLogPosition.serializer.deserialize(in);
 
             return new StatsMetadata(partitionSizes,
                                      columnCounts,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index 1da6ed0..08c1c8e 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -17,11 +17,10 @@
  */
 package org.apache.cassandra.metrics;
 
-
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.commitlog.AbstractCommitLogService;
-import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
+import org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
@@ -42,14 +41,14 @@ public class CommitLogMetrics
     public final Timer waitingOnSegmentAllocation;
     /** The time spent waiting on CL sync; for Periodic this only occurs when the sync is lagging its sync interval */
     public final Timer waitingOnCommit;
-    
+
     public CommitLogMetrics()
     {
         waitingOnSegmentAllocation = Metrics.timer(factory.createMetricName("WaitingOnSegmentAllocation"));
         waitingOnCommit = Metrics.timer(factory.createMetricName("WaitingOnCommit"));
     }
 
-    public void attach(final AbstractCommitLogService service, final CommitLogSegmentManager allocator)
+    public void attach(final AbstractCommitLogService service, final AbstractCommitLogSegmentManager segmentManager)
     {
         completedTasks = Metrics.register(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
         {
@@ -69,7 +68,7 @@ public class CommitLogMetrics
         {
             public Long getValue()
             {
-                return allocator.onDiskSize();
+                return segmentManager.onDiskSize();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4dc273a..dd0bb46 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -116,6 +116,7 @@ public final class SchemaKeyspace
                 + "min_index_interval int,"
                 + "read_repair_chance double,"
                 + "speculative_retry text,"
+                + "cdc boolean,"
                 + "PRIMARY KEY ((keyspace_name), table_name))");
 
     private static final CFMetaData Columns =
@@ -179,6 +180,7 @@ public final class SchemaKeyspace
                 + "min_index_interval int,"
                 + "read_repair_chance double,"
                 + "speculative_retry text,"
+                + "cdc boolean,"
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
     private static final CFMetaData Indexes =
@@ -508,7 +510,8 @@ public final class SchemaKeyspace
              .frozenMap("caching", params.caching.asMap())
              .frozenMap("compaction", params.compaction.asMap())
              .frozenMap("compression", params.compression.asMap())
-             .frozenMap("extensions", params.extensions);
+             .frozenMap("extensions", params.extensions)
+             .add("cdc", params.cdc);
     }
 
     public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace,
@@ -986,6 +989,7 @@ public final class SchemaKeyspace
                           .readRepairChance(row.getDouble("read_repair_chance"))
                           .crcCheckChance(row.getDouble("crc_check_chance"))
                           .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
+                          .cdc(row.has("cdc") ? row.getBoolean("cdc") : false)
                           .build();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 7e44e73..16b4427 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -25,6 +25,7 @@ import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
+
 import static java.lang.String.format;
 
 public final class TableParams
@@ -47,7 +48,8 @@ public final class TableParams
         MIN_INDEX_INTERVAL,
         READ_REPAIR_CHANCE,
         SPECULATIVE_RETRY,
-        CRC_CHECK_CHANCE;
+        CRC_CHECK_CHANCE,
+        CDC;
 
         @Override
         public String toString()
@@ -81,6 +83,7 @@ public final class TableParams
     public final CompactionParams compaction;
     public final CompressionParams compression;
     public final ImmutableMap<String, ByteBuffer> extensions;
+    public final boolean cdc;
 
     private TableParams(Builder builder)
     {
@@ -101,6 +104,7 @@ public final class TableParams
         compaction = builder.compaction;
         compression = builder.compression;
         extensions = builder.extensions;
+        cdc = builder.cdc;
     }
 
     public static Builder builder()
@@ -124,7 +128,8 @@ public final class TableParams
                             .minIndexInterval(params.minIndexInterval)
                             .readRepairChance(params.readRepairChance)
                             .speculativeRetry(params.speculativeRetry)
-                            .extensions(params.extensions);
+                            .extensions(params.extensions)
+                            .cdc(params.cdc);
     }
 
     public void validate()
@@ -212,7 +217,8 @@ public final class TableParams
             && caching.equals(p.caching)
             && compaction.equals(p.compaction)
             && compression.equals(p.compression)
-            && extensions.equals(p.extensions);
+            && extensions.equals(p.extensions)
+            && cdc == p.cdc;
     }
 
     @Override
@@ -232,7 +238,8 @@ public final class TableParams
                                 caching,
                                 compaction,
                                 compression,
-                                extensions);
+                                extensions,
+                                cdc);
     }
 
     @Override
@@ -254,6 +261,7 @@ public final class TableParams
                           .add(Option.COMPACTION.toString(), compaction)
                           .add(Option.COMPRESSION.toString(), compression)
                           .add(Option.EXTENSIONS.toString(), extensions)
+                          .add(Option.CDC.toString(), cdc)
                           .toString();
     }
 
@@ -274,6 +282,7 @@ public final class TableParams
         private CompactionParams compaction = CompactionParams.DEFAULT;
         private CompressionParams compression = CompressionParams.DEFAULT;
         private ImmutableMap<String, ByteBuffer> extensions = ImmutableMap.of();
+        private boolean cdc;
 
         public Builder()
         {
@@ -368,6 +377,12 @@ public final class TableParams
             return this;
         }
 
+        public Builder cdc(boolean val)
+        {
+            cdc = val;
+            return this;
+        }
+
         public Builder extensions(Map<String, ByteBuffer> val)
         {
             extensions = ImmutableMap.copyOf(val);
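
For illustration, flipping the new flag through the builder might look like the sketch below. The build() call is assumed from the builder pattern; this hunk doesn't show it:

    import org.apache.cassandra.schema.TableParams;

    public class CdcParamsSketch
    {
        public static void main(String[] args)
        {
            // Hypothetical: enable cdc, leaving all other table options at
            // their defaults.
            TableParams params = TableParams.builder()
                                            .cdc(true)
                                            .build(); // assumed builder terminator
            assert params.cdc;
        }
    }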

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index f1d6bb9..2d21bff 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -310,10 +310,10 @@ public class CassandraDaemon
             logger.warn("Unable to start GCInspector (currently only supported on the Sun JVM)");
         }
 
-        // replay the log if necessary
+        // Replay any CommitLogSegments found on disk
         try
         {
-            CommitLog.instance.recover();
+            CommitLog.instance.recoverSegmentsOnDisk();
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 081030d..66990c9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -133,6 +133,7 @@ public class StreamReceiveTask extends StreamTask
         public void run()
         {
             boolean hasViews = false;
+            boolean hasCDC = false;
             ColumnFamilyStore cfs = null;
             try
             {
@@ -147,16 +148,22 @@ public class StreamReceiveTask extends StreamTask
                 }
                 cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
                 hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
+                hasCDC = cfs.metadata.params.cdc;
 
                 Collection<SSTableReader> readers = task.sstables;
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
-                    //We have a special path for views.
-                    //Since the view requires cleaning up any pre-existing state, we must put
-                    //all partitions through the same write path as normal mutations.
-                    //This also ensures any 2is are also updated
-                    if (hasViews)
+                    /*
+                     * We have a special path for views and for CDC.
+                     *
+                     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
+                     * through the same write path as normal mutations. This also ensures any 2is are also updated.
+                     *
+                     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
+                     * can be archived by the CDC process on discard.
+                     */
+                    if (hasViews || hasCDC)
                     {
                         for (SSTableReader reader : readers)
                         {
@@ -166,8 +173,17 @@ public class StreamReceiveTask extends StreamTask
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        //Apply unsafe (we will flush below before transaction is done)
-                                        new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe();
+                                        Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata)));
+
+                                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below
+                                        // before transaction is done.
+                                        //
+                                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                                        // so they get archived into the cdc_raw folder
+                                        if (hasCDC)
+                                            m.apply();
+                                        else
+                                            m.applyUnsafe();
                                     }
                                 }
                             }
@@ -218,9 +234,9 @@ public class StreamReceiveTask extends StreamTask
             }
             finally
             {
-                //We don't keep the streamed sstables since we've applied them manually
-                //So we abort the txn and delete the streamed sstables
-                if (hasViews)
+                // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete
+                // the streamed sstables.
+                if (hasViews || hasCDC)
                 {
                     if (cfs != null)
                         cfs.forceBlockingFlush();
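
Condensed, the write-path routing this hunk introduces comes down to the following sketch (not the committed code):

    import org.apache.cassandra.db.Mutation;

    public class StreamedMutationRoutingSketch
    {
        // CDC-enabled tables must pass through the CommitLog so the segments
        // can be archived into cdc_raw; views without CDC may skip it, because
        // the task flushes before the transaction completes.
        static void applyStreamed(Mutation mutation, boolean hasCDC)
        {
            if (hasCDC)
                mutation.apply();       // durable write through the CommitLog
            else
                mutation.applyUnsafe(); // bypasses the CommitLog
        }
    }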

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
new file mode 100644
index 0000000..aa7898c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableSet;
+
+import static com.google.common.collect.Sets.newHashSet;
+
+/**
+ * Walks a directory recursively, summing the sizes of the files within.
+ */
+public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
+{
+    protected final AtomicLong size = new AtomicLong(0);
+    protected Set<String> visited = newHashSet(); //count each file only once
+    protected Set<String> alive = newHashSet();
+    protected final File path;
+
+    public DirectorySizeCalculator(File path)
+    {
+        super();
+        this.path = path;
+        rebuildFileList();
+    }
+
+    public DirectorySizeCalculator(List<File> files)
+    {
+        super();
+        this.path = null;
+        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+        for (File file : files)
+            builder.add(file.getName());
+        alive = builder.build();
+    }
+
+    public boolean isAcceptable(Path file)
+    {
+        return true;
+    }
+
+    public void rebuildFileList()
+    {
+        assert path != null;
+        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+        for (File file : path.listFiles())
+            builder.add(file.getName());
+        size.set(0);
+        alive = builder.build();
+    }
+
+    @Override
+    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException
+    {
+        if (isAcceptable(file))
+        {
+            size.addAndGet(attrs.size());
+            visited.add(file.toFile().getName());
+        }
+        return FileVisitResult.CONTINUE;
+    }
+
+    @Override
+    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException
+    {
+        return FileVisitResult.CONTINUE;
+    }
+
+    public long getAllocatedSize()
+    {
+        return size.get();
+    }
+}
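
Since DirectorySizeCalculator is a SimpleFileVisitor, callers drive it with Files.walkFileTree; a minimal sketch (the directory path is hypothetical):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    import org.apache.cassandra.utils.DirectorySizeCalculator;

    public class DirectorySizeSketch
    {
        public static void main(String[] args) throws IOException
        {
            File dir = new File("build/test/cassandra/cdc_raw"); // hypothetical path
            DirectorySizeCalculator visitor = new DirectorySizeCalculator(dir);
            Files.walkFileTree(dir.toPath(), visitor);
            System.out.println("bytes on disk: " + visitor.getAllocatedSize());
        }
    }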

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index 7474a5e..e1a109a 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -81,7 +81,8 @@ public final class JVMStabilityInspector
         {
             logger.error("Exiting due to error while processing commit log during initialization.", t);
             killer.killCurrentJVM(t, true);
-        } else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die)
+        }
+        else if (DatabaseDescriptor.getCommitFailurePolicy() == Config.CommitFailurePolicy.die)
             killer.killCurrentJVM(t);
         else
             inspectThrowable(t);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index e7b299b..3458c62 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -226,7 +226,7 @@ public class BufferPool
             if (DISABLED)
                 logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
             else
-                logger.info("Global buffer pool is enabled, when pool is exahusted (max is {}) it will allocate {}",
+                logger.info("Global buffer pool is enabled, when pool is exhausted (max is {}) it will allocate {}",
                             FBUtilities.prettyPrintMemory(MEMORY_USAGE_THRESHOLD),
                             ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cassandra-murmur.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra-murmur.yaml b/test/conf/cassandra-murmur.yaml
index 00f8b4c..a4b25ba 100644
--- a/test/conf/cassandra-murmur.yaml
+++ b/test/conf/cassandra-murmur.yaml
@@ -8,6 +8,8 @@ commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5
 commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
 hints_directory: build/test/cassandra/hints
 partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 listen_address: 127.0.0.1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index eb03d17..cf02634 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -9,6 +9,8 @@ commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5
 commitlog_directory: build/test/cassandra/commitlog
+cdc_raw_directory: build/test/cassandra/cdc_raw
+cdc_enabled: false
 hints_directory: build/test/cassandra/hints
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1

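Both test configs now carry the same two CDC settings this commit introduces; a deployment's cassandra.yaml would use the same pair, e.g. (directory path hypothetical):

    cdc_enabled: true
    cdc_raw_directory: /var/lib/cassandra/cdc_raw
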
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/conf/cdc.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cdc.yaml b/test/conf/cdc.yaml
new file mode 100644
index 0000000..f79930a
--- /dev/null
+++ b/test/conf/cdc.yaml
@@ -0,0 +1 @@
+cdc_enabled: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/data/bloom-filter/ka/foo.cql
----------------------------------------------------------------------
diff --git a/test/data/bloom-filter/ka/foo.cql b/test/data/bloom-filter/ka/foo.cql
index c4aed6a..4926e3a 100644
--- a/test/data/bloom-filter/ka/foo.cql
+++ b/test/data/bloom-filter/ka/foo.cql
@@ -59,6 +59,6 @@ Compression ratio: 0.4
 Estimated droppable tombstones: 0.0
 SSTable Level: 0
 Repaired at: 0
-ReplayPosition(segmentId=1428529465658, position=6481)
+CommitLogPosition(segmentId=1428529465658, position=6481)
 Estimated tombstone drop times:%n
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 0474b32..04682fd 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -21,41 +21,23 @@ package org.apache.cassandra.db.commitlog;
  *
  */
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.RateLimiter;
+import org.junit.*;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.*;
 import org.apache.cassandra.config.Config.CommitLogSync;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.security.EncryptionContext;
@@ -131,7 +113,7 @@ public class CommitLogStressTest
     volatile boolean stop = false;
     boolean randomSize = false;
     boolean discardedRun = false;
-    ReplayPosition discardedPos;
+    CommitLogPosition discardedPos;
 
     @BeforeClass
     static public void initialize() throws IOException
@@ -151,7 +133,7 @@ public class CommitLogStressTest
     }
 
     @Before
-    public void cleanDir()
+    public void cleanDir() throws IOException
     {
         File dir = new File(location);
         if (dir.isDirectory())
@@ -217,12 +199,22 @@ public class CommitLogStressTest
     {
         DatabaseDescriptor.setCommitLogCompression(compression);
         DatabaseDescriptor.setEncryptionContext(encryptionContext);
-        for (CommitLogSync sync : CommitLogSync.values())
+
+        String originalDir = DatabaseDescriptor.getCommitLogLocation();
+        try
+        {
+            DatabaseDescriptor.setCommitLogLocation(location);
+            for (CommitLogSync sync : CommitLogSync.values())
+            {
+                DatabaseDescriptor.setCommitLogSync(sync);
+                CommitLog commitLog = new CommitLog(CommitLogArchiver.disabled()).start();
+                testLog(commitLog);
+                assert !failed;
+            }
+        }
+        finally
         {
-            DatabaseDescriptor.setCommitLogSync(sync);
-            CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
-            testLog(commitLog);
-            assert !failed;
+            DatabaseDescriptor.setCommitLogLocation(originalDir);
         }
     }
 
@@ -234,12 +226,12 @@ public class CommitLogStressTest
                            commitLog.executor.getClass().getSimpleName(),
                            randomSize ? " random size" : "",
                            discardedRun ? " with discarded run" : "");
-        commitLog.allocator.enableReserveSegmentCreation();
+        CommitLog.instance.segmentManager.enableReserveSegmentCreation();
         
         final List<CommitlogThread> threads = new ArrayList<>();
         ScheduledExecutorService scheduled = startThreads(commitLog, threads);
 
-        discardedPos = ReplayPosition.NONE;
+        discardedPos = CommitLogPosition.NONE;
         if (discardedRun)
         {
             // Makes sure post-break data is not deleted, and that replayer correctly rejects earlier mutations.
@@ -251,13 +243,12 @@ public class CommitLogStressTest
             for (CommitlogThread t: threads)
             {
                 t.join();
-                if (t.rp.compareTo(discardedPos) > 0)
-                    discardedPos = t.rp;
+                if (t.clsp.compareTo(discardedPos) > 0)
+                    discardedPos = t.clsp;
             }
             verifySizes(commitLog);
 
-            commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId,
-                                               discardedPos);
+            commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
             threads.clear();
 
             System.out.format("Discarded at %s\n", discardedPos);
@@ -285,26 +276,28 @@ public class CommitLogStressTest
 
         System.out.println("Stopped. Replaying... ");
         System.out.flush();
-        Replayer repl = new Replayer(commitLog);
+        Reader reader = new Reader();
         File[] files = new File(location).listFiles();
-        repl.recover(files);
+
+        DummyHandler handler = new DummyHandler();
+        reader.readAllFiles(handler, files);
 
         for (File f : files)
             if (!f.delete())
                 Assert.fail("Failed to delete " + f);
 
-        if (hash == repl.hash && cells == repl.cells)
+        if (hash == reader.hash && cells == reader.cells)
             System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
                               commitLog.configuration.getCompressorName(),
                               commitLog.configuration.useEncryption(),
-                              repl.discarded, repl.skipped);
+                              reader.discarded, reader.skipped);
         else
         {
             System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d -  hash %d expected %d.\n",
                               commitLog.configuration.getCompressorName(),
                               commitLog.configuration.useEncryption(),
-                              repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
-                              repl.hash, hash);
+                              reader.cells, cells, cells - reader.cells, reader.discarded, reader.skipped,
+                              reader.hash, hash);
             failed = true;
         }
     }
@@ -318,16 +311,16 @@ public class CommitLogStressTest
         // FIXME: The executor should give us a chance to await completion of the sync we requested.
         commitLog.executor.requestExtraSync().awaitUninterruptibly();
         // Wait for any pending deletes or segment allocations to complete.
-        commitLog.allocator.awaitManagementTasksCompletion();
+        CommitLog.instance.segmentManager.awaitManagementTasksCompletion();
 
         long combinedSize = 0;
-        for (File f : new File(commitLog.location).listFiles())
+        for (File f : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
             combinedSize += f.length();
         Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
 
         List<String> logFileNames = commitLog.getActiveSegmentNames();
         Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
-        Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments();
+        Collection<CommitLogSegment> segments = CommitLog.instance.segmentManager.getActiveSegments();
 
         for (CommitLogSegment segment : segments)
         {
@@ -419,7 +412,7 @@ public class CommitLogStressTest
         final CommitLog commitLog;
         final Random random;
 
-        volatile ReplayPosition rp;
+        volatile CommitLogPosition clsp;
 
         public CommitlogThread(CommitLog commitLog, Random rand)
         {
@@ -448,34 +441,35 @@ public class CommitLogStressTest
                     dataSize += sz;
                 }
 
-                rp = commitLog.add(new Mutation(builder.build()));
+                Keyspace.open("Keyspace1"); // opened for its initialization side effect; the instance itself is unused here
+                clsp = commitLog.add(new Mutation(builder.build()));
                 counter.incrementAndGet();
             }
         }
     }
 
-    class Replayer extends CommitLogReplayer
+    class Reader extends CommitLogReader
     {
-        Replayer(CommitLog log)
-        {
-            super(log, discardedPos, null, ReplayFilter.create());
-        }
-
         int hash;
         int cells;
         int discarded;
         int skipped;
 
         @Override
-        void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc)
+        protected void readMutation(CommitLogReadHandler handler,
+                                    byte[] inputBuffer,
+                                    int size,
+                                    CommitLogPosition minPosition,
+                                    final int entryLocation,
+                                    final CommitLogDescriptor desc) throws IOException
         {
-            if (desc.id < discardedPos.segment)
+            if (desc.id < discardedPos.segmentId)
             {
                 System.out.format("Mutation from discarded segment, segment %d pos %d\n", desc.id, entryLocation);
                 discarded++;
                 return;
             }
-            else if (desc.id == discardedPos.segment && entryLocation <= discardedPos.position)
+            else if (desc.id == discardedPos.segmentId && entryLocation <= discardedPos.position)
             {
                 // Skip over this mutation.
                 skipped++;
@@ -516,4 +510,13 @@ public class CommitLogStressTest
             }
         }
     }
+
+    class DummyHandler implements CommitLogReadHandler
+    {
+        public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException { return false; }
+
+        public void handleUnrecoverableError(CommitLogReadException exception) throws IOException { }
+
+        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc) { }
+    }
 }

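DummyHandler above is the no-op end of the CommitLogReadHandler contract this test now exercises; a sketch of a handler that does minimal accounting, using only the three methods shown here, could look like:

    class CountingHandler implements CommitLogReadHandler
    {
        long mutations, bytes;

        public boolean shouldSkipSegmentOnError(CommitLogReadException exception) throws IOException
        {
            return true; // skip a damaged segment rather than abort the read
        }

        public void handleUnrecoverableError(CommitLogReadException exception) throws IOException
        {
            throw exception; // surface anything the reader cannot work around
        }

        public void handleMutation(Mutation m, int size, int entryLocation, CommitLogDescriptor desc)
        {
            mutations++;
            bytes += size;
        }
    }

It would be driven the same way the test drives DummyHandler: new CommitLogReader().readAllFiles(handler, files).
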
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
new file mode 100644
index 0000000..a653c81
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.DirectorySizeCalculator;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 1)
+@Measurement(iterations = 30)
+@Fork(value = 1,jvmArgsAppend = "-Xmx512M")
+@Threads(1)
+@State(Scope.Benchmark)
+public class DirectorySizerBench
+{
+    private File tempDir;
+    private DirectorySizeCalculator sizer;
+
+    @Setup(Level.Trial)
+    public void setUp() throws IOException
+    {
+        tempDir = Files.createTempDirectory(randString()).toFile();
+
+        // Note: the numbers below come from laptops and commodity desktops, so treat them as rough relative
+        // guidance rather than representative of enterprise virtualized server environments.
+
+        // Spinning disk 7200rpm 1TB, win10, ntfs, i6600 skylake, 256 files:
+        // [java] Result: 0.581 ±(99.9%) 0.003 ms/op [Average]
+        // [java]   Statistics: (min, avg, max) = (0.577, 0.581, 0.599), stdev = 0.005
+        // [java]   Confidence interval (99.9%): [0.577, 0.584]
+
+        // Same hardware, 25600 files:
+        // [java] Result: 56.990 ±(99.9%) 0.374 ms/op [Average]
+        // [java]   Statistics: (min, avg, max) = (56.631, 56.990, 59.829), stdev = 0.560
+        // [java]   Confidence interval (99.9%): [56.616, 57.364]
+
+        // Numbers from a 2014 Retina MacBook Pro, SSD, ubuntu 15.10, ext4, i7-4850HQ @ 2.3GHz, 25600 files:
+        // [java] Result: 74.714 ±(99.9%) 0.558 ms/op [Average]
+        // [java]   Statistics: (min, avg, max) = (73.687, 74.714, 76.872), stdev = 0.835
+        // [java]   Confidence interval (99.9%): [74.156, 75.272]
+
+        // Same Windows box with the CPU throttled to 0.87GHz (from 4.3GHz single-core turbo), 25600 files:
+        // [java] Result: 298.628 ±(99.9%) 14.755 ms/op [Average]
+        // [java]   Statistics: (min, avg, max) = (291.245, 298.628, 412.881), stdev = 22.085
+        // [java]   Confidence interval (99.9%): [283.873, 313.383]
+
+        // Test with 25,600 files: 100x the file count of a full default CommitLog (8192MB total space / 32MB per segment = 256 segments)
+        populateRandomFiles(tempDir, 25600);
+        sizer = new DirectorySizeCalculator(tempDir);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+        FileUtils.deleteRecursive(tempDir);
+    }
+
+    private void populateRandomFiles(File dir, int count) throws IOException
+    {
+        for (int i = 0; i < count; i++)
+        {
+            PrintWriter pw = new PrintWriter(dir + File.separator + randString(), "UTF-8");
+            pw.write(randString());
+            pw.close();
+        }
+    }
+
+    private String randString()
+    {
+        return UUID.randomUUID().toString();
+    }
+
+    @Benchmark
+    public void countFiles(final Blackhole bh) throws IOException
+    {
+        sizer.rebuildFileList();
+        Files.walkFileTree(tempDir.toPath(), sizer);
+    }
+}

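The bench is a plain JMH @State class; outside the ant build it can be launched with the stock JMH runner, a sketch assuming JMH is on the classpath:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class RunDirectorySizerBench
    {
        public static void main(String[] args) throws Exception
        {
            Options opts = new OptionsBuilder()
                           .include(DirectorySizerBench.class.getSimpleName())
                           .build();
            new Runner(opts).run(); // honors the @Warmup/@Measurement/@Fork annotations above
        }
    }
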
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
index 3bdb192..0047f48 100644
--- a/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
+++ b/test/unit/org/apache/cassandra/OffsetAwareConfigurationLoader.java
@@ -47,17 +47,20 @@ public class OffsetAwareConfigurationLoader extends YamlConfigurationLoader
     {
         Config config = super.loadConfig();
 
+        String sep = File.pathSeparator;
 
         config.rpc_port += offset;
         config.native_transport_port += offset;
         config.storage_port += offset;
 
-        config.commitlog_directory += File.pathSeparator + offset;
-        config.saved_caches_directory += File.pathSeparator + offset;
-        config.hints_directory += File.pathSeparator + offset;
-        for (int i = 0; i < config.data_file_directories.length; i++)
-            config.data_file_directories[i] += File.pathSeparator + offset;
+        config.commitlog_directory += sep + offset;
+        config.saved_caches_directory += sep + offset;
+        config.hints_directory += sep + offset;
+
+        config.cdc_raw_directory += sep + offset;
 
+        for (int i = 0; i < config.data_file_directories.length; i++)
+            config.data_file_directories[i] += sep + offset;
 
         return config;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
index dd5444f..d4e621f 100644
--- a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -248,7 +248,7 @@ public class BatchlogManagerTest
             if (i == 500)
                 SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
                                                     timestamp,
-                                                    ReplayPosition.NONE);
+                                                    CommitLogPosition.NONE);
 
             // Adjust the timestamp (slightly) to make the test deterministic.
             if (i >= 500)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
new file mode 100644
index 0000000..632c290
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/CDCStatementTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CDCStatementTest extends CQLTester
+{
+    @Test
+    public void testEnableOnCreate() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;");
+        Assert.assertTrue(currentTableMetadata().params.cdc);
+    }
+
+    @Test
+    public void testEnableOnAlter() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key text, val int, primary key(key));");
+        Assert.assertFalse(currentTableMetadata().params.cdc);
+        execute("ALTER TABLE %s WITH cdc = true;");
+        Assert.assertTrue(currentTableMetadata().params.cdc);
+    }
+
+    @Test
+    public void testDisableOnAlter() throws Throwable
+    {
+        createTable("CREATE TABLE %s (key text, val int, primary key(key)) WITH cdc = true;");
+        Assert.assertTrue(currentTableMetadata().params.cdc);
+        execute("ALTER TABLE %s WITH cdc = false;");
+        Assert.assertFalse(currentTableMetadata().params.cdc);
+    }
+}

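The three tests above cover the full user-facing surface of the new flag; against a live cluster the equivalent statements are simply (keyspace and table names hypothetical):

    CREATE TABLE ks.orders (key text PRIMARY KEY, val int) WITH cdc = true;
    ALTER TABLE ks.orders WITH cdc = false;
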
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index bca9e7b..8dac72e 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -193,6 +193,10 @@ public abstract class CQLTester
             FileUtils.deleteRecursive(dir);
         }
 
+        File cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
+        if (cdcDir.exists())
+            FileUtils.deleteRecursive(cdcDir);
+
         cleanupSavedCaches();
 
         // clean up data directory which are stored as data directory/keyspace/data files

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
index 1527b1e..fd7afd9 100644
--- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
@@ -149,7 +149,7 @@ public class OutOfSpaceTest extends CQLTester
 
         // Make sure commit log wasn't discarded.
         UUID cfid = currentTableMetadata().cfId;
-        for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments())
+        for (CommitLogSegment segment : CommitLog.instance.segmentManager.getActiveSegments())
             if (segment.getDirtyCFIDs().contains(cfid))
                 return;
         fail("Expected commit log to remain dirty for the affected table.");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index 33a41d8..8f92403 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -23,7 +23,6 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.CQLTester;
@@ -36,10 +35,10 @@ import org.apache.cassandra.triggers.ITrigger;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static java.lang.String.format;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.fail;
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
 
 public class CreateTest extends CQLTester
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dcab286/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index 6dafa37..4047cc9 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ReadMessageTest
@@ -56,6 +58,10 @@ public class ReadMessageTest
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); // startup must be completed; otherwise a commit log failure kills the JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
         CFMetaData cfForReadMetadata = CFMetaData.Builder.create(KEYSPACE1, CF_FOR_READ_TEST)
                                                             .addPartitionKey("key", BytesType.instance)
                                                             .addClusteringColumn("col1", AsciiType.instance)
@@ -195,7 +201,9 @@ public class ReadMessageTest
 
         Checker checker = new Checker(cfs.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit1")),
                                       cfsnocommit.metadata.getColumnDefinition(ByteBufferUtil.bytes("commit2")));
-        CommitLogTestReplayer.examineCommitLog(checker);
+
+        CommitLogTestReplayer replayer = new CommitLogTestReplayer(checker);
+        replayer.examineCommitLog();
 
         assertTrue(checker.commitLogMessageFound);
         assertFalse(checker.noCommitLogMessageFound);