Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/02 09:51:53 UTC

[1/4] cassandra git commit: Improve batchlog write path

Repository: cassandra
Updated Branches:
  refs/heads/trunk 2dbf029f0 -> 79628dd70


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
deleted file mode 100644
index 568e23d..0000000
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import com.google.common.collect.Lists;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.Util.PartitionerSwitcher;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class BatchlogManagerTest
-{
-    private static final String KEYSPACE1 = "BatchlogManagerTest1";
-    private static final String CF_STANDARD1 = "Standard1";
-    private static final String CF_STANDARD2 = "Standard2";
-    private static final String CF_STANDARD3 = "Standard3";
-    private static final String CF_STANDARD4 = "Standard4";
-
-    static PartitionerSwitcher sw;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance));
-        System.out.println(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata.partitionKeyColumns());
-    }
-
-    @AfterClass
-    public static void cleanup()
-    {
-        sw.close();
-    }
-
-    @Before
-    public void setUp() throws Exception
-    {
-        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
-        InetAddress localhost = InetAddress.getByName("127.0.0.1");
-        metadata.updateNormalToken(Util.token("A"), localhost);
-        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
-    }
-
-    @Test
-    public void testDelete()
-    {
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
-        CFMetaData cfm = cfs.metadata;
-        new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
-                .clustering("c")
-                .add("val", "val" + 1234)
-                .build()
-                .applyUnsafe();
-
-        DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
-        ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
-        Iterator<Row> iter = results.iterator();
-        assert iter.hasNext();
-
-        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
-                                                         dk,
-                                                         FBUtilities.timestampMicros(),
-                                                         FBUtilities.nowInSeconds()));
-        mutation.applyUnsafe();
-
-        Util.assertEmpty(Util.cmd(cfs, dk).build());
-    }
-
-    // TODO: Fix. Currently endlessly looping on BatchLogManager.replayAllFailedBatches
-    @Test
-    public void testReplay() throws Exception
-    {
-        long initialAllBatches = BatchlogManager.instance.countAllBatches();
-        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
-
-        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
-
-        // Generate 1000 mutations and put them all into the batchlog.
-        // Half (500) ready to be replayed, half not.
-        for (int i = 0; i < 1000; i++)
-        {
-            Mutation m = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
-                    .clustering("name" + i)
-                    .add("val", "val" + i)
-                    .build();
-
-            long timestamp = i < 500
-                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
-                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-            BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
-                                                   UUIDGen.getTimeUUID(timestamp, i),
-                                                   MessagingService.current_version)
-                           .applyUnsafe();
-        }
-
-        // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
-        assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
-        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
-        // Force batchlog replay and wait for it to complete.
-        BatchlogManager.instance.startBatchlogReplay().get();
-
-        // Ensure that the first half, and only the first half, got replayed.
-        assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
-        assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
-        for (int i = 0; i < 1000; i++)
-        {
-            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i));
-            if (i < 500)
-            {
-                assertEquals(bytes(i), result.one().getBytes("key"));
-                assertEquals("name" + i, result.one().getString("name"));
-                assertEquals("val" + i, result.one().getString("val"));
-            }
-            else
-            {
-                assertTrue(result.isEmpty());
-            }
-        }
-
-        // Ensure that no stray mutations got somehow applied.
-        UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
-        assertEquals(500, result.one().getLong("count"));
-    }
-
-    @Test
-    public void testTruncatedReplay() throws InterruptedException, ExecutionException
-    {
-        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
-        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
-        // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
-        // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
-        // In the middle of the process, 'truncate' Standard2.
-        for (int i = 0; i < 1000; i++)
-        {
-            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), bytes(i))
-                .clustering("name" + i)
-                .add("val", "val" + i)
-                .build();
-            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), bytes(i))
-                .clustering("name" + i)
-                .add("val", "val" + i)
-                .build();
-
-            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
-
-            // Make sure it's ready to be replayed, so adjust the timestamp.
-            long timestamp = System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout();
-
-            if (i == 500)
-                SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"),
-                                                    timestamp,
-                                                    ReplayPosition.NONE);
-
-            // Adjust the timestamp (slightly) to make the test deterministic.
-            if (i >= 500)
-                timestamp++;
-            else
-                timestamp--;
-
-            BatchlogManager.getBatchlogMutationFor(mutations,
-                                                   UUIDGen.getTimeUUID(timestamp, i),
-                                                   MessagingService.current_version)
-                           .applyUnsafe();
-        }
-
-        // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
-        // Force batchlog replay and wait for it to complete.
-        BatchlogManager.instance.startBatchlogReplay().get();
-
-        // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
-        for (int i = 0; i < 1000; i++)
-        {
-            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2,i));
-            if (i >= 500)
-            {
-                assertEquals(bytes(i), result.one().getBytes("key"));
-                assertEquals("name" + i, result.one().getString("name"));
-                assertEquals("val" + i, result.one().getString("val"));
-            }
-            else
-            {
-                assertTrue(result.isEmpty());
-            }
-        }
-
-        for (int i = 0; i < 1000; i++)
-        {
-            UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
-            assertEquals(bytes(i), result.one().getBytes("key"));
-            assertEquals("name" + i, result.one().getString("name"));
-            assertEquals("val" + i, result.one().getString("val"));
-        }
-    }
-
-    static Mutation fakeVersion12MutationFor(Collection<Mutation> mutations, long now) throws IOException
-    {
-        // Serialization can't write version 1.2 mutations, so pretend this is old by using a random id and
-        // written_at, and by saving it in the legacy batchlog.
-        UUID uuid = UUID.randomUUID();
-        ByteBuffer writtenAt = LongType.instance.decompose(now);
-        int version = MessagingService.VERSION_30;
-        ByteBuffer data = BatchlogManager.serializeMutations(mutations, version);
-
-        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
-            .clustering()
-            .add("written_at", writtenAt)
-            .add("data", data)
-            .add("version", version)
-            .build();
-    }
-
-    static Mutation fakeVersion20MutationFor(Collection<Mutation> mutations, UUID uuid)
-    {
-        // Serialization can't write version 2.0 mutations, so pretend this is old by saving it in the legacy batchlog.
-        int version = MessagingService.VERSION_30;
-        ByteBuffer writtenAt = LongType.instance.decompose(UUIDGen.unixTimestamp(uuid));
-        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
-               .clustering()
-               .add("data", BatchlogManager.serializeMutations(mutations, version))
-               .add("written_at", writtenAt)
-               .add("version", version)
-               .build();
-    }
-
-    @Test
-    public void testConversion() throws Exception
-    {
-        long initialAllBatches = BatchlogManager.instance.countAllBatches();
-        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
-        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
-
-        // Generate 1000 mutations and put them all into the batchlog.
-        // Half (500) ready to be replayed, half not.
-        for (int i = 0; i < 1000; i++)
-        {
-            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
-                .clustering("name" + i)
-                .add("val", "val" + i)
-                .build();
-
-            long timestamp = i < 500
-                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
-                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
-            fakeVersion12MutationFor(Collections.singleton(mutation), timestamp).applyUnsafe();
-        }
-
-        // Add 400 version 2.0 mutations and put them all into the batchlog.
-        // Half (200) ready to be replayed, half not.
-        for (int i = 1000; i < 1400; i++)
-        {
-            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
-                .clustering("name" + i)
-                .add("val", "val" + i)
-                .build();
-
-            long timestamp = i < 1200
-                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
-                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
-            fakeVersion20MutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(timestamp, i)).applyUnsafe();
-        }
-
-        // Mix in 100 current version mutations, 50 ready for replay.
-        for (int i = 1400; i < 1500; i++)
-        {
-            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
-                .clustering("name" + i)
-                .add("val", "val" + i)
-                .build();
-
-            long timestamp = i < 1450
-                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
-                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
-
-
-            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
-                                                   UUIDGen.getTimeUUID(timestamp, i),
-                                                   MessagingService.current_version)
-                           .applyUnsafe();
-        }
-
-        // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
-
-        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
-        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
-        UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
-        assertEquals("Count in blog legacy", 1400, result.one().getLong("count"));
-        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
-        assertEquals("Count in blog", 100, result.one().getLong("count"));
-
-        // Force batchlog replay and wait for it to complete.
-        BatchlogManager.instance.performInitialReplay();
-
-        // Ensure that the first half, and only the first half, got replayed.
-        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
-        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
-
-        for (int i = 0; i < 1500; i++)
-        {
-            result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
-            if (i < 500 || i >= 1000 && i < 1200 || i >= 1400 && i < 1450)
-            {
-                assertEquals(bytes(i), result.one().getBytes("key"));
-                assertEquals("name" + i, result.one().getString("name"));
-                assertEquals("val" + i, result.one().getString("val"));
-            }
-            else
-            {
-                assertTrue("Present at " + i, result.isEmpty());
-            }
-        }
-
-        // Ensure that no stray mutations got somehow applied.
-        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
-        assertEquals(750, result.one().getLong("count"));
-
-        // Ensure batchlog is left as expected.
-        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
-        assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
-        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
-        assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
-    }
-}
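
The deleted test above hinges on one timing rule: a batch is eligible for replay only once its time-UUID timestamp is at least the batchlog timeout old. Below is a minimal standalone sketch of that rule, not Cassandra code; isEligibleForReplay and the timeout value are assumptions standing in for the comparison BatchlogManager performs.

public final class ReplayEligibility
{
    // A batch written at writtenAtMillis is replayable once it is at least timeoutMillis old.
    static boolean isEligibleForReplay(long writtenAtMillis, long nowMillis, long timeoutMillis)
    {
        return writtenAtMillis <= nowMillis - timeoutMillis;
    }

    public static void main(String[] args)
    {
        long now = System.currentTimeMillis();
        long timeout = 2 * 2000L; // assumption: 2x a 2000ms write RPC timeout

        // Same split as the test: timestamps shifted back replay, shifted forward do not.
        System.out.println(isEligibleForReplay(now - timeout, now, timeout)); // true
        System.out.println(isEligibleForReplay(now + timeout, now, timeout)); // false
    }
}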

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
deleted file mode 100644
index be33e3f..0000000
--- a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashSet;
-
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
-
-import org.apache.cassandra.db.BatchlogManager;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-public class BatchlogEndpointFilterTest
-{
-    private static final String LOCAL = "local";
-
-    @Test
-    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("2", InetAddress.getByName("2"))
-                .put("2", InetAddress.getByName("22"))
-                .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
-        assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
-    }
-
-    @Test
-    public void shouldSelectHostFromLocal() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
-        assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
-    }
-
-    @Test
-    public void shouldReturnAsIsIfNotEnoughEndpoints() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
-        assertThat(result.size(), is(1));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
-    }
-
-    @Test
-    public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("1", InetAddress.getByName("111"))
-                .build();
-        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
-        // result should contain two random, distinct values
-        assertThat(new HashSet<>(result).size(), is(2));
-    }
-
-    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
-    {
-        public TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
-        {
-            super(localRack, endpoints);
-        }
-
-        @Override
-        protected boolean isValid(InetAddress input)
-        {
-            // Treat all non-localhost endpoints as always alive
-            return true;
-        }
-
-        @Override
-        protected int getRandomInt(int bound)
-        {
-            // We don't need random behavior here
-            return bound - 1;
-        }
-    }
-}
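
The deleted tests encode a rack-aware selection policy: prefer two batchlog replicas in distinct non-local racks, fall back to a single other rack or the local rack, and return the endpoints as-is when fewer than two exist. Below is a simplified, hypothetical sketch of that policy, not the actual EndpointFilter: String stands in for InetAddress, and the real filter also checks liveness and picks hosts at random.

import java.util.*;

public final class SimpleEndpointFilter
{
    static List<String> filter(String localRack, Map<String, List<String>> endpointsByRack)
    {
        List<String> result = new ArrayList<>(2);

        // First pass: at most one endpoint per distinct non-local rack.
        for (Map.Entry<String, List<String>> e : endpointsByRack.entrySet())
        {
            if (e.getKey().equals(localRack) || e.getValue().isEmpty())
                continue;
            List<String> rack = e.getValue();
            result.add(rack.get(rack.size() - 1)); // the real filter picks at random
            if (result.size() == 2)
                return result;
        }

        // Not enough distinct non-local racks: pad with remaining non-local endpoints
        // (so a single other rack can contribute two hosts), then the local rack.
        for (Map.Entry<String, List<String>> e : endpointsByRack.entrySet())
            if (!e.getKey().equals(localRack))
                for (String ep : e.getValue())
                    if (result.size() < 2 && !result.contains(ep))
                        result.add(ep);
        for (String ep : endpointsByRack.getOrDefault(localRack, Collections.<String>emptyList()))
            if (result.size() < 2 && !result.contains(ep))
                result.add(ep);

        return result;
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> endpoints = new LinkedHashMap<>();
        endpoints.put("local", Arrays.asList("0", "00"));
        endpoints.put("1", Arrays.asList("1", "11"));
        endpoints.put("2", Arrays.asList("2", "22"));
        System.out.println(filter("local", endpoints)); // [11, 22]: one host per non-local rack
    }
}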


[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/79628dd7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/79628dd7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/79628dd7

Branch: refs/heads/trunk
Commit: 79628dd7076c125c9b434e90689057cdf08bb87b
Parents: 2dbf029 53a177a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Sep 2 08:51:29 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:51:29 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   1 -
 .../org/apache/cassandra/batchlog/Batch.java    | 155 +++++
 .../batchlog/BatchRemoveVerbHandler.java        |  31 +
 .../batchlog/BatchStoreVerbHandler.java         |  32 +
 .../cassandra/batchlog/BatchlogManager.java     | 554 +++++++++++++++++
 .../batchlog/BatchlogManagerMBean.java          |  38 ++
 .../batchlog/LegacyBatchlogMigrator.java        | 196 ++++++
 .../org/apache/cassandra/concurrent/Stage.java  |   2 -
 .../cassandra/concurrent/StageManager.java      |   1 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |   7 +-
 .../cql3/statements/BatchStatement.java         |   5 -
 .../apache/cassandra/db/BatchlogManager.java    | 596 -------------------
 .../cassandra/db/BatchlogManagerMBean.java      |  38 --
 .../db/CounterMutationVerbHandler.java          |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   7 +-
 .../cassandra/db/MutationVerbHandler.java       |  43 +-
 .../cassandra/db/ReadRepairVerbHandler.java     |   3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../org/apache/cassandra/db/WriteResponse.java  |  18 +-
 .../cassandra/hints/EncodedHintMessage.java     |   6 +-
 src/java/org/apache/cassandra/hints/Hint.java   |  11 +-
 .../org/apache/cassandra/hints/HintMessage.java |  14 +-
 .../cassandra/hints/LegacyHintsMigrator.java    |   2 +-
 .../apache/cassandra/net/MessagingService.java  |  26 +-
 .../cassandra/service/CassandraDaemon.java      |   6 +-
 .../apache/cassandra/service/StorageProxy.java  | 208 ++++---
 .../cassandra/service/StorageService.java       |  19 +-
 .../service/paxos/CommitVerbHandler.java        |   6 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   4 +-
 .../cql3/MaterializedViewLongTest.java          |   2 +-
 .../apache/cassandra/batchlog/BatchTest.java    | 153 +++++
 .../batchlog/BatchlogEndpointFilterTest.java    | 115 ++++
 .../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
 .../cassandra/db/BatchlogManagerTest.java       | 394 ------------
 .../service/BatchlogEndpointFilterTest.java     | 117 ----
 37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c916ac9,751b75d..2a1bd1c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 +3.2
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0.0-beta2
+  * Improve batchlog write path (CASSANDRA-9673)
   * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
   * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
   * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/79628dd7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------


[3/4] cassandra git commit: Improve batchlog write path

Posted by al...@apache.org.
Improve batchlog write path

patch by Stefania Alborghetti; reviewed by Aleksey Yeschenko for
CASSANDRA-9673


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53a177a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53a177a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53a177a9

Branch: refs/heads/trunk
Commit: 53a177a9150586e56408f25c959f75110a2997e7
Parents: 5f02f20
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Jul 10 17:03:06 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 2 08:43:42 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   1 -
 .../org/apache/cassandra/batchlog/Batch.java    | 155 +++++
 .../batchlog/BatchRemoveVerbHandler.java        |  31 +
 .../batchlog/BatchStoreVerbHandler.java         |  32 +
 .../cassandra/batchlog/BatchlogManager.java     | 554 +++++++++++++++++
 .../batchlog/BatchlogManagerMBean.java          |  38 ++
 .../batchlog/LegacyBatchlogMigrator.java        | 196 ++++++
 .../org/apache/cassandra/concurrent/Stage.java  |   2 -
 .../cassandra/concurrent/StageManager.java      |   1 -
 .../org/apache/cassandra/config/Config.java     |   1 -
 .../cassandra/config/DatabaseDescriptor.java    |   7 +-
 .../cql3/statements/BatchStatement.java         |   5 -
 .../apache/cassandra/db/BatchlogManager.java    | 596 -------------------
 .../cassandra/db/BatchlogManagerMBean.java      |  38 --
 .../db/CounterMutationVerbHandler.java          |   2 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   7 +-
 .../cassandra/db/MutationVerbHandler.java       |  43 +-
 .../cassandra/db/ReadRepairVerbHandler.java     |   3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../org/apache/cassandra/db/WriteResponse.java  |  18 +-
 .../cassandra/hints/EncodedHintMessage.java     |   6 +-
 src/java/org/apache/cassandra/hints/Hint.java   |  11 +-
 .../org/apache/cassandra/hints/HintMessage.java |  14 +-
 .../cassandra/hints/LegacyHintsMigrator.java    |   2 +-
 .../apache/cassandra/net/MessagingService.java  |  26 +-
 .../cassandra/service/CassandraDaemon.java      |   6 +-
 .../apache/cassandra/service/StorageProxy.java  | 208 ++++---
 .../cassandra/service/StorageService.java       |  19 +-
 .../service/paxos/CommitVerbHandler.java        |   6 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   4 +-
 .../cql3/MaterializedViewLongTest.java          |   2 +-
 .../apache/cassandra/batchlog/BatchTest.java    | 153 +++++
 .../batchlog/BatchlogEndpointFilterTest.java    | 115 ++++
 .../cassandra/batchlog/BatchlogManagerTest.java | 460 ++++++++++++++
 .../cassandra/db/BatchlogManagerTest.java       | 394 ------------
 .../service/BatchlogEndpointFilterTest.java     | 117 ----
 37 files changed, 1945 insertions(+), 1331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe8f453..751b75d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta2
+ * Improve batchlog write path (CASSANDRA-9673)
  * Re-apply MaterializedView updates on commitlog replay (CASSANDRA-10164)
  * Require AbstractType.isByteOrderComparable declaration in constructor (CASSANDRA-9901)
  * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 16108bd..0f8b829 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -355,7 +355,6 @@ seed_provider:
 concurrent_reads: 32
 concurrent_writes: 32
 concurrent_counter_writes: 32
-concurrent_batchlog_writes: 32
 
 # For materialized view writes, as there is a read involved, this should
 # be limited by the lesser of concurrent reads and concurrent writes.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/Batch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/Batch.java b/src/java/org/apache/cassandra/batchlog/Batch.java
new file mode 100644
index 0000000..caa2682
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/Batch.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
+public final class Batch
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final UUID id;
+    public final long creationTime; // time of batch creation (in microseconds)
+
+    // one of these will always be empty
+    final Collection<Mutation> decodedMutations;
+    final Collection<ByteBuffer> encodedMutations;
+
+    private Batch(UUID id, long creationTime, Collection<Mutation> decodedMutations, Collection<ByteBuffer> encodedMutations)
+    {
+        this.id = id;
+        this.creationTime = creationTime;
+
+        this.decodedMutations = decodedMutations;
+        this.encodedMutations = encodedMutations;
+    }
+
+    /**
+     * Creates a 'local' batch - with all enclosed mutations in decoded form (as Mutation instances)
+     */
+    public static Batch createLocal(UUID id, long creationTime, Collection<Mutation> mutations)
+    {
+        return new Batch(id, creationTime, mutations, Collections.emptyList());
+    }
+
+    /**
+     * Creates a 'remote' batch - with all enclosed mutations in encoded form (as ByteBuffer instances)
+     *
+     * The mutations will always be encoded using the current messaging version.
+     */
+    public static Batch createRemote(UUID id, long creationTime, Collection<ByteBuffer> mutations)
+    {
+        return new Batch(id, creationTime, Collections.<Mutation>emptyList(), mutations);
+    }
+
+    /**
+     * Count of the mutations in the batch.
+     */
+    public int size()
+    {
+        return decodedMutations.size() + encodedMutations.size();
+    }
+
+    static final class Serializer implements IVersionedSerializer<Batch>
+    {
+        public long serializedSize(Batch batch, int version)
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            long size = UUIDSerializer.serializer.serializedSize(batch.id, version);
+            size += sizeof(batch.creationTime);
+
+            size += sizeofVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version);
+                size += sizeofVInt(mutationSize);
+                size += mutationSize;
+            }
+
+            return size;
+        }
+
+        public void serialize(Batch batch, DataOutputPlus out, int version) throws IOException
+        {
+            assert batch.encodedMutations.isEmpty() : "attempted to serialize a 'remote' batch";
+
+            UUIDSerializer.serializer.serialize(batch.id, out, version);
+            out.writeLong(batch.creationTime);
+
+            out.writeVInt(batch.decodedMutations.size());
+            for (Mutation mutation : batch.decodedMutations)
+            {
+                out.writeVInt(Mutation.serializer.serializedSize(mutation, version));
+                Mutation.serializer.serialize(mutation, out, version);
+            }
+        }
+
+        public Batch deserialize(DataInputPlus in, int version) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            long creationTime = in.readLong();
+
+            /*
+             * If version doesn't match the current one, we cannot just read the encoded mutations verbatim,
+             * so we decode them instead, to deal with compatibility.
+             */
+            return version == MessagingService.current_version
+                 ? createRemote(id, creationTime, readEncodedMutations(in))
+                 : createLocal(id, creationTime, decodeMutations(in, version));
+        }
+
+        private static Collection<ByteBuffer> readEncodedMutations(DataInputPlus in) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<ByteBuffer> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+                mutations.add(ByteBufferUtil.readWithVIntLength(in));
+
+            return mutations;
+        }
+
+        private static Collection<Mutation> decodeMutations(DataInputPlus in, int version) throws IOException
+        {
+            int count = (int) in.readVInt();
+
+            ArrayList<Mutation> mutations = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+            {
+                in.readVInt(); // skip mutation size
+                mutations.add(Mutation.serializer.deserialize(in, version));
+            }
+
+            return mutations;
+        }
+    }
+}
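
The serializer above frames each batch as the batch id, the creation time, a vint mutation count, and then each mutation as a vint length prefix followed by its bytes; that length prefix is what lets deserialize() keep same-version mutations as opaque buffers instead of decoding them. Below is a hypothetical standalone sketch of that framing, purely for illustration: it uses a simple LEB128-style varint, whereas Cassandra's actual vint codec differs.

import java.io.*;
import java.util.*;

public final class BatchFramingSketch
{
    static void writeVarint(DataOutputStream out, long v) throws IOException
    {
        while ((v & ~0x7FL) != 0) { out.writeByte((int) ((v & 0x7F) | 0x80)); v >>>= 7; }
        out.writeByte((int) v);
    }

    static byte[] frame(UUID id, long creationTime, List<byte[]> encodedMutations) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        out.writeLong(id.getMostSignificantBits());   // batch id
        out.writeLong(id.getLeastSignificantBits());
        out.writeLong(creationTime);                  // micros, per Batch.creationTime

        writeVarint(out, encodedMutations.size());    // mutation count
        for (byte[] mutation : encodedMutations)
        {
            writeVarint(out, mutation.length);        // per-mutation length prefix lets a
            out.write(mutation);                      // same-version reader skip decoding
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException
    {
        List<byte[]> mutations = Arrays.asList("m1".getBytes(), "mutation-2".getBytes());
        byte[] framed = frame(UUID.randomUUID(), System.currentTimeMillis() * 1000, mutations);
        System.out.println(framed.length + " bytes"); // 24-byte header + framed mutations
    }
}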

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
new file mode 100644
index 0000000..3c3fcec
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchRemoveVerbHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.util.UUID;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+
+public final class BatchRemoveVerbHandler implements IVerbHandler<UUID>
+{
+    public void doVerb(MessageIn<UUID> message, int id)
+    {
+        BatchlogManager.remove(message.payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
new file mode 100644
index 0000000..4bc878c
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchStoreVerbHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+
+public final class BatchStoreVerbHandler implements IVerbHandler<Batch>
+{
+    public void doVerb(MessageIn<Batch> message, int id)
+    {
+        BatchlogManager.store(message.payload);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
+    }
+}
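
Taken together, the two handlers give the batchlog its request/response shape: BATCH_STORE persists the batch on the replica and acknowledges the coordinator, while BATCH_REMOVE is fire-and-forget, since deleting a batch row is idempotent. Below is a hypothetical sketch of that shape with stand-in types; VerbHandler, persist() and ack() here are assumptions, not the MessagingService API.

import java.util.UUID;

public final class VerbSketch
{
    interface VerbHandler<T> { void doVerb(T payload, String from, int id); }

    // BATCH_STORE shape: persist locally, then acknowledge the coordinator.
    static final class StoreVerb implements VerbHandler<byte[]>
    {
        public void doVerb(byte[] batch, String from, int id)
        {
            persist(batch);   // durable local write, like BatchlogManager.store()
            ack(from, id);    // reply so the coordinator can count the ack
        }
        private void persist(byte[] batch) { /* local batchlog table write */ }
        private void ack(String to, int id) { System.out.println("WriteResponse " + id + " -> " + to); }
    }

    // BATCH_REMOVE shape: fire-and-forget; no reply needed, retries are safe.
    static final class RemoveVerb implements VerbHandler<UUID>
    {
        public void doVerb(UUID batchId, String from, int id)
        {
            System.out.println("remove batch " + batchId);
        }
    }

    public static void main(String[] args)
    {
        new StoreVerb().doVerb(new byte[]{ 1, 2, 3 }, "coordinator", 42);
        new RemoveVerb().doVerb(UUID.randomUUID(), "coordinator", 43);
    }
}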

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
new file mode 100644
index 0000000..934ebaa
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
+
+public class BatchlogManager implements BatchlogManagerMBean
+{
+    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
+    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+    static final int DEFAULT_PAGE_SIZE = 128;
+
+    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
+    public static final BatchlogManager instance = new BatchlogManager();
+
+    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
+
+    // Single-thread executor service for scheduling and serializing log replay.
+    private final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+
+    public void start()
+    {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
+                                             StorageService.RING_DELAY,
+                                             REPLAY_INTERVAL,
+                                             TimeUnit.MILLISECONDS);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        batchlogTasks.shutdown();
+        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
+    public static void remove(UUID id)
+    {
+        new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
+                                                         UUIDType.instance.decompose(id),
+                                                         FBUtilities.timestampMicros(),
+                                                         FBUtilities.nowInSeconds()))
+            .apply();
+    }
+
+    public static void store(Batch batch)
+    {
+        store(batch, true);
+    }
+
+    public static void store(Batch batch, boolean durableWrites)
+    {
+        RowUpdateBuilder builder =
+            new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
+                .clustering()
+                .add("version", MessagingService.current_version);
+
+        for (ByteBuffer mutation : batch.encodedMutations)
+            builder.addListEntry("mutations", mutation);
+
+        for (Mutation mutation : batch.decodedMutations)
+        {
+            try (DataOutputBuffer buffer = new DataOutputBuffer())
+            {
+                Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
+                builder.addListEntry("mutations", buffer.buffer());
+            }
+            catch (IOException e)
+            {
+                // shouldn't happen
+                throw new AssertionError(e);
+            }
+        }
+
+        builder.build().apply(durableWrites);
+    }
+
+    @VisibleForTesting
+    public int countAllBatches()
+    {
+        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+        UntypedResultSet results = executeInternal(query);
+        if (results == null || results.isEmpty())
+            return 0;
+
+        return (int) results.one().getLong("count");
+    }
+
+    public long getTotalBatchesReplayed()
+    {
+        return totalBatchesReplayed;
+    }
+
+    public void forceBatchlogReplay() throws Exception
+    {
+        startBatchlogReplay().get();
+    }
+
+    public Future<?> startBatchlogReplay()
+    {
+        // If a replay is already in progress this request will be executed after it completes.
+        return batchlogTasks.submit(this::replayFailedBatches);
+    }
+
+    void performInitialReplay() throws InterruptedException, ExecutionException
+    {
+        // Invokes initial replay. Used for testing only.
+        batchlogTasks.submit(this::replayFailedBatches).get();
+    }
+
+    private void replayFailedBatches()
+    {
+        logger.debug("Started replayFailedBatches");
+
+        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
+        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
+        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+
+        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+        int pageSize = calculatePageSize(store);
+        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+        // token(id) > token(lastReplayedUuid) as part of the query.
+        String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES);
+        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+        processBatchlogEntries(batches, pageSize, rateLimiter);
+        lastReplayedUuid = limitUuid;
+        logger.debug("Finished replayFailedBatches");
+    }
+
+    // Read fewer rows (batches) per page if they are very large.
+    static int calculatePageSize(ColumnFamilyStore store)
+    {
+        double averageRowSize = store.getMeanPartitionSize();
+        if (averageRowSize <= 0)
+            return DEFAULT_PAGE_SIZE;
+
+        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
+    }
+
+    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
+    {
+        int positionInPage = 0;
+        ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
+
+        Set<InetAddress> hintedNodes = new HashSet<>();
+        Set<UUID> replayedBatches = new HashSet<>();
+
+        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
+        for (UntypedResultSet.Row row : batches)
+        {
+            UUID id = row.getUUID("id");
+            int version = row.getInt("version");
+            try
+            {
+                ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
+                if (batch.replay(rateLimiter, hintedNodes) > 0)
+                {
+                    unfinishedBatches.add(batch);
+                }
+                else
+                {
+                    remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
+                    ++totalBatchesReplayed;
+                }
+            }
+            catch (IOException e)
+            {
+                logger.warn("Skipped batch replay of {} due to {}", id, e);
+                remove(id);
+            }
+
+            if (++positionInPage == pageSize)
+            {
+                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+                // finish processing the page before requesting the next row.
+                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+                positionInPage = 0;
+            }
+        }
+
+        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+        // once all generated hints are fsynced, actually delete the batches
+        replayedBatches.forEach(BatchlogManager::remove);
+    }
+
+    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
+    {
+        // schedule hints for timed out deliveries
+        for (ReplayingBatch batch : batches)
+        {
+            batch.finish(hintedNodes);
+            replayedBatches.add(batch.id);
+        }
+
+        totalBatchesReplayed += batches.size();
+        batches.clear();
+    }
+
+    public static long getBatchlogTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+    }
+
+    private static class ReplayingBatch
+    {
+        private final UUID id;
+        private final long writtenAt;
+        private final List<Mutation> mutations;
+        private final int replayedBytes;
+
+        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
+
+        ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
+        {
+            this.id = id;
+            this.writtenAt = UUIDGen.unixTimestamp(id);
+            this.mutations = new ArrayList<>(serializedMutations.size());
+            this.replayedBytes = addMutations(version, serializedMutations);
+        }
+
+        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
+        {
+            logger.debug("Replaying batch {}", id);
+
+            if (mutations.isEmpty())
+                return 0;
+
+            int gcgs = gcgs(mutations);
+            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+                return 0;
+
+            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
+
+            rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
+
+            return replayHandlers.size();
+        }
+
+        public void finish(Set<InetAddress> hintedNodes)
+        {
+            for (int i = 0; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+                try
+                {
+                    handler.get();
+                }
+                catch (WriteTimeoutException|WriteFailureException e)
+                {
+                    logger.debug("Failed replaying a batched mutation to a node, will write a hint");
+                    logger.debug("Failure was : {}", e.getMessage());
+                    // write hints for the remaining undelivered mutations, starting from index i
+                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
+                    return;
+                }
+            }
+        }
+
+        private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
+        {
+            int ret = 0;
+            for (ByteBuffer serializedMutation : serializedMutations)
+            {
+                ret += serializedMutation.remaining();
+                try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
+                {
+                    addMutation(Mutation.serializer.deserialize(in, version));
+                }
+            }
+
+            return ret;
+        }
+
+        // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
+        // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered,
+        // then truncated).
+        private void addMutation(Mutation mutation)
+        {
+            for (UUID cfId : mutation.getColumnFamilyIds())
+                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
+                    mutation = mutation.without(cfId);
+
+            if (!mutation.isEmpty())
+                mutations.add(mutation);
+        }
+
+        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
+        {
+            int gcgs = gcgs(mutations);
+
+            // expired
+            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
+                return;
+
+            for (int i = startFrom; i < replayHandlers.size(); i++)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
+                Mutation undeliveredMutation = mutations.get(i);
+
+                if (handler != null)
+                {
+                    hintedNodes.addAll(handler.undelivered);
+                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+                                                Hint.create(undeliveredMutation, writtenAt));
+                }
+            }
+        }
+
+        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+                                                                              long writtenAt,
+                                                                              Set<InetAddress> hintedNodes)
+        {
+            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
+            for (Mutation mutation : mutations)
+            {
+                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
+                if (handler != null)
+                    handlers.add(handler);
+            }
+            return handlers;
+        }
+
+        /**
+         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
+         * when a replica is down or a write request times out.
+         *
+         * @return direct delivery handler to wait on, or null if no live nodes are found
+         */
+        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+                                                                                     long writtenAt,
+                                                                                     Set<InetAddress> hintedNodes)
+        {
+            Set<InetAddress> liveEndpoints = new HashSet<>();
+            String ks = mutation.getKeyspaceName();
+            Token tk = mutation.key().getToken();
+
+            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
+                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
+            {
+                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                {
+                    mutation.apply();
+                }
+                else if (FailureDetector.instance.isAlive(endpoint))
+                {
+                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                }
+                else
+                {
+                    hintedNodes.add(endpoint);
+                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+                                                Hint.create(mutation, writtenAt));
+                }
+            }
+
+            if (liveEndpoints.isEmpty())
+                return null;
+
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+            MessageOut<Mutation> message = mutation.createMessage();
+            for (InetAddress endpoint : liveEndpoints)
+                MessagingService.instance().sendRR(message, endpoint, handler, false);
+            return handler;
+        }
+
+        private static int gcgs(Collection<Mutation> mutations)
+        {
+            int gcgs = Integer.MAX_VALUE;
+            for (Mutation mutation : mutations)
+                gcgs = Math.min(gcgs, mutation.smallestGCGS());
+            return gcgs;
+        }
+
+        /**
+         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+         * which we did not receive a successful reply.
+         */
+        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
+        {
+            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+            {
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+                undelivered.addAll(writeEndpoints);
+            }
+
+            @Override
+            protected int totalBlockFor()
+            {
+                return this.naturalEndpoints.size();
+            }
+
+            @Override
+            public void response(MessageIn<T> m)
+            {
+                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
+                assert removed;
+                super.response(m);
+            }
+        }
+    }
+
+    public static class EndpointFilter
+    {
+        private final String localRack;
+        private final Multimap<String, InetAddress> endpoints;
+
+        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            this.localRack = localRack;
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * @return list of candidates for batchlog hosting. If possible, these will be two nodes from different racks.
+         */
+        public Collection<InetAddress> filter()
+        {
+            // special case for single-node data centers
+            if (endpoints.values().size() == 1)
+                return endpoints.values();
+
+            // strip out dead endpoints and localhost
+            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+                if (isValid(entry.getValue()))
+                    validated.put(entry.getKey(), entry.getValue());
+
+            if (validated.size() <= 2)
+                return validated.values();
+
+            if (validated.size() - validated.get(localRack).size() >= 2)
+            {
+                // we have enough endpoints in other racks
+                validated.removeAll(localRack);
+            }
+
+            if (validated.keySet().size() == 1)
+            {
+                // we have only 1 `other` rack
+                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+                return Lists.newArrayList(Iterables.limit(otherRack, 2));
+            }
+
+            // randomize which racks we pick from if more than 2 remaining
+            Collection<String> racks;
+            if (validated.keySet().size() == 2)
+            {
+                racks = validated.keySet();
+            }
+            else
+            {
+                racks = Lists.newArrayList(validated.keySet());
+                Collections.shuffle((List<String>) racks);
+            }
+
+            // grab a random member of up to two racks
+            List<InetAddress> result = new ArrayList<>(2);
+            for (String rack : Iterables.limit(racks, 2))
+            {
+                List<InetAddress> rackMembers = validated.get(rack);
+                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+            }
+
+            return result;
+        }
+
+        @VisibleForTesting
+        protected boolean isValid(InetAddress input)
+        {
+            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+        }
+
+        @VisibleForTesting
+        protected int getRandomInt(int bound)
+        {
+            return ThreadLocalRandom.current().nextInt(bound);
+        }
+    }
+}
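
A minimal sketch of exercising the EndpointFilter above outside a live cluster: it stubs out isValid() so no gossip state is needed, and the rack names and addresses are invented for illustration. With two racks available it should print two candidates from distinct racks.

    import java.net.InetAddress;

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    import org.apache.cassandra.batchlog.BatchlogManager;

    public class EndpointFilterDemo
    {
        public static void main(String[] args) throws Exception
        {
            // rack -> candidate endpoints, as the coordinator's snitch would group them
            Multimap<String, InetAddress> endpoints = ArrayListMultimap.create();
            endpoints.put("rack1", InetAddress.getByName("127.0.0.2"));
            endpoints.put("rack1", InetAddress.getByName("127.0.0.3"));
            endpoints.put("rack2", InetAddress.getByName("127.0.0.4"));

            // stub the liveness/localhost check so the sketch runs standalone
            BatchlogManager.EndpointFilter filter = new BatchlogManager.EndpointFilter("rack1", endpoints)
            {
                @Override
                protected boolean isValid(InetAddress input)
                {
                    return true;
                }
            };

            System.out.println(filter.filter()); // e.g. one endpoint from rack1 and one from rack2
        }
    }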

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
new file mode 100644
index 0000000..4dcc9f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManagerMBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+public interface BatchlogManagerMBean
+{
+    /**
+     * Counts all batches currently in the batchlog.
+     *
+     * @return total batch count
+     */
+    public int countAllBatches();
+
+    /**
+     * @return total count of batches replayed since node start
+     */
+    public long getTotalBatchesReplayed();
+
+    /**
+     * Forces batchlog replay. Returns immediately if replay is already in progress.
+     */
+    public void forceBatchlogReplay() throws Exception;
+}
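
As a usage note: this bean can be driven over JMX like the other Cassandra beans. A minimal sketch, assuming the bean keeps the "org.apache.cassandra.db:type=BatchlogManager" name used by the old implementation (see the deleted BatchlogManager.java below) and a node exposing JMX on the default port 7199:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.batchlog.BatchlogManagerMBean;

    public class ForceBatchlogReplay
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection connection = connector.getMBeanServerConnection();
                ObjectName name = new ObjectName("org.apache.cassandra.db:type=BatchlogManager");
                BatchlogManagerMBean proxy = JMX.newMBeanProxy(connection, name, BatchlogManagerMBean.class);

                System.out.println("batches pending: " + proxy.countAllBatches());
                proxy.forceBatchlogReplay();
            }
        }
    }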

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
new file mode 100644
index 0000000..13ff81a
--- /dev/null
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+public final class LegacyBatchlogMigrator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LegacyBatchlogMigrator.class);
+
+    private LegacyBatchlogMigrator()
+    {
+        // static class
+    }
+
+    @SuppressWarnings("deprecation")
+    public static void migrate()
+    {
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG);
+
+        // nothing to migrate
+        if (store.isEmpty())
+            return;
+
+        logger.info("Migrating legacy batchlog to new storage");
+
+        int convertedBatches = 0;
+        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_BATCHLOG);
+
+        int pageSize = BatchlogManager.calculatePageSize(store);
+
+        UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(query, pageSize);
+        for (UntypedResultSet.Row row : rows)
+        {
+            if (apply(row, convertedBatches))
+                convertedBatches++;
+        }
+
+        if (convertedBatches > 0)
+            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+    }
+
+    @SuppressWarnings("deprecation")
+    public static boolean isLegacyBatchlogMutation(Mutation mutation)
+    {
+        return mutation.getKeyspaceName().equals(SystemKeyspace.NAME)
+            && mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId) != null;
+    }
+
+    @SuppressWarnings("deprecation")
+    public static void handleLegacyMutation(Mutation mutation)
+    {
+        PartitionUpdate update = mutation.getPartitionUpdate(SystemKeyspace.LegacyBatchlog.cfId);
+        logger.debug("Applying legacy batchlog mutation {}", update);
+        update.forEach(row -> apply(UntypedResultSet.Row.fromInternalRow(update.metadata(), update.partitionKey(), row), -1));
+    }
+
+    private static boolean apply(UntypedResultSet.Row row, long counter)
+    {
+        UUID id = row.getUUID("id");
+        long timestamp = id.version() == 1 ? UUIDGen.unixTimestamp(id) : row.getLong("written_at");
+        int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+
+        if (id.version() != 1)
+            id = UUIDGen.getTimeUUID(timestamp, counter);
+
+        logger.debug("Converting mutation at {}", timestamp);
+
+        try (DataInputBuffer in = new DataInputBuffer(row.getBytes("data"), false))
+        {
+            int numMutations = in.readInt();
+            List<Mutation> mutations = new ArrayList<>(numMutations);
+            for (int i = 0; i < numMutations; i++)
+                mutations.add(Mutation.serializer.deserialize(in, version));
+
+            BatchlogManager.store(Batch.createLocal(id, TimeUnit.MILLISECONDS.toMicros(timestamp), mutations));
+            return true;
+        }
+        catch (Throwable t)
+        {
+            logger.error("Failed to convert mutation {} at timestamp {}", id, timestamp, t);
+            return false;
+        }
+    }
+
+    public static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
+    throws WriteTimeoutException, WriteFailureException
+    {
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
+            int targetVersion = MessagingService.instance().getVersion(target);
+            MessagingService.instance().sendRR(getStoreMutation(batch, targetVersion).createMessage(MessagingService.Verb.MUTATION),
+                                               target,
+                                               handler,
+                                               false);
+        }
+    }
+
+    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+    {
+        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
+                                                                                     Collections.<InetAddress>emptyList(),
+                                                                                     ConsistencyLevel.ANY,
+                                                                                     Keyspace.open(SystemKeyspace.NAME),
+                                                                                     null,
+                                                                                     WriteType.SIMPLE);
+        Mutation mutation = getRemoveMutation(uuid);
+
+        for (InetAddress target : endpoints)
+        {
+            logger.debug("Sending legacy batchlog remove request {} to {}", uuid, target);
+            MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION), target, handler, false);
+        }
+    }
+
+    static void store(Batch batch, int version)
+    {
+        getStoreMutation(batch, version).apply();
+    }
+
+    @SuppressWarnings("deprecation")
+    static Mutation getStoreMutation(Batch batch, int version)
+    {
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, batch.creationTime, batch.id)
+               .clustering()
+               .add("written_at", new Date(batch.creationTime / 1000))
+               .add("data", getSerializedMutations(version, batch.decodedMutations))
+               .add("version", version)
+               .build();
+    }
+
+    @SuppressWarnings("deprecation")
+    private static Mutation getRemoveMutation(UUID uuid)
+    {
+        return new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.LegacyBatchlog,
+                                                                UUIDType.instance.decompose(uuid),
+                                                                FBUtilities.timestampMicros(),
+                                                                FBUtilities.nowInSeconds()));
+    }
+
+    private static ByteBuffer getSerializedMutations(int version, Collection<Mutation> mutations)
+    {
+        try (DataOutputBuffer buf = new DataOutputBuffer())
+        {
+            buf.writeInt(mutations.size());
+            for (Mutation mutation : mutations)
+                Mutation.serializer.serialize(mutation, buf, version);
+            return buf.buffer();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
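
For reference, the legacy "data" blob that apply() deserializes above is simply a mutation count followed by that many serialized mutations; getSerializedMutations() writes the same layout back out. A minimal sketch of the round trip, using the same buffer utilities as the patch (version stands in for the peer's messaging version):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;

    final class LegacyBatchBlob
    {
        // write side: mirrors LegacyBatchlogMigrator.getSerializedMutations()
        static ByteBuffer serialize(Collection<Mutation> mutations, int version) throws IOException
        {
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                out.writeInt(mutations.size());
                for (Mutation mutation : mutations)
                    Mutation.serializer.serialize(mutation, out, version);
                return out.buffer();
            }
        }

        // read side: mirrors the loop in LegacyBatchlogMigrator.apply()
        static List<Mutation> deserialize(ByteBuffer data, int version) throws IOException
        {
            try (DataInputBuffer in = new DataInputBuffer(data, false))
            {
                int count = in.readInt();
                List<Mutation> mutations = new ArrayList<>(count);
                for (int i = 0; i < count; i++)
                    mutations.add(Mutation.serializer.deserialize(in, version));
                return mutations;
            }
        }
    }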

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/Stage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java
index e91c515..a57587c 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -27,7 +27,6 @@ public enum Stage
     READ,
     MUTATION,
     COUNTER_MUTATION,
-    BATCHLOG_MUTATION,
     MATERIALIZED_VIEW_MUTATION,
     GOSSIP,
     REQUEST_RESPONSE,
@@ -62,7 +61,6 @@ public enum Stage
                 return "internal";
             case MUTATION:
             case COUNTER_MUTATION:
-            case BATCHLOG_MUTATION:
             case MATERIALIZED_VIEW_MUTATION:
             case READ:
             case REQUEST_RESPONSE:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index ca83829..ee1fbe5 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -47,7 +47,6 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
-        stages.put(Stage.BATCHLOG_MUTATION, multiThreadedLowSignalStage(Stage.BATCHLOG_MUTATION, getConcurrentBatchlogWriters()));
         stages.put(Stage.MATERIALIZED_VIEW_MUTATION, multiThreadedLowSignalStage(Stage.MATERIALIZED_VIEW_MUTATION, getConcurrentMaterializedViewWriters()));
         stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
         stages.put(Stage.REQUEST_RESPONSE, multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9d55fc8..22b09d3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -93,7 +93,6 @@ public class Config
     public Integer concurrent_reads = 32;
     public Integer concurrent_writes = 32;
     public Integer concurrent_counter_writes = 32;
-    public Integer concurrent_batchlog_writes = 32;
     public Integer concurrent_materialized_view_writes = 32;
 
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4e13911..31a4e9d 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1080,8 +1080,9 @@ public class DatabaseDescriptor
             case PAXOS_COMMIT:
             case PAXOS_PREPARE:
             case PAXOS_PROPOSE:
-            case BATCHLOG_MUTATION:
             case HINT:
+            case BATCH_STORE:
+            case BATCH_REMOVE:
                 return getWriteRpcTimeout();
             case COUNTER_MUTATION:
                 return getCounterWriteRpcTimeout();
@@ -1128,10 +1129,6 @@ public class DatabaseDescriptor
         return conf.concurrent_counter_writes;
     }
 
-    public static int getConcurrentBatchlogWriters()
-    {
-        return conf.concurrent_batchlog_writes;
-    }
     public static int getConcurrentMaterializedViewWriters()
     {
         return conf.concurrent_materialized_view_writes;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5de4b6c..c8482b3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -524,11 +524,6 @@ public class BatchStatement implements CQLStatement
         }
     }
 
-    public interface BatchVariables
-    {
-        public List<ByteBuffer> getVariablesForStatement(int statementInBatch);
-    }
-
     public String toString()
     {
         return String.format("BatchStatement(type=%s, statements=%s)", type, statements);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
deleted file mode 100644
index de85925..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.*;
-import com.google.common.util.concurrent.RateLimiter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.WriteFailureException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.hints.Hint;
-import org.apache.cassandra.hints.HintsService;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-import static com.google.common.collect.Iterables.transform;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
-import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
-
-public class BatchlogManager implements BatchlogManagerMBean
-{
-    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
-    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
-    private static final int DEFAULT_PAGE_SIZE = 128;
-
-    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
-    public static final BatchlogManager instance = new BatchlogManager();
-
-    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
-    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
-
-    // Single-thread executor service for scheduling and serializing log replay.
-    private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
-
-    public void start()
-    {
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
-        batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
-                                             StorageService.RING_DELAY + REPLAY_INTERVAL,
-                                             REPLAY_INTERVAL,
-                                             TimeUnit.MILLISECONDS);
-    }
-
-    private void replayInitially()
-    {
-        // Initial run must take care of non-time-uuid batches as written by Version 1.2.
-        convertOldBatchEntries();
-
-        replayAllFailedBatches();
-    }
-
-    public static void shutdown() throws InterruptedException
-    {
-        batchlogTasks.shutdown();
-        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
-    }
-
-    public int countAllBatches()
-    {
-        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
-        UntypedResultSet results = executeInternal(query);
-        if (results.isEmpty())
-            return 0;
-        return (int) results.one().getLong("count");
-    }
-
-    public long getTotalBatchesReplayed()
-    {
-        return totalBatchesReplayed;
-    }
-
-    public void forceBatchlogReplay() throws Exception
-    {
-        startBatchlogReplay().get();
-    }
-
-    public Future<?> startBatchlogReplay()
-    {
-        // If a replay is already in progress this request will be executed after it completes.
-        return batchlogTasks.submit(this::replayAllFailedBatches);
-    }
-
-    void performInitialReplay() throws InterruptedException, ExecutionException
-    {
-        // Invokes initial replay. Used for testing only.
-        batchlogTasks.submit(this::replayInitially).get();
-    }
-
-    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
-    {
-        return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
-               .clustering()
-               .add("data", serializeMutations(mutations, version))
-               .add("version", version)
-               .build();
-    }
-
-    @VisibleForTesting
-    static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
-    {
-        try (DataOutputBuffer buf = new DataOutputBuffer())
-        {
-            buf.writeInt(mutations.size());
-            for (Mutation mutation : mutations)
-                Mutation.serializer.serialize(mutation, buf, version);
-            return buf.buffer();
-        }
-        catch (IOException e)
-        {
-            throw new AssertionError(); // cannot happen.
-        }
-    }
-
-    private void replayAllFailedBatches()
-    {
-        logger.debug("Started replayAllFailedBatches");
-
-        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
-        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
-        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
-        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
-
-        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
-        int pageSize = calculatePageSize();
-        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
-        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
-        // token(id) > token(lastReplayedUuid) as part of the query.
-        String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
-                                     SystemKeyspace.NAME,
-                                     SystemKeyspace.BATCHES);
-        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
-        processBatchlogEntries(batches, pageSize, rateLimiter);
-        lastReplayedUuid = limitUuid;
-        logger.debug("Finished replayAllFailedBatches");
-    }
-
-    // read less rows (batches) per page if they are very large
-    private static int calculatePageSize()
-    {
-        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
-        double averageRowSize = store.getMeanPartitionSize();
-        if (averageRowSize <= 0)
-            return DEFAULT_PAGE_SIZE;
-
-        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
-    }
-
-    private static void deleteBatch(UUID id)
-    {
-        Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
-                                                    UUIDType.instance.decompose(id),
-                                                    FBUtilities.timestampMicros(),
-                                                    FBUtilities.nowInSeconds()));
-        mutation.apply();
-    }
-
-    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
-    {
-        int positionInPage = 0;
-        ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
-
-        Set<InetAddress> hintedNodes = new HashSet<>();
-        Set<UUID> replayedBatches = new HashSet<>();
-
-        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
-        for (UntypedResultSet.Row row : batches)
-        {
-            UUID id = row.getUUID("id");
-            int version = row.getInt("version");
-            Batch batch = new Batch(id, row.getBytes("data"), version);
-            try
-            {
-                if (batch.replay(rateLimiter, hintedNodes) > 0)
-                {
-                    unfinishedBatches.add(batch);
-                }
-                else
-                {
-                    deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
-                    ++totalBatchesReplayed;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.warn("Skipped batch replay of {} due to {}", id, e);
-                deleteBatch(id);
-            }
-
-            if (++positionInPage == pageSize)
-            {
-                // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
-                // finish processing the page before requesting the next row.
-                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-                positionInPage = 0;
-            }
-        }
-
-        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
-
-        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
-        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
-
-        // once all generated hints are fsynced, actually delete the batches
-        replayedBatches.forEach(BatchlogManager::deleteBatch);
-    }
-
-    private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
-    {
-        // schedule hints for timed out deliveries
-        for (Batch batch : batches)
-        {
-            batch.finish(hintedNodes);
-            replayedBatches.add(batch.id);
-        }
-
-        totalBatchesReplayed += batches.size();
-        batches.clear();
-    }
-
-    public static long getBatchlogTimeout()
-    {
-        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
-    }
-
-    private static class Batch
-    {
-        private final UUID id;
-        private final long writtenAt;
-        private final ByteBuffer data;
-        private final int version;
-
-        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
-
-        Batch(UUID id, ByteBuffer data, int version)
-        {
-            this.id = id;
-            this.writtenAt = UUIDGen.unixTimestamp(id);
-            this.data = data;
-            this.version = version;
-        }
-
-        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
-        {
-            logger.debug("Replaying batch {}", id);
-
-            List<Mutation> mutations = replayingMutations();
-
-            if (mutations.isEmpty())
-                return 0;
-
-            int gcgs = gcgs(mutations);
-            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
-                return 0;
-
-            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
-
-            rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
-
-            return replayHandlers.size();
-        }
-
-        public void finish(Set<InetAddress> hintedNodes)
-        {
-            for (int i = 0; i < replayHandlers.size(); i++)
-            {
-                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-                try
-                {
-                    handler.get();
-                }
-                catch (WriteTimeoutException|WriteFailureException e)
-                {
-                    logger.debug("Failed replaying a batched mutation to a node, will write a hint");
-                    logger.debug("Failure was : {}", e.getMessage());
-                    // writing hints for the rest to hints, starting from i
-                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
-                    return;
-                }
-            }
-        }
-
-        private List<Mutation> replayingMutations() throws IOException
-        {
-            DataInputPlus in = new DataInputBuffer(data, true);
-            int size = in.readInt();
-            List<Mutation> mutations = new ArrayList<>(size);
-            for (int i = 0; i < size; i++)
-            {
-                Mutation mutation = Mutation.serializer.deserialize(in, version);
-
-                // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
-                // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
-                // truncated.
-                for (UUID cfId : mutation.getColumnFamilyIds())
-                    if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
-                        mutation = mutation.without(cfId);
-
-                if (!mutation.isEmpty())
-                    mutations.add(mutation);
-            }
-            return mutations;
-        }
-
-        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
-        {
-            try
-            {
-                // Here we deserialize mutations 2nd time from byte buffer.
-                // but this is ok, because timeout on batch direct delivery is rare
-                // (it can happen only several seconds until node is marked dead)
-                // so trading some cpu to keep less objects
-                List<Mutation> replayingMutations = replayingMutations();
-                for (int i = startFrom; i < replayHandlers.size(); i++)
-                {
-                    Mutation undeliveredMutation = replayingMutations.get(i);
-                    int gcgs = gcgs(replayingMutations);
-                    ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
-
-                    if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null)
-                    {
-                        hintedNodes.addAll(handler.undelivered);
-                        HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
-                                                    Hint.create(undeliveredMutation, writtenAt));
-                    }
-                }
-            }
-            catch (IOException e)
-            {
-                logger.error("Cannot schedule hints for undelivered batch", e);
-            }
-        }
-
-        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
-                                                                              long writtenAt,
-                                                                              Set<InetAddress> hintedNodes)
-        {
-            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
-            for (Mutation mutation : mutations)
-            {
-                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
-                if (handler != null)
-                    handlers.add(handler);
-            }
-            return handlers;
-        }
-
-        /**
-         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
-         * when a replica is down or a write request times out.
-         *
-         * @return direct delivery handler to wait on or null, if no live nodes found
-         */
-        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
-                                                                                     long writtenAt,
-                                                                                     Set<InetAddress> hintedNodes)
-        {
-            Set<InetAddress> liveEndpoints = new HashSet<>();
-            String ks = mutation.getKeyspaceName();
-            Token tk = mutation.key().getToken();
-
-            for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
-                                                         StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
-            {
-                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    mutation.apply();
-                }
-                else if (FailureDetector.instance.isAlive(endpoint))
-                {
-                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
-                }
-                else
-                {
-                    hintedNodes.add(endpoint);
-                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
-                                                Hint.create(mutation, writtenAt));
-                }
-            }
-
-            if (liveEndpoints.isEmpty())
-                return null;
-
-            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
-            MessageOut<Mutation> message = mutation.createMessage();
-            for (InetAddress endpoint : liveEndpoints)
-                MessagingService.instance().sendRR(message, endpoint, handler, false);
-            return handler;
-        }
-
-        private static int gcgs(Collection<Mutation> mutations)
-        {
-            int gcgs = Integer.MAX_VALUE;
-            for (Mutation mutation : mutations)
-                gcgs = Math.min(gcgs, mutation.smallestGCGS());
-            return gcgs;
-        }
-
-        /**
-         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
-         * which we did not receive a successful reply.
-         */
-        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
-        {
-            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
-
-            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
-            {
-                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
-                undelivered.addAll(writeEndpoints);
-            }
-
-            @Override
-            protected int totalBlockFor()
-            {
-                return this.naturalEndpoints.size();
-            }
-
-            @Override
-            public void response(MessageIn<T> m)
-            {
-                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
-                assert removed;
-                super.response(m);
-            }
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    private static void convertOldBatchEntries()
-    {
-        logger.debug("Started convertOldBatchEntries");
-
-        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
-                                     SystemKeyspace.NAME,
-                                     SystemKeyspace.LEGACY_BATCHLOG);
-        UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
-        int convertedBatches = 0;
-        for (UntypedResultSet.Row row : batches)
-        {
-            UUID id = row.getUUID("id");
-            long timestamp = row.getLong("written_at");
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
-            logger.debug("Converting mutation at " + timestamp);
-
-            UUID newId = id;
-            if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
-                newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
-            ++convertedBatches;
-
-            Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
-                                                   FBUtilities.timestampMicros(),
-                                                   newId)
-                    .clustering()
-                    .add("data", row.getBytes("data"))
-                    .add("version", version)
-                    .build();
-
-            addRow.apply();
-        }
-        if (convertedBatches > 0)
-            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
-        // cleanup will be called after replay
-        logger.debug("Finished convertOldBatchEntries");
-    }
-
-    public static class EndpointFilter
-    {
-        private final String localRack;
-        private final Multimap<String, InetAddress> endpoints;
-
-        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
-        {
-            this.localRack = localRack;
-            this.endpoints = endpoints;
-        }
-
-        /**
-         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
-         */
-        public Collection<InetAddress> filter()
-        {
-            // special case for single-node data centers
-            if (endpoints.values().size() == 1)
-                return endpoints.values();
-
-            // strip out dead endpoints and localhost
-            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
-            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
-                if (isValid(entry.getValue()))
-                    validated.put(entry.getKey(), entry.getValue());
-
-            if (validated.size() <= 2)
-                return validated.values();
-
-            if (validated.size() - validated.get(localRack).size() >= 2)
-            {
-                // we have enough endpoints in other racks
-                validated.removeAll(localRack);
-            }
-
-            if (validated.keySet().size() == 1)
-            {
-                // we have only 1 `other` rack
-                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
-                return Lists.newArrayList(Iterables.limit(otherRack, 2));
-            }
-
-            // randomize which racks we pick from if more than 2 remaining
-            Collection<String> racks;
-            if (validated.keySet().size() == 2)
-            {
-                racks = validated.keySet();
-            }
-            else
-            {
-                racks = Lists.newArrayList(validated.keySet());
-                Collections.shuffle((List<String>) racks);
-            }
-
-            // grab a random member of up to two racks
-            List<InetAddress> result = new ArrayList<>(2);
-            for (String rack : Iterables.limit(racks, 2))
-            {
-                List<InetAddress> rackMembers = validated.get(rack);
-                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
-            }
-
-            return result;
-        }
-
-        @VisibleForTesting
-        protected boolean isValid(InetAddress input)
-        {
-            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
-        }
-
-        @VisibleForTesting
-        protected int getRandomInt(int bound)
-        {
-            return ThreadLocalRandom.current().nextInt(bound);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java b/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
deleted file mode 100644
index a688117..0000000
--- a/src/java/org/apache/cassandra/db/BatchlogManagerMBean.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public interface BatchlogManagerMBean
-{
-    /**
-     * Counts all batches currently in the batchlog.
-     *
-     * @return total batch count
-     */
-    public int countAllBatches();
-
-    /**
-     * @return total count of batches replayed since node start
-     */
-    public long getTotalBatchesReplayed();
-
-    /**
-     * Forces batchlog replay. Returns immediately if replay is already in progress.
-     */
-    public void forceBatchlogReplay() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d9ee38a..e349bfc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -49,7 +49,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
         {
             public void run()
             {
-                MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
+                MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6e78b0e..da7d13d 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -58,7 +58,7 @@ public class Mutation implements IMutation
     public final long createdAt = System.currentTimeMillis();
     public Mutation(String keyspaceName, DecoratedKey key)
     {
-        this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
+        this(keyspaceName, key, new HashMap<>());
     }
 
     public Mutation(PartitionUpdate update)
@@ -201,6 +201,11 @@ public class Mutation implements IMutation
         ks.apply(this, ks.getMetadata().params.durableWrites);
     }
 
+    public void apply(boolean durableWrites)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites);
+    }
+
     public void applyUnsafe()
     {
         Keyspace.open(keyspaceName).apply(this, false);
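
The new apply(boolean) overload above lets a caller pin commitlog durability per mutation instead of inheriting the keyspace's durable_writes setting. A minimal sketch (the table metadata, partition key, clustering value "c", column "v" and its value are placeholders for illustration):

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.RowUpdateBuilder;
    import org.apache.cassandra.utils.FBUtilities;

    final class DurableApplyExample
    {
        // force the commitlog write even if the keyspace has durable_writes = false
        static void applyDurably(CFMetaData metadata, Object key, Object value)
        {
            Mutation mutation = new RowUpdateBuilder(metadata, FBUtilities.timestampMicros(), key)
                                .clustering("c") // clustering *value*; assumes one text clustering column
                                .add("v", value)
                                .build();
            mutation.apply(true); // true => write through the commitlog regardless of keyspace params
        }
    }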

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 640e45f..d4670a2 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -18,10 +18,10 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
-import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
@@ -29,31 +29,32 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class MutationVerbHandler implements IVerbHandler<Mutation>
 {
-    private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true");
-
     public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
     {
-            // Check if there were any forwarding headers in this message
-            byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
-            InetAddress replyTo;
-            if (from == null)
-            {
-                replyTo = message.from;
-                byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
-                if (forwardBytes != null)
-                    forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
-            }
-            else
-            {
-                replyTo = InetAddress.getByAddress(from);
-            }
+        // Check if there were any forwarding headers in this message
+        byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
+        InetAddress replyTo;
+        if (from == null)
+        {
+            replyTo = message.from;
+            byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
+            if (forwardBytes != null)
+                forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
+        }
+        else
+        {
+            replyTo = InetAddress.getByAddress(from);
+        }
 
         try
         {
-            message.payload.apply();
-            WriteResponse response = new WriteResponse();
+            if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
+                LegacyBatchlogMigrator.handleLegacyMutation(message.payload);
+            else
+                message.payload.apply();
+
             Tracing.trace("Enqueuing response to {}", replyTo);
-            MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
+            MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
         }
         catch (WriteTimeoutException wto)
         {
@@ -65,7 +66,7 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
      * Older versions (< 1.0) will not send this message at all, hence we don't
      * need to check the version of the data.
      */
-    private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+    private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
     {
         try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
index 849ac70..2e499e7 100644
--- a/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -26,7 +26,6 @@ public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
     public void doVerb(MessageIn<Mutation> message, int id)
     {
         message.payload.apply();
-        WriteResponse response = new WriteResponse();
-        MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index fb9eb48..cf8e14d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -113,7 +113,7 @@ public final class SystemKeyspace
                 "batches awaiting replay",
                 "CREATE TABLE %s ("
                 + "id timeuuid,"
-                + "data blob,"
+                + "mutations list<blob>,"
                 + "version int,"
                 + "PRIMARY KEY ((id)))")
                 .copy(new LocalPartitioner(TimeUUIDType.instance))
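
The batches table now stores one blob per mutation (mutations list<blob>) instead of a single opaque data blob, so each mutation in a batch can be encoded and decoded independently. A hedged sketch of producing that list, reusing serializer calls that appear elsewhere in this commit and assuming a Collection<Mutation> named mutations:

    List<ByteBuffer> encoded = new ArrayList<>(mutations.size());
    for (Mutation m : mutations)
    {
        DataOutputBuffer buf = new DataOutputBuffer();
        Mutation.serializer.serialize(m, buf, MessagingService.current_version);
        // each serialized mutation becomes its own element of the list<blob> column
        encoded.add(ByteBuffer.wrap(buf.getData(), 0, buf.getLength()));
    }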


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 824368e..0dddaab 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -28,16 +28,22 @@ import org.apache.cassandra.net.MessagingService;
 /*
  * This empty response is sent by a replica to inform the coordinator that the write succeeded
  */
-public class WriteResponse
+public final class WriteResponse
 {
-    public static final WriteResponseSerializer serializer = new WriteResponseSerializer();
+    public static final Serializer serializer = new Serializer();
 
-    public MessageOut<WriteResponse> createMessage()
+    private static final WriteResponse instance = new WriteResponse();
+
+    private WriteResponse()
+    {
+    }
+
+    public static MessageOut<WriteResponse> createMessage()
     {
-        return new MessageOut<WriteResponse>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer);
+        return new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, instance, serializer);
     }
 
-    public static class WriteResponseSerializer implements IVersionedSerializer<WriteResponse>
+    public static class Serializer implements IVersionedSerializer<WriteResponse>
     {
         public void serialize(WriteResponse wm, DataOutputPlus out, int version) throws IOException
         {
@@ -45,7 +51,7 @@ public class WriteResponse
 
         public WriteResponse deserialize(DataInputPlus in, int version) throws IOException
         {
-            return new WriteResponse();
+            return instance;
         }
 
         public long serializedSize(WriteResponse response, int version)
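
Because WriteResponse carries no state, the class above is made final with a private shared instance: createMessage() becomes a static factory and deserialize() hands back the singleton, so nothing is allocated per reply. The verb handler call sites shrink accordingly:

    // one immutable instance behind a static factory; no per-reply allocation
    MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);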

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
index 2797495..56727fc 100644
--- a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
+++ b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
@@ -65,8 +65,8 @@ final class EncodedHintMessage
             if (version != message.version)
                 throw new IllegalArgumentException("serializedSize() called with non-matching version " + version);
 
-            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
-            size += TypeSizes.sizeof(message.hint.remaining());
+            long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
+            size += TypeSizes.sizeofVInt(message.hint.remaining());
             size += message.hint.remaining();
             return size;
         }
@@ -77,7 +77,7 @@ final class EncodedHintMessage
                 throw new IllegalArgumentException("serialize() called with non-matching version " + version);
 
             UUIDSerializer.serializer.serialize(message.hostId, out, version);
-            out.writeInt(message.hint.remaining());
+            out.writeVInt(message.hint.remaining());
             out.write(message.hint);
         }
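
The hint-size prefix moves from a fixed 4-byte int to a variable-length vint, which is smaller for the short hints that dominate in practice. A rough illustration, assuming Cassandra's vint encoding (one byte for small values, growing with magnitude):

    int remaining = 512;                           // a plausible small hint payload size
    long fixed = TypeSizes.sizeof(remaining);      // always 4 bytes
    long varlen = TypeSizes.sizeofVInt(remaining); // ~2 bytes at this magnitude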
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index d8f85c5..c88c494 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -26,6 +26,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
 /**
  * Encapsulates the hinted mutation, its creation time, and the gc grace seconds param for each table involved.
  *
@@ -107,8 +110,8 @@ public final class Hint
     {
         public long serializedSize(Hint hint, int version)
         {
-            long size = TypeSizes.sizeof(hint.creationTime);
-            size += TypeSizes.sizeof(hint.gcgs);
+            long size = sizeof(hint.creationTime);
+            size += sizeofVInt(hint.gcgs);
             size += Mutation.serializer.serializedSize(hint.mutation, version);
             return size;
         }
@@ -116,14 +119,14 @@ public final class Hint
         public void serialize(Hint hint, DataOutputPlus out, int version) throws IOException
         {
             out.writeLong(hint.creationTime);
-            out.writeInt(hint.gcgs);
+            out.writeVInt(hint.gcgs);
             Mutation.serializer.serialize(hint.mutation, out, version);
         }
 
         public Hint deserialize(DataInputPlus in, int version) throws IOException
         {
             long creationTime = in.readLong();
-            int gcgs = in.readInt();
+            int gcgs = (int) in.readVInt();
             return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 89baa89..6296a8c 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -24,6 +24,8 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 
+import com.google.common.primitives.Ints;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -81,10 +83,10 @@ public final class HintMessage
     {
         public long serializedSize(HintMessage message, int version)
         {
-            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
+            long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
 
-            int hintSize = (int) Hint.serializer.serializedSize(message.hint, version);
-            size += TypeSizes.sizeof(hintSize);
+            long hintSize = Hint.serializer.serializedSize(message.hint, version);
+            size += TypeSizes.sizeofVInt(hintSize);
             size += hintSize;
 
             return size;
@@ -100,7 +102,7 @@ public final class HintMessage
              * We are serializing the hint size so that the receiver of the message can gracefully handle
              * deserialization failure when a table has been dropped, by simply skipping the unread bytes.
              */
-            out.writeInt((int) Hint.serializer.serializedSize(message.hint, version));
+            out.writeVInt(Hint.serializer.serializedSize(message.hint, version));
 
             Hint.serializer.serialize(message.hint, out, version);
         }
@@ -114,7 +116,7 @@ public final class HintMessage
         {
             UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
 
-            int hintSize = in.readInt();
+            long hintSize = in.readVInt();
             BytesReadTracker countingIn = new BytesReadTracker(in);
             try
             {
@@ -122,7 +124,7 @@ public final class HintMessage
             }
             catch (UnknownColumnFamilyException e)
             {
-                in.skipBytes(hintSize - (int) countingIn.getBytesRead());
+                in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
                 return new HintMessage(hostId, e.cfId);
             }
         }
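
The two hunks above are the halves of one framing contract; condensed here as a paraphrase of the patch (not new API), sender first, then the receiver's recovery path:

    // sender: prefix the hint with its exact serialized size
    out.writeVInt(Hint.serializer.serializedSize(message.hint, version));
    Hint.serializer.serialize(message.hint, out, version);

    // receiver: count bytes while deserializing; if the table is unknown,
    // skip exactly the unread remainder so the stream stays aligned
    long hintSize = in.readVInt();
    BytesReadTracker countingIn = new BytesReadTracker(in);
    try
    {
        return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));
    }
    catch (UnknownColumnFamilyException e)
    {
        in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
        return new HintMessage(hostId, e.cfId);
    }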

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
index 196f184..b0095ed 100644
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -92,7 +92,7 @@ public final class LegacyHintsMigrator
         compactLegacyHints();
 
         // paginate over legacy hints and write them to the new storage
-        logger.info("Migrating legacy hints to the new storage");
+        logger.info("Writing legacy hints to the new storage");
         migrateLegacyHints();
 
         // truncate the legacy hints table

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e59cd58..15199fe 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.IPartitioner;
@@ -103,8 +104,8 @@ public final class MessagingService implements MessagingServiceMBean
         READ_REPAIR,
         READ,
         REQUEST_RESPONSE, // client-initiated reads and writes
-        @Deprecated STREAM_INITIATE,
-        @Deprecated STREAM_INITIATE_DONE,
+        BATCH_STORE,  // was @Deprecated STREAM_INITIATE,
+        BATCH_REMOVE, // was @Deprecated STREAM_INITIATE_DONE,
         @Deprecated STREAM_REPLY,
         @Deprecated STREAM_REQUEST,
         RANGE_SLICE,
@@ -135,7 +136,6 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        BATCHLOG_MUTATION,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -149,13 +149,14 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, Stage.MUTATION);
         put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
-        put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.HINT, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
         put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
         put(Verb.PAXOS_COMMIT, Stage.MUTATION);
+        put(Verb.BATCH_STORE, Stage.MUTATION);
+        put(Verb.BATCH_REMOVE, Stage.MUTATION);
 
         put(Verb.READ, Stage.READ);
         put(Verb.RANGE_SLICE, Stage.READ);
@@ -209,7 +210,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
 
         put(Verb.MUTATION, Mutation.serializer);
-        put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
@@ -229,6 +229,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.PAXOS_PROPOSE, Commit.serializer);
         put(Verb.PAXOS_COMMIT, Commit.serializer);
         put(Verb.HINT, HintMessage.serializer);
+        put(Verb.BATCH_STORE, Batch.serializer);
+        put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
     }};
 
     /**
@@ -238,7 +240,6 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, WriteResponse.serializer);
         put(Verb.HINT, HintResponse.serializer);
-        put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
@@ -254,6 +255,9 @@ public final class MessagingService implements MessagingServiceMBean
 
         put(Verb.PAXOS_PREPARE, PrepareResponse.serializer);
         put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer);
+
+        put(Verb.BATCH_STORE, WriteResponse.serializer);
+        put(Verb.BATCH_REMOVE, WriteResponse.serializer);
     }};
 
     /* This records all the results mapped by message Id */
@@ -286,7 +290,7 @@ public final class MessagingService implements MessagingServiceMBean
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
-    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
 
     private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
@@ -301,14 +305,15 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
                                                                    Verb.MUTATION,
-                                                                   Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
                                                                    Verb.COUNTER_MUTATION,
                                                                    Verb.HINT,
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
                                                                    Verb.RANGE_SLICE,
                                                                    Verb.PAGED_RANGE,
-                                                                   Verb.REQUEST_RESPONSE);
+                                                                   Verb.REQUEST_RESPONSE,
+                                                                   Verb.BATCH_STORE,
+                                                                   Verb.BATCH_REMOVE);
 
 
     private static final class DroppedMessages
@@ -372,7 +377,7 @@ public final class MessagingService implements MessagingServiceMBean
             droppedMessagesMap.put(verb, new DroppedMessages(verb));
 
         listenGate = new SimpleCondition();
-        verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
+        verbHandlers = new EnumMap<>(Verb.class);
         if (!testOnly)
         {
             Runnable logDropped = new Runnable()
@@ -630,7 +635,6 @@ public final class MessagingService implements MessagingServiceMBean
                            boolean allowHints)
     {
         assert message.verb == Verb.MUTATION
-            || message.verb == Verb.BATCHLOG_MUTATION
             || message.verb == Verb.COUNTER_MUTATION
             || message.verb == Verb.PAXOS_COMMIT;
         int messageId = nextId();
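
Because verbs go over the wire by ordinal (see the "remember to add new verbs at the end" comment above), retired entries are repurposed rather than removed: BATCH_STORE and BATCH_REMOVE take over the slots of the long-deprecated STREAM_INITIATE and STREAM_INITIATE_DONE, leaving every other verb's ordinal untouched. A minimal illustration of the constraint:

    enum Verb
    {
        MUTATION,     // ordinal 0 in every release
        BATCH_STORE,  // reuses the retired STREAM_INITIATE slot, so later ordinals hold
        BATCH_REMOVE; // reuses the retired STREAM_INITIATE_DONE slot
        // deleting an entry instead would shift all subsequent ordinals and
        // break messaging between mixed-version nodes
    }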

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index cf1e021..c8b9677 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -27,7 +27,6 @@ import java.rmi.registry.LocateRegistry;
 import java.rmi.server.RMIServerSocketFactory;
 import java.util.Collections;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -41,7 +40,6 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistryListener;
 import com.codahale.metrics.SharedMetricRegistries;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.slf4j.Logger;
@@ -52,6 +50,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.StartupException;
@@ -280,6 +279,9 @@ public class CassandraDaemon
         // migrate any legacy (pre-3.0) hints from system.hints table into the new store
         new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
 
+        // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
+        LegacyBatchlogMigrator.migrate();
+
         // enable auto compaction
         for (Keyspace keyspace : Keyspace.all())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4952959..59f1c1c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,12 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
@@ -44,14 +41,19 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.batchlog.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
@@ -67,8 +69,8 @@ import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -687,31 +689,19 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  WriteType.BATCH,
                                                                                  cleanup);
 
-                //When local node is the endpoint and there are no pending nodes we can
+                // When the local node is the endpoint and there are no pending nodes we can
                // just apply the mutation locally.
-                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    wrapper.handler.pendingEndpoints.isEmpty())
-                {
-                    if (writeCommitLog)
-                        mutation.apply();
-                    else
-                        mutation.applyUnsafe();
-                }
+                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+                    mutation.apply(writeCommitLog);
                 else
-                {
                     wrappers.add(wrapper);
-                }
             }
 
             if (!wrappers.isEmpty())
             {
-                Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version);
-
-                //Apply to local batchlog memtable in this thread
-                if (writeCommitLog)
-                    blMutation.apply();
-                else
-                    blMutation.applyUnsafe();
+                // Apply to local batchlog memtable in this thread
+                BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
+                                      writeCommitLog);
 
                 // now actually perform the writes and wait for them to complete
                 asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
@@ -781,16 +771,10 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          new BatchlogResponseHandler.BatchlogCleanupCallback()
-                                                                                                          {
-                                                                                                              public void invoke()
-                                                                                                              {
-                                                                                                                  asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
-                                                                                                              }
-                                                                                                          });
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -840,75 +824,64 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+    throws WriteTimeoutException, WriteFailureException
+    {
+        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+                                                                     Collections.<InetAddress>emptyList(),
+                                                                     endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+                                                                     Keyspace.open(SystemKeyspace.NAME),
+                                                                     null,
+                                                                     WriteType.BATCH_LOG);
+
+        Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
+
+        if (!endpoints.current.isEmpty())
+            syncWriteToBatchlog(handler, batch, endpoints.current);
+
+        if (!endpoints.legacy.isEmpty())
+            LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
+
+        handler.get();
+    }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
+    private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
     throws WriteTimeoutException, WriteFailureException
     {
-        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
-                                                                        Collections.<InetAddress>emptyList(),
-                                                                        ConsistencyLevel.ONE,
-                                                                        Keyspace.open(SystemKeyspace.NAME),
-                                                                        null,
-                                                                        WriteType.BATCH_LOG);
+        MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
 
-        MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
-                                                      .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
         for (InetAddress target : endpoints)
         {
-            int targetVersion = MessagingService.instance().getVersion(target);
+            logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
             if (canDoLocalRequest(target))
-            {
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
-            }
-            else if (targetVersion < MessagingService.VERSION_30)
-            {
-                MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
-                                                                  .createMessage(MessagingService.Verb.MUTATION),
-                                                   target,
-                                                   handler,
-                                                   false);
-            }
+                performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
             else
-            {
-                MessagingService.instance().sendRR(message, target, handler, false);
-            }
+                MessagingService.instance().sendRR(message, target, handler);
         }
+    }
 
-        handler.get();
+    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+    {
+        if (!endpoints.current.isEmpty())
+            asyncRemoveFromBatchlog(endpoints.current, uuid);
+
+        if (!endpoints.legacy.isEmpty())
+            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
     {
-        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
-                                                                        Collections.<InetAddress>emptyList(),
-                                                                        ConsistencyLevel.ANY,
-                                                                        Keyspace.open(SystemKeyspace.NAME),
-                                                                        null,
-                                                                        WriteType.SIMPLE);
-        Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
-                                                    UUIDType.instance.decompose(uuid),
-                                                    FBUtilities.timestampMicros(),
-                                                    FBUtilities.nowInSeconds()));
-        MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
+        MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
         for (InetAddress target : endpoints)
         {
-            int targetVersion = MessagingService.instance().getVersion(target);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending batchlog remove request {} to {}", uuid, target);
+
             if (canDoLocalRequest(target))
-            {
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
-            }
-            else if (targetVersion < MessagingService.VERSION_30)
-            {
-                MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION),
-                                                   target,
-                                                   handler,
-                                                   false);
-            }
+                performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
             else
-            {
-                MessagingService.instance().sendRR(message, target, handler, false);
-            }
+                MessagingService.instance().sendOneWay(message, target);
         }
     }
 
@@ -1034,13 +1007,38 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /*
+     * A class that splits batchlog endpoints into legacy (version < 3.0) and current (3.0+) sets.
+     */
+    private static final class BatchlogEndpoints
+    {
+        public final Collection<InetAddress> all;
+        public final Collection<InetAddress> current;
+        public final Collection<InetAddress> legacy;
+
+        BatchlogEndpoints(Collection<InetAddress> endpoints)
+        {
+            all = endpoints;
+            current = new ArrayList<>(2);
+            legacy = new ArrayList<>(2);
+
+            for (InetAddress ep : endpoints)
+            {
+                if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30)
+                    current.add(ep);
+                else
+                    legacy.add(ep);
+            }
+        }
+    }
+
+    /*
      * Replicas are picked manually:
      * - replicas should be alive according to the failure detector
      * - replicas should be in the local datacenter
     * - choose min(2, number of qualifying candidates above)
      * - allow the local node to be the only replica only if it's a single-node DC
      */
-    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+    private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
     throws UnavailableException
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1051,12 +1049,12 @@ public class StorageProxy implements StorageProxyMBean
         if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)
-                return Collections.singleton(FBUtilities.getBroadcastAddress());
+                return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress()));
 
             throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
         }
 
-        return chosenEndpoints;
+        return new BatchlogEndpoints(chosenEndpoints);
     }
 
     /**
@@ -1109,7 +1107,8 @@ public class StorageProxy implements StorageProxyMBean
                 if (canDoLocalRequest(destination))
                 {
                     insertLocal = true;
-                } else
+                }
+                else
                 {
                     // belongs on a different server
                     if (message == null)
@@ -1120,14 +1119,15 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         MessagingService.instance().sendRR(message, destination, responseHandler, true);
-                    } else
+                    }
+                    else
                     {
                         Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
                         if (messages == null)
                         {
-                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                            messages = new ArrayList<>(3); // most DCs will have <= 3 replicas
                             if (dcGroups == null)
-                                dcGroups = new HashMap<String, Collection<InetAddress>>();
+                                dcGroups = new HashMap<>();
                             dcGroups.put(dc, messages);
                         }
                         messages.add(destination);
@@ -1149,7 +1149,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            insertLocal(stage, mutation, responseHandler);
+            performLocally(stage, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+    private static void performLocally(Stage stage, final Runnable runnable)
     {
         StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
         {
@@ -1206,14 +1206,32 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try
                 {
-                    mutation.apply();
-                    responseHandler.response(null);
+                    runnable.run();
+                }
+                catch (Exception ex)
+                {
+                    logger.error("Failed to apply mutation locally", ex);
+                }
+            }
+        });
+    }
+
+    private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    {
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    runnable.run();
+                    handler.response(null);
                 }
                 catch (Exception ex)
                 {
                     if (!(ex instanceof WriteTimeoutException))
                        logger.error("Failed to apply mutation locally", ex);
-                    responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                    handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
         });
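
Taken together, the methods above form the rewritten batchlog round-trip. A hedged outline of the coordinator-side sequence, using only names introduced in this patch:

    BatchlogEndpoints endpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
    UUID uuid = UUIDGen.getTimeUUID();

    // store: BATCH_STORE to 3.0+ replicas, a legacy batchlog mutation to older ones
    syncWriteToBatchlog(mutations, endpoints, uuid);

    // ... send the batched mutations and wait for acknowledgements ...

    // cleanup: fire-and-forget BATCH_REMOVE (or its legacy equivalent)
    asyncRemoveFromBatchlog(endpoints, uuid);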

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a6c2f8b..13dc29c7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -55,6 +55,9 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
+import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
+import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.TransactionLog;
@@ -282,7 +285,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
@@ -311,6 +313,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
+
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
     }
 
     public void registerDaemon(CassandraDaemon daemon)
@@ -620,12 +625,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 inShutdownHook = true;
                 ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
-                ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
                 if (mutationStage.isShutdown()
                     && counterMutationStage.isShutdown()
-                    && batchlogMutationStage.isShutdown()
                     && materializedViewMutationStage.isShutdown())
                     return; // drained already
 
@@ -638,12 +641,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
                 materializedViewMutationStage.shutdown();
-                batchlogMutationStage.shutdown();
                 HintsService.instance.pauseDispatch();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
                 materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-                batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3846,17 +3847,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         inShutdownHook = true;
 
-        BatchlogManager.shutdown();
+        BatchlogManager.instance.shutdown();
 
         HintsService.instance.pauseDispatch();
 
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
-        ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
         ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated()
             && counterMutationStage.isTerminated()
-            && batchlogMutationStage.isTerminated()
             && materializedViewMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
@@ -3872,11 +3871,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
         materializedViewMutationStage.shutdown();
-        batchlogMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
         materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-        batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 
@@ -3913,6 +3910,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
+        BatchlogManager.instance.shutdown();
+
         HintsService.instance.shutdownBlocking();
 
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
index 213023e..a702a4d 100644
--- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.service.paxos;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,7 @@ package org.apache.cassandra.service.paxos;
  * under the License.
  * 
  */
-
+package org.apache.cassandra.service.paxos;
 
 import org.apache.cassandra.db.WriteResponse;
 import org.apache.cassandra.net.IVerbHandler;
@@ -33,8 +32,7 @@ public class CommitVerbHandler implements IVerbHandler<Commit>
     {
         PaxosState.commit(message.payload);
 
-        WriteResponse response = new WriteResponse();
         Tracing.trace("Enqueuing acknowledge to {}", message.from);
-        MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 6909ea4..5f77097 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -54,8 +54,8 @@ import javax.management.remote.JMXServiceURL;
 import javax.rmi.ssl.SslRMIClientSocketFactory;
 
 import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.BatchlogManagerMBean;
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManagerMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.HintedHandOffManagerMBean;
 import org.apache.cassandra.db.compaction.CompactionManager;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
index 9738103..b833e60 100644
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
@@ -35,7 +35,7 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.cassandra.concurrent.SEPExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class MaterializedViewLongTest extends CQLTester

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchTest.java b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
new file mode 100644
index 0000000..b7a4100
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+public class BatchTest
+{
+    private static final String KEYSPACE = "BatchRequestTest";
+    private static final String CF_STANDARD = "Standard";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD, 1, BytesType.instance));
+    }
+
+    @Test
+    public void testSerialization() throws IOException
+    {
+        CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+        long now = FBUtilities.timestampMicros();
+        int version = MessagingService.current_version;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                          .clustering("name" + i)
+                          .add("val", "val" + i)
+                          .build());
+        }
+
+        Batch batch1 = Batch.createLocal(uuid, now, mutations);
+        assertEquals(uuid, batch1.id);
+        assertEquals(now, batch1.creationTime);
+        assertEquals(mutations, batch1.decodedMutations);
+
+        DataOutputBuffer out = new DataOutputBuffer();
+        Batch.serializer.serialize(batch1, out, version);
+
+        assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+        DataInputPlus dis = new DataInputBuffer(out.getData());
+        Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+        assertEquals(batch1.id, batch2.id);
+        assertEquals(batch1.creationTime, batch2.creationTime);
+        assertEquals(batch1.decodedMutations.size(), batch2.encodedMutations.size());
+
+        Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+        Iterator<ByteBuffer> it2 = batch2.encodedMutations.iterator();
+        while (it1.hasNext())
+        {
+            try (DataInputBuffer in = new DataInputBuffer(it2.next().array()))
+            {
+                assertEquals(it1.next().toString(), Mutation.serializer.deserialize(in, version).toString());
+            }
+        }
+    }
+
+    /**
+     * This is just to test decodeMutations() when deserializing,
+     * since a Batch will never be serialized at version 2.2.
+     * @throws IOException
+     */
+    @Test
+    public void testSerializationNonCurrentVersion() throws IOException
+    {
+        CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+        long now = FBUtilities.timestampMicros();
+        int version = MessagingService.VERSION_22;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                          .clustering("name" + i)
+                          .add("val", "val" + i)
+                          .build());
+        }
+
+        Batch batch1 = Batch.createLocal(uuid, now, mutations);
+        assertEquals(uuid, batch1.id);
+        assertEquals(now, batch1.creationTime);
+        assertEquals(mutations, batch1.decodedMutations);
+
+        DataOutputBuffer out = new DataOutputBuffer();
+        Batch.serializer.serialize(batch1, out, version);
+
+        assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+        DataInputPlus dis = new DataInputBuffer(out.getData());
+        Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+        assertEquals(batch1.id, batch2.id);
+        assertEquals(batch1.creationTime, batch2.creationTime);
+        assertEquals(batch1.decodedMutations.size(), batch2.decodedMutations.size());
+
+        Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+        Iterator<Mutation> it2 = batch2.decodedMutations.iterator();
+        while (it1.hasNext())
+            assertEquals(it1.next().toString(), it2.next().toString());
+    }
+}
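
For reference, the serialize/deserialize round trip that both tests above exercise can be condensed into a small helper. This is only a sketch built from classes already shown in this patch (Batch, DataOutputBuffer, DataInputBuffer); the BatchRoundTrip class name is illustrative, and the behavioral note in the comment restates what the two tests assert rather than a separate API guarantee:

    import java.io.IOException;

    import org.apache.cassandra.batchlog.Batch;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;

    final class BatchRoundTrip
    {
        // Serialize a batch and read it back at the given messaging version.
        // Per the tests above: at MessagingService.current_version the
        // deserialized batch keeps its mutations encoded (encodedMutations);
        // at VERSION_22 they are decoded eagerly (decodedMutations).
        static Batch roundTrip(Batch batch, int version) throws IOException
        {
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                Batch.serializer.serialize(batch, out, version);
                assert out.getLength() == Batch.serializer.serializedSize(batch, version);
                try (DataInputBuffer in = new DataInputBuffer(out.getData()))
                {
                    return Batch.serializer.deserialize(in, version);
                }
            }
        }
    }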

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
new file mode 100644
index 0000000..23aeaaa
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class BatchlogEndpointFilterTest
+{
+    private static final String LOCAL = "local";
+
+    @Test
+    public void shouldSelectTwoHostsFromNonLocalRacks() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("2", InetAddress.getByName("2"))
+                .put("2", InetAddress.getByName("22"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+    }
+
+    @Test
+    public void shouldSelectHostFromLocal() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldReturnAsIsIfNotEnoughEndpoints() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(1));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("1", InetAddress.getByName("111"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        // The result should contain two distinct, randomly chosen values.
+        assertThat(new HashSet<>(result).size(), is(2));
+    }
+
+    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+    {
+        TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            super(localRack, endpoints);
+        }
+
+        @Override
+        protected boolean isValid(InetAddress input)
+        {
+            // Treat every endpoint as valid (alive, non-localhost) for the test.
+            return true;
+        }
+
+        @Override
+        protected int getRandomInt(int bound)
+        {
+            // Deterministic instead of random, so the assertions are stable.
+            return bound - 1;
+        }
+    }
+}
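
The selection policy these four tests pin down — prefer one endpoint from each of two distinct non-local racks, fall back to two distinct endpoints of a single remote rack, pad with a local endpoint when the remote rack has only one node, and return everything unfiltered when there are not enough endpoints to choose from — can be sketched as a naive standalone filter. This is a hypothetical simplification written against the test expectations, not the actual BatchlogManager.EndpointFilter implementation:

    import java.net.InetAddress;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    import com.google.common.collect.Multimap;

    final class NaiveBatchlogEndpointFilter
    {
        private final String localRack;
        private final Multimap<String, InetAddress> endpoints;

        NaiveBatchlogEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
        {
            this.localRack = localRack;
            this.endpoints = endpoints;
        }

        Collection<InetAddress> filter()
        {
            // Too few endpoints in total: nothing to choose from, return as-is.
            if (endpoints.values().size() <= 2)
                return endpoints.values();

            List<String> remoteRacks = new ArrayList<>(endpoints.keySet());
            remoteRacks.remove(localRack);
            Collections.shuffle(remoteRacks, ThreadLocalRandom.current());

            List<InetAddress> result = new ArrayList<>(2);
            if (remoteRacks.isEmpty())
            {
                // Only the local rack exists: pick two distinct local endpoints.
                result.addAll(pickDistinct(endpoints.get(localRack), 2));
            }
            else if (remoteRacks.size() == 1)
            {
                List<InetAddress> remote = new ArrayList<>(endpoints.get(remoteRacks.get(0)));
                if (remote.size() >= 2)
                {
                    // A single remote rack with enough nodes: two distinct endpoints from it.
                    result.addAll(pickDistinct(remote, 2));
                }
                else
                {
                    // One remote endpoint, padded with one local endpoint.
                    result.addAll(remote);
                    result.addAll(pickDistinct(endpoints.get(localRack), 1));
                }
            }
            else
            {
                // Two or more remote racks: one endpoint from each of two racks.
                result.addAll(pickDistinct(endpoints.get(remoteRacks.get(0)), 1));
                result.addAll(pickDistinct(endpoints.get(remoteRacks.get(1)), 1));
            }
            return result;
        }

        private static List<InetAddress> pickDistinct(Collection<InetAddress> from, int n)
        {
            List<InetAddress> copy = new ArrayList<>(from);
            Collections.shuffle(copy, ThreadLocalRandom.current());
            return copy.subList(0, Math.min(n, copy.size()));
        }
    }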

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
new file mode 100644
index 0000000..dfb17c3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.junit.Assert.*;
+
+public class BatchlogManagerTest
+{
+    private static final String KEYSPACE1 = "BatchlogManagerTest1";
+    private static final String CF_STANDARD1 = "Standard1";
+    private static final String CF_STANDARD2 = "Standard2";
+    private static final String CF_STANDARD3 = "Standard3";
+    private static final String CF_STANDARD4 = "Standard4";
+    private static final String CF_STANDARD5 = "Standard5";
+
+    static PartitionerSwitcher sw;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance));
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        sw.close();
+    }
+
+    @Before
+    @SuppressWarnings("deprecation")
+    public void setUp() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        InetAddress localhost = InetAddress.getByName("127.0.0.1");
+        metadata.updateNormalToken(Util.token("A"), localhost);
+        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
+    }
+
+    @Test
+    public void testDelete()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        CFMetaData cfm = cfs.metadata;
+        new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
+                .clustering("c")
+                .add("val", "val" + 1234)
+                .build()
+                .applyUnsafe();
+
+        DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
+        ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
+        Iterator<Row> iter = results.iterator();
+        assert iter.hasNext();
+
+        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
+                                                         dk,
+                                                         FBUtilities.timestampMicros(),
+                                                         FBUtilities.nowInSeconds()));
+        mutation.applyUnsafe();
+
+        Util.assertEmpty(Util.cmd(cfs, dk).build());
+    }
+
+    @Test
+    public void testReplay() throws Exception
+    {
+        testReplay(false);
+    }
+
+    @Test
+    public void testLegacyReplay() throws Exception
+    {
+        testReplay(true);
+    }
+
+    @SuppressWarnings("deprecation")
+    private static void testReplay(boolean legacy) throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
+
+        // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
+        // Half of the batches (50) are ready to be replayed, half are not.
+        for (int i = 0; i < 100; i++)
+        {
+            List<Mutation> mutations = new ArrayList<>(10);
+            for (int j = 0; j < 10; j++)
+            {
+                mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                              .clustering("name" + j)
+                              .add("val", "val" + j)
+                              .build());
+            }
+
+            long timestamp = i < 50
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+            if (legacy)
+                LegacyBatchlogMigrator.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations), MessagingService.current_version);
+            else
+                BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations));
+        }
+
+        if (legacy)
+        {
+            Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+            LegacyBatchlogMigrator.migrate();
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(50, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(50, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 100; i++)
+        {
+            String query = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i);
+            UntypedResultSet result = executeInternal(query);
+            assertNotNull(result);
+            if (i < 50)
+            {
+                Iterator<UntypedResultSet.Row> it = result.iterator();
+                assertNotNull(it);
+                for (int j = 0; j < 10; j++)
+                {
+                    assertTrue(it.hasNext());
+                    UntypedResultSet.Row row = it.next();
+
+                    assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
+                    assertEquals("name" + j, row.getString("name"));
+                    assertEquals("val" + j, row.getString("val"));
+                }
+
+                assertFalse(it.hasNext());
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations somehow got applied.
+        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
+        assertNotNull(result);
+        assertEquals(500, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testTruncatedReplay() throws InterruptedException, ExecutionException
+    {
+        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
+        // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+        // Each batchlog entry contains one mutation for Standard2 and one for Standard3.
+        // Halfway through, 'truncate' Standard2.
+        for (int i = 0; i < 1000; i++)
+        {
+            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+            // Adjust the timestamp so the batch is ready to be replayed.
+            long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout();
+
+            if (i == 500)
+                SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
+                                                    timestamp,
+                                                    ReplayPosition.NONE);
+
+            // Adjust the timestamp (slightly) to make the test deterministic.
+            if (i >= 500)
+                timestamp++;
+            else
+                timestamp--;
+
+            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), FBUtilities.timestampMicros(), mutations));
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
+
+        // After the replay, only the Standard2-targeted mutations written after the truncation
+        // point should be present, while all of the Standard3-targeted mutations should be applied.
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2, i));
+            assertNotNull(result);
+            if (i >= 500)
+            {
+                assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
+            assertNotNull(result);
+            assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+            assertEquals("name" + i, result.one().getString("name"));
+            assertEquals("val" + i, result.one().getString("val"));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testConversion() throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+        // Generate 1400 version 2.0 batches (one mutation each) and put them all into the legacy batchlog.
+        // Half of them (700) are ready to be replayed, half are not.
+        for (int i = 0; i < 1400; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 700
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+            Mutation batchMutation = LegacyBatchlogMigrator.getStoreMutation(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+                                                                                               TimeUnit.MILLISECONDS.toMicros(timestamp),
+                                                                                               Collections.singleton(mutation)),
+                                                                             MessagingService.VERSION_20);
+            assertTrue(LegacyBatchlogMigrator.isLegacyBatchlogMutation(batchMutation));
+            LegacyBatchlogMigrator.handleLegacyMutation(batchMutation);
+        }
+
+        // Mix in 100 current-version batches, 50 of them ready for replay.
+        for (int i = 1400; i < 1500; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 1450
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+                                                    FBUtilities.timestampMicros(),
+                                                    Collections.singleton(mutation)));
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(1500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertNotNull(result);
+        assertEquals("Count in blog legacy", 0, result.one().getLong("count"));
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertNotNull(result);
+        assertEquals("Count in blog", 1500, result.one().getLong("count"));
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.performInitialReplay();
+
+        // Ensure that the replayable batches (700 legacy + 50 current), and only those, got replayed.
+        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 1500; i++)
+        {
+            result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+            assertNotNull(result);
+            if (i < 700 || i >= 1400 && i < 1450)
+            {
+                assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue("Present at " + i, result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations somehow got applied.
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+        assertNotNull(result);
+        assertEquals(750, result.one().getLong("count"));
+
+        // Ensure batchlog is left as expected.
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertNotNull(result);
+        assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertNotNull(result);
+        assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testAddBatch() throws IOException
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        // Add a batch with 10 mutations
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int j = 0; j < 10; j++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+                          .clustering("name" + j)
+                          .add("val", "val" + j)
+                          .build());
+        }
+
+        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+        Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES,
+                                     uuid);
+        UntypedResultSet result = executeInternal(query);
+        assertNotNull(result);
+        assertEquals(1L, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testRemoveBatch()
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        // Add a batch with 10 mutations
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int j = 0; j < 10; j++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+                          .clustering("name" + j)
+                          .add("val", "val" + j)
+                          .build());
+        }
+
+        // Store the batch
+        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+        Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+        // Remove the batch
+        BatchlogManager.remove(uuid);
+
+        assertEquals(initialAllBatches, BatchlogManager.instance.countAllBatches());
+
+        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES,
+                                     uuid);
+        UntypedResultSet result = executeInternal(query);
+        assertNotNull(result);
+        assertEquals(0L, result.one().getLong("count"));
+    }
+}
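
Taken together, the replay tests above follow the same store → flush → replay rhythm. The following is a condensed sketch of that driver loop, using only calls that appear in the tests (BatchlogManager.store, a blocking flush of the system BATCHES table per CASSANDRA-6822, startBatchlogReplay); the BatchlogReplayDriver class and its storeAndReplay helper are illustrative names, not part of this patch:

    import java.util.List;
    import java.util.UUID;

    import org.apache.cassandra.batchlog.Batch;
    import org.apache.cassandra.batchlog.BatchlogManager;
    import org.apache.cassandra.db.Keyspace;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.SystemKeyspace;
    import org.apache.cassandra.utils.UUIDGen;

    final class BatchlogReplayDriver
    {
        // Store a batch that is already old enough to be eligible for replay,
        // flush the batchlog table to disk (see CASSANDRA-6822), then force a
        // replay pass and block until it completes.
        static void storeAndReplay(List<Mutation> mutations) throws Exception
        {
            long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout();
            UUID id = UUIDGen.getTimeUUID(timestamp, 0);
            BatchlogManager.store(Batch.createLocal(id, timestamp * 1000, mutations));

            Keyspace.open(SystemKeyspace.NAME)
                    .getColumnFamilyStore(SystemKeyspace.BATCHES)
                    .forceBlockingFlush();

            BatchlogManager.instance.startBatchlogReplay().get();
        }
    }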