Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/02 09:51:55 UTC

[3/4] cassandra git commit: Improve batchlog write path

Improve batchlog write path

patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for
CASSANDRA-9673


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53a177a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53a177a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53a177a9

Branch: refs/heads/trunk
Commit: 53a177a9150586e56408f25c959f75110a2997e7
Parents: 5f02f20
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Jul 10 17:03:06 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:43:42 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   1 -
 .../org/apache/cassandra/batchlog/Batch.java    | 155 +++++
 .../batchlog/BatchRemoveVerbHandler.java        |  31 +
 .../batchlog/BatchStoreVerbHandler.java         |  32 +
 .../cassandra/batchlog/BatchlogManager.java     | 554 +++++++++++++++++
 .../batchlog/BatchlogManagerMBean.java          |  38 ++
 .../batchlog/LegacyBatchlogMigrator.java        | 196 ++++++
 .../org/apache/cassandra/concurrent/Stage.java  |   2 -
 .../cassandra/concurrent/StageManager.java      |   1 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |   7 +-
 .../cql3/statements/BatchStatement.java         |   5 -
 .../apache/cassandra/db/BatchlogManager.java    | 596 -------------------
 .../cassandra/db/BatchlogManagerMBean.java      |  38 --
 .../db/CounterMutationVerbHandler.java          |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   7 +-
 .../cassandra/db/MutationVerbHandler.java       |  43 +-
 .../cassandra/db/ReadRepairVerbHandler.java     |   3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../org/apache/cassandra/db/WriteResponse.java  |  18 +-
 .../cassandra/hints/EncodedHintMessage.java     |   6 +-
 src/java/org/apache/cassandra/hints/Hint.java   |  11 +-
 .../org/apache/cassandra/hints/HintMessage.java |  14 +-
 .../cassandra/hints/LegacyHintsMigrator.java    |   2 +-
 .../apache/cassandra/net/MessagingService.java  |  26 +-
 .../cassandra/service/CassandraDaemon.java      |   6 +-
 .../apache/cassandra/service/StorageProxy.java  | 208 ++++---
 .../cassandra/service/StorageService.java       |  19 +-
 .../service/paxos/CommitVerbHandler.java        |   6 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   4 +-
 .../cql3/MaterializedViewLongTest.java          |   2 +-
 .../apache/cassandra/batchlog/BatchTest.java    | 153 +++++
 .../batchlog/BatchlogEndpointFilterTest.java    | 115 ++++
 .../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
 .../cassandra/db/BatchlogManagerTest.java       | 394 ------------
 .../service/BatchlogEndpointFilterTest.java     | 117 ----
 37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe8f453..751b75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Improve batchlog write path (CASSANDRA-9673)
  * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
  * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
  * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 16108bd..0f8b829 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -355,7 +355,6 @@ seed_provider:
 concurrent_reads: 32
 concurrent_writes: 32
 concurrent_counter_writes: 32
-concurrent_batchlog_writes: 32
 
 # For materialized view writes, as there is a read involved, this should
 # be limited by the lesser of concurrent reads or concurrent writes.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/Batch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
new file mode 100644
index 0000000..caa2682
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
+public final class Batch
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final UUID id;
+    public final long creationTime; // time of batch creation (in microseconds)
+
+    // one of these will always be empty
+    final Collection<Mutation> decodedMutations;
+    final Collection<ByteBuffer> encodedMutations;
+
+    private Batch(UUID id, long creationTime, Collection<Mutation> decodedMutations, Collection<ByteBuffer> encodedMutations)
+    {
+        this.id = id;
+        this.creationTime = creationTime;
+
+        this.decodedMutations = decodedMutations;
+        this.encodedMutations = encodedMutations;
+    }
+
+    /**
+     * Creates a 'local' batch - with all enclosed mutations in decoded form (as Mutation instances)
+     */
+    public static Batch createLocal(UUID id, long creationTime, Collection<Mutation> mutations)
+    {
+        return new Batch(id, creationTime, mutations, Collections.emptyList());
+    }
+
+    /**
+     * Creates a 'remote' batch - with all enclosed mutations in encoded form (as ByteBuffer instances)
+     *
+     * The mutations will always be encoded using the current messaging version.
+     */
+    public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
+    {
+        return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
+    }
+
+    /**
+     * Count of the mutations in the batch.
+     */
+    public int size()
+    {
+        return decodedMutations.size() + encodedMutations.size();
+    }
+
+    static final class Serializer implements IVersionedSerializer<Batch>
+    {
+        public long serializedSize(Batch batch, int version)
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
+            size += sizeof(batch.creationTime);
+
+            size += sizeofVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+                size += sizeofVInt(mutationSize);
+                size += mutationSize;
+            }
+
+            return size;
+        }
+
+        public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            UUIDSerializer.serializer.serialize(batch.id, out, version);
+            out.writeLong(batch.creationTime);
+
+            out.writeVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                out.writeVInt(Mutation.serializer.serializedSize(mutation, version));
+                Mutation.serializer.serialize(mutation, out, version);
+            }
+        }
+
+        public Batch deserialize(DataInputPlus in, int version) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            long creationTime = in.readLong();
+
+            /*
+             * If the version doesn't match the current one, we cannot just read the encoded mutations verbatim,
+             * so we decode them instead, to deal with compatibility.
+             */
+            return version == MessagingService.current_version
+                 ? createRemote(id, creationTime, readEncodedMutations(in))
+                 : createLocal(id, creationTime, decodeMutations(in, version));
+        }
+
+        private static Collection<ByteBuffer> readEncodedMutations(DataInputPlus in) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<ByteBuffer> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+                mutations.add(ByteBufferUtil.readWithVIntLength(in));
+
+            return mutations;
+        }
+
+        private static Collection<Mutation> decodeMutations(DataInputPlus in, int version) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<Mutation> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+            {
+                in.readVInt(); // skip mutation size
+                mutations.add(Mutation.serializer.deserialize(in, version));
+            }
+
+            return mutations;
+        }
+    }
+}
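
A minimal usage sketch of the two factory methods (not part of the patch; the wrapper class
name is illustrative):

    import java.util.Collection;
    import java.util.UUID;

    import org.apache.cassandra.batchlog.Batch;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.utils.FBUtilities;
    import org.apache.cassandra.utils.UUIDGen;

    final class BatchExample
    {
        // coordinator side: wrap live Mutation objects in a 'local' batch
        static Batch makeLocalBatch(Collection<Mutation> mutations)
        {
            UUID id = UUIDGen.getTimeUUID();                // batchlog ids are time UUIDs
            long createdAt = FBUtilities.timestampMicros(); // creationTime is in microseconds
            return Batch.createLocal(id, createdAt, mutations);
        }
    }

A 'remote' batch is normally obtained on the receiving side via Batch.serializer.deserialize(),
which keeps the mutations as encoded ByteBuffers when the peer speaks the current messaging
version, avoiding a decode/re-encode round trip before the batch is stored.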

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
new file mode 100644
index 0000000..3c3fcec
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.util.UUID;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+
+public final class BatchRemoveVerbHandler implements IVerbHandler<UUID>
+{
+    public void doVerb(MessageIn<UUID> message, int id)
+    {
+        BatchlogManager.remove(message.payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
new file mode 100644
index 0000000..4bc878c
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
+{
+    public void doVerb(MessageIn<Batch> message, int id)
+    {
+        BatchlogManager.store(message.payload);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
+    }
+}
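
For context, a sketch of the coordinator side this handler pairs with (not part of this excerpt;
the real dispatch logic lives in StorageProxy, and the BATCH_STORE verb shows up in the
DatabaseDescriptor hunk further down; the wrapper class is illustrative):

    import java.net.InetAddress;

    import org.apache.cassandra.batchlog.Batch;
    import org.apache.cassandra.net.MessageOut;
    import org.apache.cassandra.net.MessagingService;
    import org.apache.cassandra.service.AbstractWriteResponseHandler;

    final class BatchStoreExample
    {
        // send one BATCH_STORE message per batchlog endpoint; each replica runs
        // BatchStoreVerbHandler, stores the batch, and acks with a WriteResponse
        static void sendStore(Batch batch,
                              Iterable<InetAddress> endpoints,
                              AbstractWriteResponseHandler<?> handler)
        {
            MessageOut<Batch> message =
                new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
            for (InetAddress target : endpoints)
                MessagingService.instance().sendRR(message, target, handler, false);
        }
    }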

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
new file mode 100644
index 0000000..934ebaa
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+    static final int DEFAULT_PAGE_SIZE = 128;
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+    public static final BatchlogManager instance = new BatchlogManager();
+
+    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
+
+    // Single-thread executor service for scheduling and serializing log replay.
+    private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+
+    public void start()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
+                                             StorageService.RING_DELAY,
+                                             REPLAY_INTERVAL,
+                                             TimeUnit.MILLISECONDS);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        batchlogTasks.shutdown();
+        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+    public static void remove(UUID id)
+    {
+        new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
+                                                         UUIDType.instance.decompose(id),
+                                                         FBUtilities.timestampMicros(),
+                                                         FBUtilities.nowInSeconds()))
+            .apply();
+    }
+
+    public static void store(Batch batch)
+    {
+        store(batch, true);
+    }
+
+    public static void store(Batch batch, boolean durableWrites)
+    {
+        RowUpdateBuilder builder =
+            new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
+                .clustering()
+                .add("version", MessagingService.current_version);
+
+        for (ByteBuffer mutation : batch.encodedMutations)
+            builder.addListEntry("mutations", mutation);
+
+        for (Mutation mutation : batch.decodedMutations)
+        {
+            try (DataOutputBuffer buffer = new DataOutputBuffer())
+            {
+                Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
+                builder.addListEntry("mutations", buffer.buffer());
+            }
+            catch (IOException e)
+            {
+                // shouldn't happen
+                throw new AssertionError(e);
+            }
+        }
+
+        builder.build().apply(durableWrites);
+    }
+
+    @VisibleForTesting
+    public int countAllBatches()
+    {
+        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+        UntypedResultSet results = executeInternal(query);
+        if (results == null || results.isEmpty())
+            return 0;
+
+        return (int) results.one().getLong("count");
+    }
+
+    public long getTotalBatchesReplayed()
+    {
+        return totalBatchesReplayed;
+    }
+
+    public void forceBatchlogReplay() throws Exception
+    {
+        startBatchlogReplay().get();
+    }
+
+    public Future<?> startBatchlogReplay()
+    {
+        // If a replay is already in progress this request will be executed after it completes.
+        return batchlogTasks.submit(this::replayFailedBatches);
+    }
+
+    void performInitialReplay() throws InterruptedException, ExecutionException
+    {
+        // Invokes initial replay. Used for testing only.
+        batchlogTasks.submit(this::replayFailedBatches).get();
+    }
+
+    private void replayFailedBatches()
+    {
+        logger.debug("Started replayFailedBatches");
+
+        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
+        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+        int pageSize = calculatePageSize(store);
+        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+        // token(id) > token(lastReplayedUuid) as part of the query.
+        String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES);
+        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+        processBatchlogEntries(batches, pageSize, rateLimiter);
+        lastReplayedUuid = limitUuid;
+        logger.debug("Finished replayFailedBatches");
+    }
+
+    // read fewer rows (batches) per page if they are very large
+    static int calculatePageSize(ColumnFamilyStore store)
+    {
+        double averageRowSize = store.getMeanPartitionSize();
+        if (averageRowSize <= 0)
+            return DEFAULT_PAGE_SIZE;
+
+        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
+    }
+
+    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
+    {
+        int positionInPage = 0;
+        ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
+
+        Set<InetAddress> hintedNodes = new HashSet<>();
+        Set<UUID> replayedBatches = new HashSet<>();
+
+        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
+        for (UntypedResultSet.Row row : batches)
+        {
+            UUID id = row.getUUID("id");
+            int version = row.getInt("version");
+            try
+            {
+                ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
+                if (batch.replay(rateLimiter, hintedNodes) > 0)
+                {
+                    unfinishedBatches.add(batch);
+                }
+                else
+                {
+                    remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
+                    ++totalBatchesReplayed;
+                }
+            }
+            catch (IOException e)
+            {
+                logger.warn("Skipped batch replay of {} due to {}", id, e);
+                remove(id);
+            }
+
+            if (++positionInPage == pageSize)
+            {
+                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+                // finish processing the page before requesting the next row.
+                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+                positionInPage = 0;
+            }
+        }
+
+        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+        // once all generated hints are fsynced, actually delete the batches
+        replayedBatches.forEach(BatchlogManager::remove);
+    }
+
+    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+    {
+        // schedule hints for timed out deliveries
+        for (ReplayingBatch batch : batches)
+        {
+            batch.finish(hintedNodes);
+            replayedBatches.add(batch.id);
+        }
+
+        totalBatchesReplayed += batches.size();
+        batches.clear();
+    }
+
+    public static long getBatchlogTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+    }
+
+    private static class ReplayingBatch
+    {
+        private final UUID id;
+        private final long writtenAt;
+        private final List<Mutation> mutations;
+        private final int replayedBytes;
+
+        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+
+        ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
+        {
+            this.id = id;
+            this.writtenAt = UUIDGen.unixTimestamp(id);
+            this.mutations = new ArrayList<>(serializedMutations.size());
+            this.replayedBytes = addMutations(version, serializedMutations);
+        }
+
+        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
+        {
+            logger.debug("Replaying batch {}", id);
+
+            if (mutations.isEmpty())
+                return 0;
+
+            int gcgs = gcgs(mutations);
+            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+                return 0;
+
+            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+
+            rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
+
+            return replayHandlers.size();
+        }
+
+        public void finish(Set<InetAddress> hintedNodes)
+        {
+            for (int i = 0; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+                try
+                {
+                    handler.get();
+                }
+                catch (WriteTimeoutException|WriteFailureException e)
+                {
+                    logger.debug("Failed replaying a batched mutation to a node, will write a hint");
+                    logger.debug("Failure was : {}", e.getMessage());
+                    // write hints for the remaining mutations, starting from index i
+                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
+                    return;
+                }
+            }
+        }
+
+        private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
+        {
+            int ret = 0;
+            for (ByteBuffer serializedMutation : serializedMutations)
+            {
+                ret += serializedMutation.remaining();
+                try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
+                {
+                    addMutation(Mutation.serializer.deserialize(in, version));
+                }
+            }
+
+            return ret;
+        }
+
+        // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis.
+        // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered
+        // then truncated).
+        private void addMutation(Mutation mutation)
+        {
+            for (UUID cfId : mutation.getColumnFamilyIds())
+                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+                    mutation = mutation.without(cfId);
+
+            if (!mutation.isEmpty())
+                mutations.add(mutation);
+        }
+
+        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
+        {
+            int gcgs = gcgs(mutations);
+
+            // expired
+            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+                return;
+
+            for (int i = startFrom; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+                Mutation undeliveredMutation = mutations.get(i);
+
+                if (handler != null)
+                {
+                    hintedNodes.addAll(handler.undelivered);
+                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+                                                Hint.create(undeliveredMutation, writtenAt));
+                }
+            }
+        }
+
+        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+                                                                              long writtenAt,
+                                                                              Set<InetAddress> hintedNodes)
+        {
+            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
+            for (Mutation mutation : mutations)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
+                if (handler != null)
+                    handlers.add(handler);
+            }
+            return handlers;
+        }
+
+        /**
+         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+         * when a replica is down or a write request times out.
+         *
+         * @return direct delivery handler to wait on or null, if no live nodes found
+         */
+        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+                                                                                     long writtenAt,
+                                                                                     Set<InetAddress> hintedNodes)
+        {
+            Set<InetAddress> liveEndpoints = new HashSet<>();
+            String ks = mutation.getKeyspaceName();
+            Token tk = mutation.key().getToken();
+
+            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+            {
+                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                {
+                    mutation.apply();
+                }
+                else if (FailureDetector.instance.isAlive(endpoint))
+                {
+                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                }
+                else
+                {
+                    hintedNodes.add(endpoint);
+                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+                                                Hint.create(mutation, writtenAt));
+                }
+            }
+
+            if (liveEndpoints.isEmpty())
+                return null;
+
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+            MessageOut<Mutation> message = mutation.createMessage();
+            for (InetAddress endpoint : liveEndpoints)
+                MessagingService.instance().sendRR(message, endpoint, handler, false);
+            return handler;
+        }
+
+        private static int gcgs(Collection<Mutation> mutations)
+        {
+            int gcgs = Integer.MAX_VALUE;
+            for (Mutation mutation : mutations)
+                gcgs = Math.min(gcgs, mutation.smallestGCGS());
+            return gcgs;
+        }
+
+        /**
+         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+         * which we did not receive a successful reply.
+         */
+        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
+        {
+            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+            {
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+                undelivered.addAll(writeEndpoints);
+            }
+
+            @Override
+            protected int totalBlockFor()
+            {
+                return this.naturalEndpoints.size();
+            }
+
+            @Override
+            public void response(MessageIn<T> m)
+            {
+                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
+                assert removed;
+                super.response(m);
+            }
+        }
+    }
+
+    public static class EndpointFilter
+    {
+        private final String localRack;
+        private final Multimap<String, InetAddress> endpoints;
+
+        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            this.localRack = localRack;
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * @return list of candidates for batchlog hosting. If possible, these will be two nodes from different racks.
+         */
+        public Collection<InetAddress> filter()
+        {
+            // special case for single-node data centers
+            if (endpoints.values().size() == 1)
+                return endpoints.values();
+
+            // strip out dead endpoints and localhost
+            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+                if (isValid(entry.getValue()))
+                    validated.put(entry.getKey(), entry.getValue());
+
+            if (validated.size() <= 2)
+                return validated.values();
+
+            if (validated.size() - validated.get(localRack).size() >= 2)
+            {
+                // we have enough endpoints in other racks
+                validated.removeAll(localRack);
+            }
+
+            if (validated.keySet().size() == 1)
+            {
+                // we have only 1 `other` rack
+                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+                return Lists.newArrayList(Iterables.limit(otherRack, 2));
+            }
+
+            // randomize which racks we pick from if more than 2 remaining
+            Collection<String> racks;
+            if (validated.keySet().size() == 2)
+            {
+                racks = validated.keySet();
+            }
+            else
+            {
+                racks = Lists.newArrayList(validated.keySet());
+                Collections.shuffle((List<String>) racks);
+            }
+
+            // grab a random member of up to two racks
+            List<InetAddress> result = new ArrayList<>(2);
+            for (String rack : Iterables.limit(racks, 2))
+            {
+                List<InetAddress> rackMembers = validated.get(rack);
+                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+            }
+
+            return result;
+        }
+
+        @VisibleForTesting
+        protected boolean isValid(InetAddress input)
+        {
+            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+        }
+
+        @VisibleForTesting
+        protected int getRandomInt(int bound)
+        {
+            return ThreadLocalRandom.current().nextInt(bound);
+        }
+    }
+}
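
EndpointFilter extracts the rack-aware batchlog endpoint selection that previously lived in
StorageProxy; the new BatchlogEndpointFilterTest below drives it in isolation by overriding
isValid(). A sketch in the same spirit (addresses and class name are made up; in production
isValid() consults the gossip-backed FailureDetector and filters out the local node):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Collection;

    import com.google.common.collect.ImmutableMultimap;
    import com.google.common.collect.Multimap;

    import org.apache.cassandra.batchlog.BatchlogManager.EndpointFilter;

    final class EndpointFilterExample
    {
        static Collection<InetAddress> pick() throws UnknownHostException
        {
            Multimap<String, InetAddress> rackToEndpoints = ImmutableMultimap.of(
                "rack1", InetAddress.getByName("10.0.0.1"),
                "rack1", InetAddress.getByName("10.0.0.2"),
                "rack2", InetAddress.getByName("10.0.0.3"),
                "rack3", InetAddress.getByName("10.0.0.4"));

            // with the local rack excluded, this returns one endpoint each
            // from two of the remaining racks
            return new EndpointFilter("rack1", rackToEndpoints)
            {
                @Override
                protected boolean isValid(InetAddress input)
                {
                    return true; // skip the liveness/localhost check for the sketch
                }
            }.filter();
        }
    }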

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
new file mode 100644
index 0000000..4dcc9f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+public interface BatchlogManagerMBean
+{
+    /**
+     * Counts all batches currently in the batchlog.
+     *
+     * @return total batch count
+     */
+    public int countAllBatches();
+
+    /**
+     * @return total count of batches replayed since node start
+     */
+    public long getTotalBatchesReplayed();
+
+    /**
+     * Forces batchlog replay and blocks until it completes. If a replay is already in progress,
+     * the forced run executes after it finishes.
+     */
+    public void forceBatchlogReplay() throws Exception;
+}
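
Note that the MBean keeps the pre-move ObjectName ("org.apache.cassandra.db:type=BatchlogManager",
see MBEAN_NAME above), so existing JMX tooling keeps working despite the package change. A sketch
of driving it remotely (class and host names are illustrative; assumes Cassandra's default JMX
port 7199 with no authentication):

    import javax.management.JMX;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.batchlog.BatchlogManagerMBean;

    final class BatchlogReplayExample
    {
        static void replayNow(String host) throws Exception
        {
            JMXServiceURL url =
                new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://%s:7199/jmxrmi", host));
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                BatchlogManagerMBean proxy =
                    JMX.newMBeanProxy(connector.getMBeanServerConnection(),
                                      new ObjectName("org.apache.cassandra.db:type=BatchlogManager"),
                                      BatchlogManagerMBean.class);
                proxy.forceBatchlogReplay(); // blocks until the forced replay completes
            }
        }
    }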

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
new file mode 100644
index 0000000..13ff81a
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class LegacyBatchlogMigrator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
+
+    private LegacyBatchlogMigrator()
+    {
+        // utility class, not meant to be instantiated
+    }
+
+    @SuppressWarnings("deprecation")
+    public static void migrate()
+    {
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
+
+        // nothing to migrate
+        if (store.isEmpty())
+            return;
+
+        logger.info("Migrating legacy batchlog to new storage");
+
+        int convertedBatches = 0;
+        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_BATCHLOG);
+
+        int pageSize = BatchlogManager.calculatePageSize(store);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize);
+        for (UntypedResultSet.Row row : rows)
+        {
+            if (apply(row, convertedBatches))
+                convertedBatches++;
+        }
+
+        if (convertedBatches > 0)
+            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+    }
+
+    @SuppressWarnings("deprecation")
+    public static boolean isLegacyBatchlogMutation(Mutation mutation)
+    {
+        return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
+            && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
+    }
+
+    @SuppressWarnings("deprecation")
+    public static void handleLegacyMutation(Mutation mutation)
+    {
+        PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
+        logger.debug("Applying legacy batchlog mutation {}", update);
+        update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1));
+    }
+
+    private static boolean apply(UntypedResultSet.Row row, long counter)
+    {
+        UUID id = row.getUUID("id");
+        long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at");
+        int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+
+        if (id.version() != 1)
+            id = UUIDGen.getTimeUUID(timestamp, counter);
+
+        logger.debug("Converting mutation at {}", timestamp);
+
+        try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false))
+        {
+            int numMutations = in.readInt();
+            List<Mutation> mutations = new ArrayList<>(numMutations);
+            for (int i = 0; i < numMutations; i++)
+                mutations.add(Mutation.serializer.deserialize(in, version));
+
+            BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations));
+            return true;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
+            return false;
+        }
+    }
+
+    public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
+    throws WriteTimeoutException, WriteFailureException
+    {
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+            int targetVersion = MessagingService.instance().getVersion(target);
+            MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
+                                               target,
+                                               handler,
+                                               false);
+        }
+    }
+
+    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+    {
+        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
+                                                                                     Collections.<InetAddress>emptyList(),
+                                                                                     ConsistencyLevel.ANY,
+                                                                                     Keyspace.open(SystemKeyspace.NAME),
+                                                                                     null,
+                                                                                     WriteType.SIMPLE);
+        Mutation mutation = getRemoveMutation(uuid);
+
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
+            MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false);
+        }
+    }
+
+    static void store(Batch batch, int version)
+    {
+        getStoreMutation(batch, version).apply();
+    }
+
+    @SuppressWarnings("deprecation")
+    static Mutation getStoreMutation(Batch batch, int version)
+    {
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
+               .clustering()
+               .add("written_at", new Date(batch.creationTime / 1000))
+               .add("data", getSerializedMutations(version, batch.decodedMutations))
+               .add("version", version)
+               .build();
+    }
+
+    @SuppressWarnings("deprecation")
+    private static Mutation getRemoveMutation(UUID uuid)
+    {
+        return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
+                                                                UUIDType.instance.decompose(uuid),
+                                                                FBUtilities.timestampMicros(),
+                                                                FBUtilities.nowInSeconds()));
+    }
+
+    private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
+    {
+        try (DataOutputBuffer buf = new DataOutputBuffer())
+        {
+            buf.writeInt(mutations.size());
+            for (Mutation mutation : mutations)
+                Mutation.serializer.serialize(mutation, buf, version);
+            return buf.buffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
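
During a rolling upgrade, a pre-3.0 coordinator can still send raw mutations that target the
legacy system.batchlog table. Per the diffstat, MutationVerbHandler is updated to divert those
to the migrator instead of applying them directly; roughly (a sketch, not the actual hunk, which
appears in another part of this email):

    // inside MutationVerbHandler.doVerb (sketch)
    if (LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
        LegacyBatchlogMigrator.handleLegacyMutation(message.payload); // convert and store in system.batches
    else
        message.payload.apply();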

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index e91c515..a57587c 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -27,7 +27,6 @@ public enum Stage
     READ,
     MUTATION,
     COUNTER_MUTATION,
-    BATCHLOG_MUTATION,
     MATERIALIZED_VIEW_MUTATION,
     GOSSIP,
     REQUEST_RESPONSE,
@@ -62,7 +61,6 @@ public enum Stage
                 return "internal";
             case MUTATION:
             case COUNTER_MUTATION:
-            case BATCHLOG_MUTATION:
             case MATERIALIZED_VIEW_MUTATION:
             case READ:
             case REQUEST_RESPONSE:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index ca83829..ee1fbe5 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -47,7 +47,6 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
-        stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters()));
         stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters()));
         stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9d55fc8..22b09d3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -93,7 +93,6 @@ public class Config
     public Integer concurrent_reads = 32;
     public Integer concurrent_writes = 32;
     public Integer concurrent_counter_writes = 32;
-    public Integer concurrent_batchlog_writes = 32;
     public Integer concurrent_materialized_view_writes = 32;
 
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4e13911..31a4e9d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1080,8 +1080,9 @@ public class DatabaseDescriptor
             case PAXOS_COMMIT:
             case PAXOS_PREPARE:
             case PAXOS_PROPOSE:
-            case BATCHLOG_MUTATION:
             case HINT:
+            case BATCH_STORE:
+            case BATCH_REMOVE:
                 return getWriteRpcTimeout();
             case COUNTER_MUTATION:
                 return getCounterWriteRpcTimeout();
@@ -1128,10 +1129,6 @@ public class DatabaseDescriptor
         return conf.concurrent_counter_writes;
     }
 
-    public static int getConcurrentBatchlogWriters()
-    {
-        return conf.concurrent_batchlog_writes;
-    }
     public static int getConcurrentMaterializedViewWriters()
     {
         return conf.concurrent_materialized_view_writes;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5de4b6c..c8482b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -524,11 +524,6 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    public interface BatchVariables
-    {
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
-    }
-
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
deleted file mode 100644
index de85925..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.hints.Hint;
-import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
-
-public class BatchlogManager implements BatchlogManagerMBean
-{
-    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
-    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
-    private static final int DEFAULT_PAGE_SIZE = 128;
-
-    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
-    public static final BatchlogManager instance = new BatchlogManager();
-
-    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
-    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
-
-    // Single-thread executor service for scheduling and serializing log replay.
-    private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-
-    public void start()
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
-        batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
-                                             StorageService.RING_DELAY + REPLAY_INTERVAL,
-                                             REPLAY_INTERVAL,
-                                             TimeUnit.MILLISECONDS);
-    }
-
-    private void replayInitially()
-    {
-        // Initial run must take care of non-time-uuid batches as written by Version 1.2.
-        convertOldBatchEntries();
-
-        replayAllFailedBatches();
-    }
-
-    public static void shutdown() throws InterruptedException
-    {
-        batchlogTasks.shutdown();
-        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
-    }
-
-    public int countAllBatches()
-    {
-        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
-        UntypedResultSet results = executeInternal(query);
-        if (results.isEmpty())
-            return 0;
-        return (int) results.one().getLong("count");
-    }
-
-    public long getTotalBatchesReplayed()
-    {
-        return totalBatchesReplayed;
-    }
-
-    public void forceBatchlogReplay() throws Exception
-    {
-        startBatchlogReplay().get();
-    }
-
-    public Future<?> startBatchlogReplay()
-    {
-        // If a replay is already in progress this request will be executed after it completes.
-        return batchlogTasks.submit(this::replayAllFailedBatches);
-    }
-
-    void performInitialReplay() throws InterruptedException, ExecutionException
-    {
-        // Invokes initial replay. Used for testing only.
-        batchlogTasks.submit(this::replayInitially).get();
-    }
-
-    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
-    {
-        return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
-               .clustering()
-               .add("data", serializeMutations(mutations, version))
-               .add("version", version)
-               .build();
-    }
-
-    @VisibleForTesting
-    static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
-    {
-        try (DataOutputBuffer buf = new DataOutputBuffer())
-        {
-            buf.writeInt(mutations.size());
-            for (Mutation mutation : mutations)
-                Mutation.serializer.serialize(mutation, buf, version);
-            return buf.buffer();
-        }
-        catch (IOException e)
-        {
-            throw new AssertionError(); // cannot happen.
-        }
-    }
-
-    private void replayAllFailedBatches()
-    {
-        logger.debug("Started replayAllFailedBatches");
-
-        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
-        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
-        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
-        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
-
-        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
-        int pageSize = calculatePageSize();
-        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
-        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
-        // token(id) > token(lastReplayedUuid) as part of the query.
-        String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
-                                     SystemKeyspace.NAME,
-                                     SystemKeyspace.BATCHES);
-        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
-        processBatchlogEntries(batches, pageSize, rateLimiter);
-        lastReplayedUuid = limitUuid;
-        logger.debug("Finished replayAllFailedBatches");
-    }
-
-    // read fewer rows (batches) per page if they are very large
-    private static int calculatePageSize()
-    {
-        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
-        double averageRowSize = store.getMeanPartitionSize();
-        if (averageRowSize <= 0)
-            return DEFAULT_PAGE_SIZE;
-
-        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
-    }
-
-    private static void deleteBatch(UUID id)
-    {
-        Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
-                                                    UUIDType.instance.decompose(id),
-                                                    FBUtilities.timestampMicros(),
-                                                    FBUtilities.nowInSeconds()));
-        mutation.apply();
-    }
-
-    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
-    {
-        int positionInPage = 0;
-        ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
-
-        Set<InetAddress> hintedNodes = new HashSet<>();
-        Set<UUID> replayedBatches = new HashSet<>();
-
-        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
-        for (UntypedResultSet.Row row : batches)
-        {
-            UUID id = row.getUUID("id");
-            int version = row.getInt("version");
-            Batch batch = new Batch(id, row.getBytes("data"), version);
-            try
-            {
-                if (batch.replay(rateLimiter, hintedNodes) > 0)
-                {
-                    unfinishedBatches.add(batch);
-                }
-                else
-                {
-                    deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
-                    ++totalBatchesReplayed;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.warn("Skipped batch replay of {} due to {}", id, e);
-                deleteBatch(id);
-            }
-
-            if (++positionInPage == pageSize)
-            {
-                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
-                // finish processing the page before requesting the next row.
-                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-                positionInPage = 0;
-            }
-        }
-
-        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-
-        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk before deleting the batches
-        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
-
-        // once all generated hints are fsynced, actually delete the batches
-        replayedBatches.forEach(BatchlogManager::deleteBatch);
-    }
-
-    private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
-    {
-        // schedule hints for timed-out deliveries
-        for (Batch batch : batches)
-        {
-            batch.finish(hintedNodes);
-            replayedBatches.add(batch.id);
-        }
-
-        totalBatchesReplayed += batches.size();
-        batches.clear();
-    }
-
-    public static long getBatchlogTimeout()
-    {
-        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
-    }
-
-    private static class Batch
-    {
-        private final UUID id;
-        private final long writtenAt;
-        private final ByteBuffer data;
-        private final int version;
-
-        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
-
-        Batch(UUID id, ByteBuffer data, int version)
-        {
-            this.id = id;
-            this.writtenAt = UUIDGen.unixTimestamp(id);
-            this.data = data;
-            this.version = version;
-        }
-
-        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
-        {
-            logger.debug("Replaying batch {}", id);
-
-            List<Mutation> mutations = replayingMutations();
-
-            if (mutations.isEmpty())
-                return 0;
-
-            int gcgs = gcgs(mutations);
-            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
-                return 0;
-
-            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
-
-            rateLimiter.acquire(data.remaining()); // acquire afterwards, so as not to skew the TTL calculation.
-
-            return replayHandlers.size();
-        }
-
-        public void finish(Set<InetAddress> hintedNodes)
-        {
-            for (int i = 0; i < replayHandlers.size(); i++)
-            {
-                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-                try
-                {
-                    handler.get();
-                }
-                catch (WriteTimeoutException|WriteFailureException e)
-                {
-                    logger.debug("Failed replaying a batched mutation to a node, will write a hint");
-                    logger.debug("Failure was : {}", e.getMessage());
-                    // writing hints for the rest to hints, starting from i
-                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
-                    return;
-                }
-            }
-        }
-
-        private List<Mutation> replayingMutations() throws IOException
-        {
-            DataInputPlus in = new DataInputBuffer(data, true);
-            int size = in.readInt();
-            List<Mutation> mutations = new ArrayList<>(size);
-            for (int i = 0; i < size; i++)
-            {
-                Mutation mutation = Mutation.serializer.deserialize(in, version);
-
-                // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis.
-                // We don't abort the replay entirely because this can be considered a success (truncated is the same as
-                // delivered and then truncated).
-                for (UUID cfId : mutation.getColumnFamilyIds())
-                    if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
-                        mutation = mutation.without(cfId);
-
-                if (!mutation.isEmpty())
-                    mutations.add(mutation);
-            }
-            return mutations;
-        }
-
-        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
-        {
-            try
-            {
-                // Here we deserialize the mutations a second time from the byte buffer.
-                // This is OK because a timeout on direct batch delivery is rare
-                // (it can only happen in the few seconds before a node is marked dead),
-                // so we trade some CPU to keep fewer objects in memory.
-                List<Mutation> replayingMutations = replayingMutations();
-                for (int i = startFrom; i < replayHandlers.size(); i++)
-                {
-                    Mutation undeliveredMutation = replayingMutations.get(i);
-                    int gcgs = gcgs(replayingMutations);
-                    ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-
-                    if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null)
-                    {
-                        hintedNodes.addAll(handler.undelivered);
-                        HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
-                                                    Hint.create(undeliveredMutation, writtenAt));
-                    }
-                }
-            }
-            catch (IOException e)
-            {
-                logger.error("Cannot schedule hints for undelivered batch", e);
-            }
-        }
-
-        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
-                                                                              long writtenAt,
-                                                                              Set<InetAddress> hintedNodes)
-        {
-            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
-            for (Mutation mutation : mutations)
-            {
-                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
-                if (handler != null)
-                    handlers.add(handler);
-            }
-            return handlers;
-        }
-
-        /**
-         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
-         * when a replica is down or a write request times out.
-         *
-         * @return direct delivery handler to wait on, or null if no live nodes were found
-         */
-        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
-                                                                                     long writtenAt,
-                                                                                     Set<InetAddress> hintedNodes)
-        {
-            Set<InetAddress> liveEndpoints = new HashSet<>();
-            String ks = mutation.getKeyspaceName();
-            Token tk = mutation.key().getToken();
-
-            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
-                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
-            {
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    mutation.apply();
-                }
-                else if (FailureDetector.instance.isAlive(endpoint))
-                {
-                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
-                }
-                else
-                {
-                    hintedNodes.add(endpoint);
-                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
-                                                Hint.create(mutation, writtenAt));
-                }
-            }
-
-            if (liveEndpoints.isEmpty())
-                return null;
-
-            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
-            MessageOut<Mutation> message = mutation.createMessage();
-            for (InetAddress endpoint : liveEndpoints)
-                MessagingService.instance().sendRR(message, endpoint, handler, false);
-            return handler;
-        }
-
-        private static int gcgs(Collection<Mutation> mutations)
-        {
-            int gcgs = Integer.MAX_VALUE;
-            for (Mutation mutation : mutations)
-                gcgs = Math.min(gcgs, mutation.smallestGCGS());
-            return gcgs;
-        }
-
-        /**
-         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
-         * which we did not receive a successful reply.
-         */
-        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
-        {
-            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
-            {
-                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
-                undelivered.addAll(writeEndpoints);
-            }
-
-            @Override
-            protected int totalBlockFor()
-            {
-                return this.naturalEndpoints.size();
-            }
-
-            @Override
-            public void response(MessageIn<T> m)
-            {
-                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
-                assert removed;
-                super.response(m);
-            }
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    private static void convertOldBatchEntries()
-    {
-        logger.debug("Started convertOldBatchEntries");
-
-        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
-                                     SystemKeyspace.NAME,
-                                     SystemKeyspace.LEGACY_BATCHLOG);
-        UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
-        int convertedBatches = 0;
-        for (UntypedResultSet.Row row : batches)
-        {
-            UUID id = row.getUUID("id");
-            long timestamp = row.getLong("written_at");
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
-            logger.debug("Converting mutation at " + timestamp);
-
-            UUID newId = id;
-            if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
-                newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
-            ++convertedBatches;
-
-            Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
-                                                   FBUtilities.timestampMicros(),
-                                                   newId)
-                    .clustering()
-                    .add("data", row.getBytes("data"))
-                    .add("version", version)
-                    .build();
-
-            addRow.apply();
-        }
-        if (convertedBatches > 0)
-            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
-        // cleanup will be called after replay
-        logger.debug("Finished convertOldBatchEntries");
-    }
-
-    public static class EndpointFilter
-    {
-        private final String localRack;
-        private final Multimap<String, InetAddress> endpoints;
-
-        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
-        {
-            this.localRack = localRack;
-            this.endpoints = endpoints;
-        }
-
-        /**
-         * @return list of candidates for batchlog hosting. If possible, these will be two nodes from different racks.
-         */
-        public Collection<InetAddress> filter()
-        {
-            // special case for single-node data centers
-            if (endpoints.values().size() == 1)
-                return endpoints.values();
-
-            // strip out dead endpoints and localhost
-            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
-            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
-                if (isValid(entry.getValue()))
-                    validated.put(entry.getKey(), entry.getValue());
-
-            if (validated.size() <= 2)
-                return validated.values();
-
-            if (validated.size() - validated.get(localRack).size() >= 2)
-            {
-                // we have enough endpoints in other racks
-                validated.removeAll(localRack);
-            }
-
-            if (validated.keySet().size() == 1)
-            {
-                // we have only one `other` rack
-                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
-                return Lists.newArrayList(Iterables.limit(otherRack, 2));
-            }
-
-            // randomize which racks we pick from if more than 2 remaining
-            Collection<String> racks;
-            if (validated.keySet().size() == 2)
-            {
-                racks = validated.keySet();
-            }
-            else
-            {
-                racks = Lists.newArrayList(validated.keySet());
-                Collections.shuffle((List<String>) racks);
-            }
-
-            // grab a random member of up to two racks
-            List<InetAddress> result = new ArrayList<>(2);
-            for (String rack : Iterables.limit(racks, 2))
-            {
-                List<InetAddress> rackMembers = validated.get(rack);
-                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
-            }
-
-            return result;
-        }
-
-        @VisibleForTesting
-        protected boolean isValid(InetAddress input)
-        {
-            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
-        }
-
-        @VisibleForTesting
-        protected int getRandomInt(int bound)
-        {
-            return ThreadLocalRandom.current().nextInt(bound);
-        }
-    }
-}

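The EndpointFilter logic deleted here is carried over to the new org.apache.cassandra.batchlog package rather than dropped. For readers following the algorithm, below is a minimal standalone sketch of the same two-candidate, rack-aware selection policy, rewritten against plain JDK collections so it compiles on its own; the class and method names are hypothetical and not part of any Cassandra API.

import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

public final class BatchlogCandidateSketch
{
    /**
     * Picks up to two batchlog hosts, preferring nodes outside the local rack
     * and, when possible, spreading the picks across two different racks.
     * Assumes dead endpoints and the local node were already filtered out.
     */
    public static List<InetAddress> pickCandidates(String localRack, Map<String, List<InetAddress>> byRack)
    {
        int total = byRack.values().stream().mapToInt(List::size).sum();
        if (total <= 2) // tiny clusters and single-node data centers: take whatever is there
        {
            List<InetAddress> all = new ArrayList<>();
            byRack.values().forEach(all::addAll);
            return all;
        }

        Map<String, List<InetAddress>> candidates = new HashMap<>(byRack);
        int localCount = candidates.getOrDefault(localRack, Collections.emptyList()).size();
        if (total - localCount >= 2) // enough nodes elsewhere: avoid the local rack entirely
            candidates.remove(localRack);

        if (candidates.size() == 1) // only one rack left: take up to two of its members
        {
            List<InetAddress> only = candidates.values().iterator().next();
            return new ArrayList<>(only.subList(0, Math.min(2, only.size())));
        }

        // more than two racks remaining: shuffle so we don't always pick the same pair
        List<String> racks = new ArrayList<>(candidates.keySet());
        if (racks.size() > 2)
            Collections.shuffle(racks);

        // one random member from each of the first two racks
        List<InetAddress> result = new ArrayList<>(2);
        for (String rack : racks.subList(0, 2))
        {
            List<InetAddress> members = candidates.get(rack);
            result.add(members.get(ThreadLocalRandom.current().nextInt(members.size())));
        }
        return result;
    }
}
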
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
deleted file mode 100644
index a688117..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public interface BatchlogManagerMBean
-{
-    /**
-     * Counts all batches currently in the batchlog.
-     *
-     * @return total batch count
-     */
-    public int countAllBatches();
-
-    /**
-     * @return total count of batches replayed since node start
-     */
-    public long getTotalBatchesReplayed();
-
-    /**
-     * Forces batchlog replay. Returns immediately if replay is already in progress.
-     */
-    public void forceBatchlogReplay() throws Exception;
-}

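The MBean interface is re-created under the new org.apache.cassandra.batchlog package elsewhere in this commit, so the operations remain reachable over JMX. A sketch of invoking them from a remote client, assuming the ObjectName keeps its historical value for tooling compatibility (an assumption; check the relocated class) and Cassandra's default JMX port 7199:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public final class ForceBatchlogReplay
{
    public static void main(String[] args) throws Exception
    {
        // Adjust host and port for your deployment; 7199 is Cassandra's default JMX port.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // Assumed to stay at the value registered by the old manager shown above.
            ObjectName name = new ObjectName("org.apache.cassandra.db:type=BatchlogManager");
            connection.invoke(name, "forceBatchlogReplay", null, null);
            System.out.println("Total batches replayed: " + connection.getAttribute(name, "TotalBatchesReplayed"));
        }
    }
}
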
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d9ee38a..e349bfc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
         {
             public void run()
             {
-                MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
+                MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
             }
         });
     }

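The call-site change above (mirrored in MutationVerbHandler and ReadRepairVerbHandler below) implies that WriteResponse now exposes a static createMessage() instead of requiring an instance per reply. The actual WriteResponse.java hunk appears elsewhere in this commit; the following is only a sketch of the shape such a factory can take, with a stub MessageOut standing in for the real org.apache.cassandra.net type:

// Stub for org.apache.cassandra.net.MessageOut so the sketch compiles alone.
final class MessageOut<T>
{
    final T payload;

    MessageOut(T payload)
    {
        this.payload = payload;
    }
}

// An empty write ack carries no per-request state, so one immutable instance
// can back every reply instead of allocating a WriteResponse per message.
final class WriteResponse
{
    private static final WriteResponse instance = new WriteResponse();

    private WriteResponse()
    {
    }

    static MessageOut<WriteResponse> createMessage()
    {
        return new MessageOut<>(instance);
    }
}
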
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6e78b0e..da7d13d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -58,7 +58,7 @@ public class Mutation implements IMutation
     public final long createdAt = System.currentTimeMillis();
     public Mutation(String keyspaceName, DecoratedKey key)
     {
-        this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
+        this(keyspaceName, key, new HashMap<>());
     }
 
     public Mutation(PartitionUpdate update)
@@ -201,6 +201,11 @@ public class Mutation implements IMutation
         ks.apply(this, ks.getMetadata().params.durableWrites);
     }
 
+    public void apply(boolean durableWrites)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites);
+    }
+
     public void applyUnsafe()
     {
         Keyspace.open(keyspaceName).apply(this, false);

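The new apply(boolean) overload lets a caller pin the commit-log decision explicitly instead of inheriting the keyspace's durable_writes setting. A usage sketch; the helper methods and their names are illustrative, and only the overload itself comes from the hunk above:

// Hypothetical helpers inside the Cassandra source tree.
static void applyDurably(Mutation mutation)
{
    mutation.apply(true); // write through the commit log even if durable_writes = false
}

static void applyWithoutCommitLog(Mutation mutation)
{
    mutation.apply(false); // skip the commit log even for a durable keyspace
}
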
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..d4670a2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
-import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
@@ -29,31 +29,32 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class MutationVerbHandler implements IVerbHandler<Mutation>
 {
-    private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true");
-
     public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
     {
-            // Check if there were any forwarding headers in this message
-            byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
-            InetAddress replyTo;
-            if (from == null)
-            {
-                replyTo = message.from;
-                byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
-                if (forwardBytes != null)
-                    forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
-            }
-            else
-            {
-                replyTo = InetAddress.getByAddress(from);
-            }
+        // Check if there were any forwarding headers in this message
+        byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
+        InetAddress replyTo;
+        if (from == null)
+        {
+            replyTo = message.from;
+            byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
+            if (forwardBytes != null)
+                forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
+        }
+        else
+        {
+            replyTo = InetAddress.getByAddress(from);
+        }
 
         try
         {
-            message.payload.apply();
-            WriteResponse response = new WriteResponse();
+            if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
+                LegacyBatchlogMigrator.handleLegacyMutation(message.payload);
+            else
+                message.payload.apply();
+
             Tracing.trace("Enqueuing response to {}", replyTo);
-            MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+            MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
         }
         catch (WriteTimeoutException wto)
         {
@@ -65,7 +66,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
      * Older versions (< 1.0) will not send this message at all, hence we don't
      * need to check the version of the data.
      */
-    private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+    private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
     {
         try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
         {

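The new branch exists because pre-3.0 coordinators still persist a batch by sending a plain mutation against the legacy system batchlog table, so a 3.0 node must recognise such traffic and convert it instead of applying it directly. A plausible reconstruction of the predicate; the real implementation lives in LegacyBatchlogMigrator and may differ:

// Assumed shape of the check: a mutation is legacy batchlog traffic iff it
// targets the old batchlog table. SystemKeyspace.NAME and
// SystemKeyspace.LegacyBatchlog are identifiers used elsewhere in this
// commit; keying the lookup on cfId is an assumption.
static boolean isLegacyBatchlogMutation(Mutation mutation)
{
    return SystemKeyspace.NAME.equals(mutation.getKeyspaceName())
        && mutation.getColumnFamilyIds().contains(SystemKeyspace.LegacyBatchlog.cfId);
}
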
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 849ac70..2e499e7 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -26,7 +26,6 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
     public void doVerb(MessageIn<Mutation> message, int id)
     {
         message.payload.apply();
-        WriteResponse response = new WriteResponse();
-        MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9eb48..cf8e14d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -113,7 +113,7 @@ public final class SystemKeyspace
                 "batches awaiting replay",
                 "CREATE TABLE %s ("
                 + "id timeuuid,"
-                + "data blob,"
+                + "mutations list<blob>,"
                 + "version int,"
                 + "PRIMARY KEY ((id)))")
                 .copy(new LocalPartitioner(TimeUUIDType.instance))