You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/06 16:33:41 UTC

[1/2] cassandra git commit: Optimize batchlog replay to avoid full scans

Repository: cassandra
Updated Branches:
  refs/heads/trunk bf8ac1acd -> c35bfc09c


Optimize batchlog replay to avoid full scans

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-7237


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/762db474
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/762db474
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/762db474

Branch: refs/heads/trunk
Commit: 762db474273f764b189d3613fce33943cd64701b
Parents: ef59624
Author: Branimir Lambov <br...@datastax.com>
Authored: Sat Aug 1 11:55:47 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 17:12:28 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 NEWS.txt                                        |   2 +
 .../apache/cassandra/db/BatchlogManager.java    | 226 +++++++++--------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +
 src/java/org/apache/cassandra/db/Memtable.java  |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  28 ++-
 .../apache/cassandra/dht/LocalPartitioner.java  |  30 ++-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/db/BatchlogManagerTest.java       | 246 +++++++++++++++----
 9 files changed, 395 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80e0e50..95fade9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
  * Repair improvements when using vnodes (CASSANDRA-5220)
  * Disable scripted UDFs by default (CASSANDRA-9889)
  * Add transparent data encryption core classes (CASSANDRA-9945)
@@ -11,6 +12,7 @@ Merged from 2.1:
 Merged from 2.0:
  * Don't cast expected bf size to an int (CASSANDRA-9959)
 
+
 3.0.0-alpha1
  * Implement proper sandboxing for UDFs (CASSANDRA-9402)
  * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1fcbb12..ef61f6c 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -58,6 +58,8 @@ Upgrading
      be done by setting the new option `enabled` to `false`.
    - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
      has been deprecated since 2.1.0 and is being removed in 3.0.0.
+   - Batchlog entries are now stored in a new table - system.batches.
+     The old table, system.batchlog, has been deprecated.
 
 
 2.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 9e90d9d..8ea4318 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -23,30 +23,24 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -57,20 +51,22 @@ import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 
 public class BatchlogManager implements BatchlogManagerMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
-    private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
-    private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
+    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
+    private static final int DEFAULT_PAGE_SIZE = 128;
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
     public static final BatchlogManager instance = new BatchlogManager();
 
-    private final AtomicLong totalBatchesReplayed = new AtomicLong();
+    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
+    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
 
     // Single-thread executor service for scheduling and serializing log replay.
     private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
@@ -87,15 +83,20 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new RuntimeException(e);
         }
 
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws ExecutionException, InterruptedException
-            {
-                replayAllFailedBatches();
-            }
-        };
+        batchlogTasks.schedule(this::replayInitially, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
+
+        batchlogTasks.scheduleWithFixedDelay(this::replayAllFailedBatches,
+                                             StorageService.RING_DELAY + REPLAY_INTERVAL,
+                                             REPLAY_INTERVAL,
+                                             TimeUnit.MILLISECONDS);
+    }
+
+    private void replayInitially()
+    {
+        // Initial run must take care of non-time-uuid batches as written by Version 1.2.
+        convertOldBatchEntries();
 
-        batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
+        replayAllFailedBatches();
     }
 
     public static void shutdown() throws InterruptedException
@@ -106,13 +107,16 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public int countAllBatches()
     {
-        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG);
-        return (int) executeInternal(query).one().getLong("count");
+        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
+        UntypedResultSet results = executeInternal(query);
+        if (results.isEmpty())
+            return 0;
+        return (int) results.one().getLong("count");
     }
 
     public long getTotalBatchesReplayed()
     {
-        return totalBatchesReplayed.longValue();
+        return totalBatchesReplayed;
     }
 
     public void forceBatchlogReplay() throws Exception
@@ -122,34 +126,27 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public Future<?> startBatchlogReplay()
     {
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws ExecutionException, InterruptedException
-            {
-                replayAllFailedBatches();
-            }
-        };
         // If a replay is already in progress this request will be executed after it completes.
-        return batchlogTasks.submit(runnable);
+        return batchlogTasks.submit(this::replayAllFailedBatches);
     }
 
-    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
+    void performInitialReplay() throws InterruptedException, ExecutionException
     {
-        return getBatchlogMutationFor(mutations, uuid, version, FBUtilities.timestampMicros());
+        // Invokes initial replay. Used for testing only.
+        batchlogTasks.submit(this::replayInitially).get();
     }
 
-    @VisibleForTesting
-    static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now)
+    public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version)
     {
-        return new RowUpdateBuilder(SystemKeyspace.Batchlog, now, uuid)
+        return new RowUpdateBuilder(SystemKeyspace.Batches, FBUtilities.timestampMicros(), uuid)
                .clustering()
                .add("data", serializeMutations(mutations, version))
-               .add("written_at", new Date(now / 1000))
                .add("version", version)
                .build();
     }
 
-    private static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
+    @VisibleForTesting
+    static ByteBuffer serializeMutations(Collection<Mutation> mutations, int version)
     {
         try (DataOutputBuffer buf = new DataOutputBuffer())
         {
@@ -164,7 +161,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         }
     }
 
-    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+    private void replayAllFailedBatches()
     {
         logger.debug("Started replayAllFailedBatches");
 
@@ -173,67 +170,62 @@ public class BatchlogManager implements BatchlogManagerMBean
         int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
-                                                              SystemKeyspace.NAME,
-                                                              SystemKeyspace.BATCHLOG,
-                                                              PAGE_SIZE));
-
-        while (!page.isEmpty())
-        {
-            UUID id = processBatchlogPage(page, rateLimiter);
-
-            if (page.size() < PAGE_SIZE)
-                break; // we've exhausted the batchlog, next query would be empty.
-
-            page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d",
-                                                 SystemKeyspace.NAME,
-                                                 SystemKeyspace.BATCHLOG,
-                                                 PAGE_SIZE),
-                                                 id);
-        }
+        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
+        int pageSize = calculatePageSize();
+        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
+        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
+        // token(id) > token(lastReplayedUuid) as part of the query.
+        String query = String.format("SELECT id, data, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES);
+        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
+        processBatchlogEntries(batches, pageSize, rateLimiter);
+        lastReplayedUuid = limitUuid;
+        logger.debug("Finished replayAllFailedBatches");
+    }
 
-        cleanup();
+    // read fewer rows (batches) per page if they are very large
+    private static int calculatePageSize()
+    {
+        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
+        double averageRowSize = store.getMeanPartitionSize();
+        if (averageRowSize <= 0)
+            return DEFAULT_PAGE_SIZE;
 
-        logger.debug("Finished replayAllFailedBatches");
+        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
     }
 
-    private void deleteBatch(UUID id)
+    private static void deleteBatch(UUID id)
     {
         Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
                                                     UUIDType.instance.decompose(id),
                                                     FBUtilities.timestampMicros(),
                                                     FBUtilities.nowInSeconds()));
         mutation.apply();
     }
 
-    private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
+    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
     {
-        UUID id = null;
-        ArrayList<Batch> batches = new ArrayList<>(page.size());
+        int positionInPage = 0;
+        ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
 
         // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
-        for (UntypedResultSet.Row row : page)
+        for (UntypedResultSet.Row row : batches)
         {
-            id = row.getUUID("id");
-            long writtenAt = row.getLong("written_at");
-            // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
-            long timeout = getBatchlogTimeout();
-            if (System.currentTimeMillis() < writtenAt + timeout)
-                continue; // not ready to replay yet, might still get a deletion.
-
-            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
-            Batch batch = new Batch(id, writtenAt, row.getBytes("data"), version);
+            UUID id = row.getUUID("id");
+            int version = row.getInt("version");
+            Batch batch = new Batch(id, row.getBytes("data"), version);
             try
             {
                 if (batch.replay(rateLimiter) > 0)
                 {
-                    batches.add(batch);
+                    unfinishedBatches.add(batch);
                 }
                 else
                 {
                     deleteBatch(id); // no write mutations were sent (either expired or all CFs involved truncated).
-                    totalBatchesReplayed.incrementAndGet();
+                    ++totalBatchesReplayed;
                 }
             }
             catch (IOException e)
@@ -241,22 +233,31 @@ public class BatchlogManager implements BatchlogManagerMBean
                 logger.warn("Skipped batch replay of {} due to {}", id, e);
                 deleteBatch(id);
             }
+
+            if (++positionInPage == pageSize)
+            {
+                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
+                // finish processing the page before requesting the next row.
+                finishAndClearBatches(unfinishedBatches);
+                positionInPage = 0;
+            }
         }
+        finishAndClearBatches(unfinishedBatches);
+    }
 
-        // now waiting for all batches to complete their processing
+    private void finishAndClearBatches(ArrayList<Batch> batches)
+    {
         // schedule hints for timed out deliveries
         for (Batch batch : batches)
         {
             batch.finish();
             deleteBatch(batch.id);
         }
-
-        totalBatchesReplayed.addAndGet(batches.size());
-
-        return id;
+        totalBatchesReplayed += batches.size();
+        batches.clear();
     }
 
-    public long getBatchlogTimeout()
+    public static long getBatchlogTimeout()
     {
         return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
     }
@@ -270,10 +271,10 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 
-        public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
+        Batch(UUID id, ByteBuffer data, int version)
         {
             this.id = id;
-            this.writtenAt = writtenAt;
+            this.writtenAt = UUIDGen.unixTimestamp(id);
             this.data = data;
             this.version = version;
         }
@@ -366,7 +367,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
         }
 
-        private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
+        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
         {
             List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
             for (Mutation mutation : mutations)
@@ -384,7 +385,7 @@ public class BatchlogManager implements BatchlogManagerMBean
          *
          * @return direct delivery handler to wait on or null, if no live nodes found
          */
-        private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
         {
             Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
@@ -429,9 +430,9 @@ public class BatchlogManager implements BatchlogManagerMBean
          */
         private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
         {
-            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
+            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            public ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
             {
                 super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
                 undelivered.addAll(writeEndpoints);
@@ -453,17 +454,42 @@ public class BatchlogManager implements BatchlogManagerMBean
         }
     }
 
-    // force flush + compaction to reclaim space from the replayed batches
-    private void cleanup() throws ExecutionException, InterruptedException
+    @SuppressWarnings("deprecation")
+    private static void convertOldBatchEntries()
     {
-        ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG);
-        cfs.forceBlockingFlush();
-        Collection<Descriptor> descriptors = new ArrayList<>();
-        // expects ALL sstables to be available for compaction, so just use live set...
-        for (SSTableReader sstr : cfs.getSSTables(SSTableSet.LIVE))
-            descriptors.add(sstr.descriptor);
-        if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
-            CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
+        logger.debug("Started convertOldBatchEntries");
+
+        String query = String.format("SELECT id, data, written_at, version FROM %s.%s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.LEGACY_BATCHLOG);
+        UntypedResultSet batches = executeInternalWithPaging(query, DEFAULT_PAGE_SIZE);
+        int convertedBatches = 0;
+        for (UntypedResultSet.Row row : batches)
+        {
+            UUID id = row.getUUID("id");
+            long timestamp = row.getLong("written_at");
+            int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
+            logger.debug("Converting mutation at " + timestamp);
+
+            UUID newId = id;
+            if (id.version() != 1 || timestamp != UUIDGen.unixTimestamp(id))
+                newId = UUIDGen.getTimeUUID(timestamp, convertedBatches);
+            ++convertedBatches;
+
+            Mutation addRow = new RowUpdateBuilder(SystemKeyspace.Batches,
+                                                   FBUtilities.timestampMicros(),
+                                                   newId)
+                    .clustering()
+                    .add("data", row.getBytes("data"))
+                    .add("version", version)
+                    .build();
+
+            addRow.apply();
+        }
+        if (convertedBatches > 0)
+            Keyspace.openAndGetStore(SystemKeyspace.LegacyBatchlog).truncateBlocking();
+        // the converted entries are replayed by the replayAllFailedBatches() call that follows in replayInitially()
+        logger.debug("Finished convertOldBatchEntries");
     }
 
     public static class EndpointFilter
@@ -504,9 +530,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             if (validated.keySet().size() == 1)
             {
                 // we have only 1 `other` rack
-                // pick up to two random nodes from there
-                List<InetAddress> otherRack = validated.get(validated.keySet().iterator().next());
-                Collections.shuffle(otherRack);
+                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
                 return Lists.newArrayList(Iterables.limit(otherRack, 2));
             }
 
@@ -519,7 +543,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             else
             {
                 racks = Lists.newArrayList(validated.keySet());
-                Collections.shuffle((List) racks);
+                Collections.shuffle((List<String>) racks);
             }
 
             // grab a random member of up to two racks

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 1f3c7db..255f9a0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2054,6 +2054,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return count > 0 ? (int) (sum / count) : 0;
     }
 
+    public double getMeanPartitionSize()
+    {
+        long sum = 0;
+        long count = 0;
+        for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
+        {
+            long n = sstable.getEstimatedPartitionSize().count();
+            sum += sstable.getEstimatedPartitionSize().mean() * n;
+            count += n;
+        }
+        return count > 0 ? sum * 1.0 / count : 0;
+    }
+
     public long estimateKeys()
     {
         long n = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index a950e17..2db0ce9 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -342,7 +342,7 @@ public class Memtable implements Comparable<Memtable>
                                     + liveDataSize.get()) // data
                                     * 1.2); // bloom filter and row index overhead
 
-            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
+            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
         }
 
         public long getExpectedWriteSize()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2d0ca24..bc0be65 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -88,7 +89,7 @@ public final class SystemKeyspace
     public static final String NAME = "system";
 
     public static final String HINTS = "hints";
-    public static final String BATCHLOG = "batchlog";
+    public static final String BATCHES = "batches";
     public static final String PAXOS = "paxos";
     public static final String BUILT_INDEXES = "IndexInfo";
     public static final String LOCAL = "local";
@@ -102,6 +103,7 @@ public final class SystemKeyspace
     public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
     public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
 
+    @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
     @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
     @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
     @Deprecated public static final String LEGACY_COLUMNS = "schema_columns";
@@ -123,15 +125,15 @@ public final class SystemKeyspace
                 .compaction(CompactionParams.scts(singletonMap("enabled", "false")))
                 .gcGraceSeconds(0);
 
-    public static final CFMetaData Batchlog =
-        compile(BATCHLOG,
+    public static final CFMetaData Batches =
+        compile(BATCHES,
                 "batches awaiting replay",
                 "CREATE TABLE %s ("
-                + "id uuid,"
+                + "id timeuuid,"
                 + "data blob,"
                 + "version int,"
-                + "written_at timestamp,"
                 + "PRIMARY KEY ((id)))")
+                .copy(new LocalPartitioner(TimeUUIDType.instance))
                 .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
                 .gcGraceSeconds(0);
 
@@ -280,6 +282,19 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
     @Deprecated
+    public static final CFMetaData LegacyBatchlog =
+        compile(LEGACY_BATCHLOG,
+                "*DEPRECATED* batchlog entries",
+                "CREATE TABLE %s ("
+                + "id uuid,"
+                + "data blob,"
+                + "version int,"
+                + "written_at timestamp,"
+                + "PRIMARY KEY ((id)))")
+                .compaction(CompactionParams.scts(singletonMap("min_threshold", "2")))
+                .gcGraceSeconds(0);
+
+    @Deprecated
     public static final CFMetaData LegacyKeyspaces =
         compile(LEGACY_KEYSPACES,
                 "*DEPRECATED* keyspace definitions",
@@ -409,7 +424,7 @@ public final class SystemKeyspace
     {
         return Tables.of(BuiltIndexes,
                          Hints,
-                         Batchlog,
+                         Batches,
                          Paxos,
                          Local,
                          Peers,
@@ -421,6 +436,7 @@ public final class SystemKeyspace
                          AvailableRanges,
                          MaterializedViewsBuildsInProgress,
                          BuiltMaterializedViews,
+                         LegacyBatchlog,
                          LegacyKeyspaces,
                          LegacyColumnfamilies,
                          LegacyColumns,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/dht/LocalPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index 2a5a16e..f9421c5 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -66,9 +66,37 @@ public class LocalPartitioner implements IPartitioner
 
     public Token.TokenFactory getTokenFactory()
     {
-        throw new UnsupportedOperationException();
+        return tokenFactory;
     }
 
+    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+    {
+        public ByteBuffer toByteArray(Token token)
+        {
+            return ((LocalToken)token).token;
+        }
+
+        public Token fromByteArray(ByteBuffer bytes)
+        {
+            return new LocalToken(bytes);
+        }
+
+        public String toString(Token token)
+        {
+            return comparator.getString(((LocalToken)token).token);
+        }
+
+        public void validate(String token)
+        {
+            comparator.validate(comparator.fromString(token));
+        }
+
+        public Token fromString(String string)
+        {
+            return new LocalToken(comparator.fromString(string));
+        }
+    };
+
     public boolean preservesOrder()
     {
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 51aa48f..b637b17 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -863,7 +863,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                         null,
                                                                         WriteType.SIMPLE);
         Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog,
+                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
                                                     UUIDType.instance.decompose(uuid),
                                                     FBUtilities.timestampMicros(),
                                                     FBUtilities.nowInSeconds()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/762db474/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 5f1523e..fbb7a5b 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -17,58 +17,74 @@
  */
 package org.apache.cassandra.db;
 
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.partitions.ArrayBackedPartition;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import org.junit.BeforeClass;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import com.google.common.collect.Lists;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.partitions.ArrayBackedPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-
 public class BatchlogManagerTest
 {
     private static final String KEYSPACE1 = "BatchlogManagerTest1";
     private static final String CF_STANDARD1 = "Standard1";
     private static final String CF_STANDARD2 = "Standard2";
     private static final String CF_STANDARD3 = "Standard3";
+    private static final String CF_STANDARD4 = "Standard4";
+
+    static PartitionerSwitcher sw;
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
+        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
                                     SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3));
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance));
         System.out.println(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata.partitionKeyColumns());
     }
 
+    @AfterClass
+    public static void cleanup()
+    {
+        sw.close();
+    }
+
     @Before
     public void setUp() throws Exception
     {
@@ -76,6 +92,8 @@ public class BatchlogManagerTest
         InetAddress localhost = InetAddress.getByName("127.0.0.1");
         metadata.updateNormalToken(Util.token("A"), localhost);
         metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.Batches.cfId).truncateBlocking();
+        Schema.instance.getColumnFamilyStoreInstance(SystemKeyspace.LegacyBatchlog.cfId).truncateBlocking();
     }
 
     @Test
@@ -122,18 +140,17 @@ public class BatchlogManagerTest
                     .build();
 
             long timestamp = i < 500
-                           ? (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000
-                           : Long.MAX_VALUE;
-
-            Mutation m2 = BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
-                                                   UUIDGen.getTimeUUID(),
-                                                   MessagingService.current_version,
-                                                   timestamp);
-            m2.applyUnsafe();
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(m),
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
+                           .applyUnsafe();
         }
 
         // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceBlockingFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
 
         assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
@@ -165,25 +182,29 @@ public class BatchlogManagerTest
         assertEquals(500, result.one().getLong("count"));
     }
 
-    /*
     @Test
     public void testTruncatedReplay() throws InterruptedException, ExecutionException
     {
-        CellNameType comparator2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2").metadata.comparator;
-        CellNameType comparator3 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard3").metadata.comparator;
+        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
         // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
         // Each batchlog entry with a mutation for Standard2 and Standard3.
         // In the middle of the process, 'truncate' Standard2.
         for (int i = 0; i < 1000; i++)
         {
-            Mutation mutation1 = new Mutation(KEYSPACE1, bytes(i));
-            mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0);
-            Mutation mutation2 = new Mutation(KEYSPACE1, bytes(i));
-            mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0);
+            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
             List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
 
             // Make sure it's ready to be replayed, so adjust the timestamp.
-            long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2;
+            long timestamp = System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout();
 
             if (i == 500)
                 SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"),
@@ -197,14 +218,13 @@ public class BatchlogManagerTest
                 timestamp--;
 
             BatchlogManager.getBatchlogMutationFor(mutations,
-                                                   UUIDGen.getTimeUUID(),
-                                                   MessagingService.current_version,
-                                                   timestamp * 1000)
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
                            .applyUnsafe();
         }
 
         // Flush the batchlog to disk (see CASSANDRA-6822).
-        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG).forceFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
 
         // Force batchlog replay and wait for it to complete.
         BatchlogManager.instance.startBatchlogReplay().get();
@@ -216,8 +236,8 @@ public class BatchlogManagerTest
             if (i >= 500)
             {
                 assertEquals(bytes(i), result.one().getBytes("key"));
-                assertEquals(bytes(i), result.one().getBytes("column1"));
-                assertEquals(bytes(i), result.one().getBytes("value"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
             }
             else
             {
@@ -229,9 +249,143 @@ public class BatchlogManagerTest
         {
             UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
             assertEquals(bytes(i), result.one().getBytes("key"));
-            assertEquals(bytes(i), result.one().getBytes("column1"));
-            assertEquals(bytes(i), result.one().getBytes("value"));
+            assertEquals("name" + i, result.one().getString("name"));
+            assertEquals("val" + i, result.one().getString("val"));
         }
     }
-    */
+
+    static Mutation fakeVersion12MutationFor(Collection<Mutation> mutations, long now) throws IOException
+    {
+        // Serialization can't write version 1.2 mutations; fake an old entry by using a random (non-time) id, an
+        // explicit written_at of `now`, and saving it in the legacy batchlog (payload is serialized as VERSION_30).
+        UUID uuid = UUID.randomUUID();
+        ByteBuffer writtenAt = LongType.instance.decompose(now);
+        int version = MessagingService.VERSION_30;
+        ByteBuffer data = BatchlogManager.serializeMutations(mutations, version);
+
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+            .clustering()
+            .add("written_at", writtenAt)
+            .add("data", data)
+            .add("version", version)
+            .build();
+    }
+
+    static Mutation fakeVersion20MutationFor(Collection<Mutation> mutations, UUID uuid)
+    {
+        // Serialization can't write version 2.0 mutations either; fake one in the legacy batchlog under `uuid`, with written_at taken from uuid's timestamp.
+        int version = MessagingService.VERSION_30;
+        ByteBuffer writtenAt = LongType.instance.decompose(UUIDGen.unixTimestamp(uuid));
+        return new RowUpdateBuilder(SystemKeyspace.LegacyBatchlog, FBUtilities.timestampMicros(), uuid)
+               .clustering()
+               .add("data", BatchlogManager.serializeMutations(mutations, version))
+               .add("written_at", writtenAt)
+               .add("version", version)
+               .build();
+    }
+
+    @Test
+    public void testConversion() throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+        // Generate 1000 fake version 1.2 mutations and put them all into the legacy batchlog.
+        // Half (500) ready to be replayed, half not.
+        for (int i = 0; i < 1000; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 500
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            fakeVersion12MutationFor(Collections.singleton(mutation), timestamp).applyUnsafe();
+        }
+
+        // Add 400 fake version 2.0 mutations and put them all into the legacy batchlog.
+        // Half (200) ready to be replayed, half not.
+        for (int i = 1000; i < 1400; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 1200
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            fakeVersion20MutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(timestamp, i)).applyUnsafe();
+        }
+
+        // Mix in 100 current version mutations (new batches table), 50 ready for replay.
+        for (int i = 1400; i < 1500; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 1450
+                           ? (System.currentTimeMillis() - BatchlogManager.instance.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.instance.getBatchlogTimeout());
+
+
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation),
+                                                   UUIDGen.getTimeUUID(timestamp, i),
+                                                   MessagingService.current_version)
+                           .applyUnsafe();
+        }
+
+        // Flush both batchlog tables to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        UntypedResultSet result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertEquals("Count in legacy batchlog", 1400, result.one().getLong("count"));
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertEquals("Count in batchlog", 100, result.one().getLong("count"));
+
+        // Run the initial replay (which also migrates the legacy entries) and wait for it to complete.
+        BatchlogManager.instance.performInitialReplay();
+
+        // Ensure that the ready half of each group, and only that half, got replayed.
+        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 1500; i++)
+        {
+            result = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+            if (i < 500 || i >= 1000 && i < 1200 || i >= 1400 && i < 1450)
+            {
+                assertEquals(bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue("Present at " + i, result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+        assertEquals(750, result.one().getLong("count"));
+
+        // Ensure all unreplayed entries now live in the new batches table and the legacy table is empty.
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertEquals("Count in batchlog after initial replay", 750, result.one().getLong("count"));
+        result = QueryProcessor.executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertEquals("Count in legacy batchlog after initial replay", 0, result.one().getLong("count"));
+    }
 }


[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c35bfc09
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c35bfc09
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c35bfc09

Branch: refs/heads/trunk
Commit: c35bfc09cca4add42afed0733a29d6f6843dbba0
Parents: bf8ac1a 762db47
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Aug 6 17:32:39 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Aug 6 17:32:39 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 NEWS.txt                                        |   2 +
 .../apache/cassandra/db/BatchlogManager.java    | 226 +++++++++--------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  13 +
 src/java/org/apache/cassandra/db/Memtable.java  |   2 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  28 ++-
 .../apache/cassandra/dht/LocalPartitioner.java  |  30 ++-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/db/BatchlogManagerTest.java       | 246 +++++++++++++++----
 9 files changed, 395 insertions(+), 156 deletions(-)
----------------------------------------------------------------------