Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:30 UTC

[06/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 3d3ca09..19f81be 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -40,63 +39,66 @@ public class HeapPool extends MemtablePool
 
     public MemtableAllocator newAllocator()
     {
-        return new Allocator(this);
+        // TODO
+        throw new UnsupportedOperationException();
+        //return new Allocator(this);
     }
 
-    public static class Allocator extends MemtableBufferAllocator
-    {
-        Allocator(HeapPool pool)
-        {
-            super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
-        }
+    // TODO
+    //public static class Allocator extends MemtableBufferAllocator
+    //{
+    //    Allocator(HeapPool pool)
+    //    {
+    //        super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+    //    }
 
-        public ByteBuffer allocate(int size, OpOrder.Group opGroup)
-        {
-            super.onHeap().allocate(size, opGroup);
-            return ByteBuffer.allocate(size);
-        }
+    //    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    //    {
+    //        super.onHeap().allocate(size, opGroup);
+    //        return ByteBuffer.allocate(size);
+    //    }
 
-        public DataReclaimer reclaimer()
-        {
-            return new Reclaimer();
-        }
+    //    public DataReclaimer reclaimer()
+    //    {
+    //        return new Reclaimer();
+    //    }
 
-        private class Reclaimer implements DataReclaimer
-        {
-            List<Cell> delayed;
+    //    private class Reclaimer implements DataReclaimer
+    //    {
+    //        List<Cell> delayed;
 
-            public Reclaimer reclaim(Cell cell)
-            {
-                if (delayed == null)
-                    delayed = new ArrayList<>();
-                delayed.add(cell);
-                return this;
-            }
+    //        public Reclaimer reclaim(Cell cell)
+    //        {
+    //            if (delayed == null)
+    //                delayed = new ArrayList<>();
+    //            delayed.add(cell);
+    //            return this;
+    //        }
 
-            public Reclaimer reclaimImmediately(Cell cell)
-            {
-                onHeap().release(cell.name().dataSize() + cell.value().remaining());
-                return this;
-            }
+    //        public Reclaimer reclaimImmediately(Cell cell)
+    //        {
+    //            onHeap().release(cell.name().dataSize() + cell.value().remaining());
+    //            return this;
+    //        }
 
-            public Reclaimer reclaimImmediately(DecoratedKey key)
-            {
-                onHeap().release(key.getKey().remaining());
-                return this;
-            }
+    //        public Reclaimer reclaimImmediately(DecoratedKey key)
+    //        {
+    //            onHeap().release(key.getKey().remaining());
+    //            return this;
+    //        }
 
-            public void cancel()
-            {
-                if (delayed != null)
-                    delayed.clear();
-            }
+    //        public void cancel()
+    //        {
+    //            if (delayed != null)
+    //                delayed.clear();
+    //        }
 
-            public void commit()
-            {
-                if (delayed != null)
-                    for (Cell cell : delayed)
-                        reclaimImmediately(cell);
-            }
-        }
-    }
+    //        public void commit()
+    //        {
+    //            if (delayed != null)
+    //                for (Cell cell : delayed)
+    //                    reclaimImmediately(cell);
+    //        }
+    //    }
+    //}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index e814b4d..1e0c11e 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
@@ -58,10 +59,8 @@ public abstract class MemtableAllocator
         this.offHeap = offHeap;
     }
 
-    public abstract Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp);
-    public abstract CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp);
-    public abstract DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp);
-    public abstract ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp);
+    public abstract MemtableRowData.ReusableRow newReusableRow();
+    public abstract RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp);
     public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
     public abstract DataReclaimer reclaimer();
 
@@ -104,10 +103,16 @@ public abstract class MemtableAllocator
         return state == LifeCycle.LIVE;
     }
 
+    public static interface RowAllocator extends Row.Writer
+    {
+        public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic);
+        public MemtableRowData allocatedRowData();
+    }
+
     public static interface DataReclaimer
     {
-        public DataReclaimer reclaim(Cell cell);
-        public DataReclaimer reclaimImmediately(Cell cell);
+        public DataReclaimer reclaim(MemtableRowData row);
+        public DataReclaimer reclaimImmediately(MemtableRowData row);
         public DataReclaimer reclaimImmediately(DecoratedKey key);
         public void cancel();
         public void commit();
@@ -115,12 +120,12 @@ public abstract class MemtableAllocator
 
     public static final DataReclaimer NO_OP = new DataReclaimer()
     {
-        public DataReclaimer reclaim(Cell cell)
+        public DataReclaimer reclaim(MemtableRowData update)
         {
             return this;
         }
 
-        public DataReclaimer reclaimImmediately(Cell cell)
+        public DataReclaimer reclaimImmediately(MemtableRowData update)
         {
             return this;
         }
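
The hunk above reworks the DataReclaimer contract: reclamation is now expressed in terms of whole MemtableRowData rows rather than individual Cells, while partition-key reclamation is unchanged. A minimal sketch of how a caller might drive the reworked interface, based only on the signatures visible in this diff; allocator, rowData and key are placeholder names and the helper itself is hypothetical, not part of the commit. As the NO_OP implementation above shows, both reclaim variants may legitimately do nothing.

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.MemtableRowData;
import org.apache.cassandra.utils.memory.MemtableAllocator;

public class ReclaimSketch
{
    // Hypothetical helper: schedule reclamation for a row that has been replaced in a
    // memtable, release the partition key's footprint right away, then apply the
    // deferred work.
    static void reclaim(MemtableAllocator allocator, MemtableRowData rowData, DecoratedKey key)
    {
        MemtableAllocator.DataReclaimer reclaimer = allocator.reclaimer();
        reclaimer.reclaim(rowData);          // deferred until commit()
        reclaimer.reclaimImmediately(key);   // released right away
        reclaimer.commit();                  // cancel() would instead drop the deferred reclaims
    }
}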

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
index 7034d76..144f439 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java
@@ -20,12 +20,9 @@ package org.apache.cassandra.utils.memory;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.BufferDecoratedKey;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.CounterCell;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletedCell;
-import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public abstract class MemtableBufferAllocator extends MemtableAllocator
@@ -36,24 +33,14 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
         super(onHeap, offHeap);
     }
 
-    public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    public MemtableRowData.ReusableRow newReusableRow()
     {
-        return cell.localCopy(cfm, allocator(writeOp));
+        return MemtableRowData.BufferRowData.createReusableRow();
     }
 
-    public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
     {
-        return cell.localCopy(cfm, allocator(writeOp));
-    }
-
-    public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
-    {
-        return cell.localCopy(cfm, allocator(writeOp));
-    }
-
-    public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
-    {
-        return cell.localCopy(cfm, allocator(writeOp));
+        return new RowBufferAllocator(allocator(writeOp), cfm.isCounter());
     }
 
     public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
@@ -67,4 +54,71 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator
     {
         return new ContextAllocator(writeOp, this);
     }
+
+    private static class RowBufferAllocator extends RowDataBlock.Writer implements RowAllocator
+    {
+        private final AbstractAllocator allocator;
+        private final boolean isCounter;
+
+        private MemtableRowData.BufferClustering clustering;
+        private int clusteringIdx;
+        private LivenessInfo info;
+        private DeletionTime deletion;
+        private RowDataBlock data;
+
+        private RowBufferAllocator(AbstractAllocator allocator, boolean isCounter)
+        {
+            super(true);
+            this.allocator = allocator;
+            this.isCounter = isCounter;
+        }
+
+        public void allocateNewRow(int clusteringSize, Columns columns, boolean isStatic)
+        {
+            data = new RowDataBlock(columns, 1, false, isCounter);
+            clustering = isStatic ? null : new MemtableRowData.BufferClustering(clusteringSize);
+            clusteringIdx = 0;
+            updateWriter(data);
+        }
+
+        public MemtableRowData allocatedRowData()
+        {
+            MemtableRowData row = new MemtableRowData.BufferRowData(clustering == null ? Clustering.STATIC_CLUSTERING : clustering,
+                                                                    info,
+                                                                    deletion,
+                                                                    data);
+
+            clustering = null;
+            info = LivenessInfo.NONE;
+            deletion = DeletionTime.LIVE;
+            data = null;
+
+            return row;
+        }
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            clustering.setClusteringValue(clusteringIdx++, value == null ? null : allocator.clone(value));
+        }
+
+        public void writePartitionKeyLivenessInfo(LivenessInfo info)
+        {
+            this.info = info;
+        }
+
+        public void writeRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        @Override
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            ByteBuffer v = allocator.clone(value);
+            if (column.isComplex())
+                complexWriter.addCell(column, v, info, MemtableRowData.BufferCellPath.clone(path, allocator));
+            else
+                simpleWriter.addCell(column, v, info);
+        }
+    }
 }
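
RowBufferAllocator above is the heap-buffer implementation of the new RowAllocator hook: allocateNewRow sets up a RowDataBlock plus an optional clustering prefix, the write* callbacks clone each value through the wrapped AbstractAllocator, and allocatedRowData detaches the finished MemtableRowData and resets the writer. Below is a minimal sketch of how a write path might drive it, based only on the signatures visible in this diff; cfm, writeOp, columns, column and the ByteBuffer values are assumed to be supplied by the caller, the helper is hypothetical, and it assumes the Row.Writer callbacks implemented above (writeClusteringValue, writeCell, ...) are reachable through the RowAllocator interface. The real write protocol may involve further callbacks not shown in this hunk.

import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemtableAllocator;

public class RowCopySketch
{
    // Hypothetical helper: copy one regular row with a single clustering value and a
    // single simple (non-complex) cell into memtable-owned buffers.
    static MemtableRowData copyRow(MemtableAllocator allocator,
                                   CFMetaData cfm,
                                   OpOrder.Group writeOp,
                                   Columns columns,
                                   ByteBuffer clusteringValue,
                                   ColumnDefinition column,
                                   ByteBuffer cellValue,
                                   LivenessInfo cellInfo)
    {
        MemtableAllocator.RowAllocator writer = allocator.newRowAllocator(cfm, writeOp);
        writer.allocateNewRow(1, columns, false);      // one clustering column, not a static row
        writer.writeClusteringValue(clusteringValue);  // value is cloned by the wrapped allocator
        writer.writeCell(column, cfm.isCounter(), cellValue, cellInfo, null); // null path: simple column
        return writer.allocatedRowData();              // detaches clustering/info/deletion/data
    }
}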

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 88846c5..7ca859d 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -25,16 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.CounterCell;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletedCell;
-import org.apache.cassandra.db.ExpiringCell;
-import org.apache.cassandra.db.NativeCell;
-import org.apache.cassandra.db.NativeCounterCell;
 import org.apache.cassandra.db.NativeDecoratedKey;
-import org.apache.cassandra.db.NativeDeletedCell;
-import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.db.rows.MemtableRowData;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocator extends MemtableAllocator
@@ -60,28 +53,16 @@ public class NativeAllocator extends MemtableAllocator
         super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
     }
 
-    @Override
-    public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
-    {
-        return new NativeCell(this, writeOp, cell);
-    }
-
-    @Override
-    public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    public MemtableRowData.ReusableRow newReusableRow()
     {
-        return new NativeCounterCell(this, writeOp, cell);
+        // TODO
+        throw new UnsupportedOperationException();
     }
 
-    @Override
-    public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
-    {
-        return new NativeDeletedCell(this, writeOp, cell);
-    }
-
-    @Override
-    public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    public RowAllocator newRowAllocator(CFMetaData cfm, OpOrder.Group writeOp)
     {
-        return new NativeExpiringCell(this, writeOp, cell);
+        // TODO
+        throw new UnsupportedOperationException();
     }
 
     public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
deleted file mode 100644
index 9641930..0000000
--- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-// TODO : should probably lower fan-factor for tests to make them more intensive
-public class LongBTreeTest
-{
-
-    private static final MetricRegistry metrics = new MetricRegistry();
-    private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE"));
-    private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE"));
-    private static final ExecutorService MODIFY = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY"));
-    private static final ExecutorService COMPARE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE"));
-    private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f);
-
-    static
-    {
-        System.setProperty("cassandra.btree.fanfactor", "4");
-    }
-
-    @Test
-    public void testOversizedMiddleInsert()
-    {
-        TreeSet<Integer> canon = new TreeSet<>();
-        for (int i = 0 ; i < 10000000 ; i++)
-            canon.add(i);
-        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
-        btree = BTree.update(btree, ICMP, canon, true);
-        canon.add(Integer.MIN_VALUE);
-        canon.add(Integer.MAX_VALUE);
-        Assert.assertTrue(BTree.isWellFormed(btree, ICMP));
-        testEqual("Oversize", BTree.<Integer>slice(btree, true), canon.iterator());
-    }
-
-    @Test
-    public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 50, 1, 1, true);
-    }
-
-    @Test
-    public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 50, 1, 5, true);
-    }
-
-    @Test
-    public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 500, 10, 1, true);
-    }
-
-    @Test
-    public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException
-    {
-        testInsertions(10000000, 500, 10, 10, true);
-    }
-
-    @Test
-    public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException
-    {
-        testInsertions(100000000, 5000, 3, 100, true);
-    }
-
-    @Test
-    public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException
-    {
-        testInsertions(10000, 50, 10, 10, false);
-    }
-
-    @Test
-    public void testSearchIterator() throws InterruptedException
-    {
-        int threads = Runtime.getRuntime().availableProcessors();
-        final CountDownLatch latch = new CountDownLatch(threads);
-        final AtomicLong errors = new AtomicLong();
-        final AtomicLong count = new AtomicLong();
-        final int perThreadTrees = 100;
-        final int perTreeSelections = 100;
-        final long totalCount = threads * perThreadTrees * perTreeSelections;
-        for (int t = 0 ; t < threads ; t++)
-        {
-            MODIFY.execute(new Runnable()
-            {
-                public void run()
-                {
-                    ThreadLocalRandom random = ThreadLocalRandom.current();
-                    for (int i = 0 ; i < perThreadTrees ; i++)
-                    {
-                        Object[] tree = randomTree(10000, random);
-                        for (int j = 0 ; j < perTreeSelections ; j++)
-                        {
-                            BTreeSearchIterator<Integer, Integer, Integer> searchIterator = new BTreeSearchIterator<>(tree, ICMP);
-                            for (Integer key : randomSelection(tree, random))
-                                if (key != searchIterator.next(key))
-                                    errors.incrementAndGet();
-                            searchIterator = new BTreeSearchIterator<Integer, Integer, Integer>(tree, ICMP);
-                            for (Integer key : randomMix(tree, random))
-                                if (key != searchIterator.next(key))
-                                    if (BTree.find(tree, ICMP, key) == key)
-                                        errors.incrementAndGet();
-                            count.incrementAndGet();
-                        }
-                    }
-                    latch.countDown();
-                }
-            });
-        }
-        while (latch.getCount() > 0)
-        {
-            latch.await(10L, TimeUnit.SECONDS);
-            System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""));
-            assert errors.get() == 0;
-        }
-    }
-
-    private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException
-    {
-        int batchesPerTest = perTestCount / modificationBatchSize;
-        int maximumRunLength = 100;
-        int testKeyRange = perTestCount * testKeyRatio;
-        int tests = totalCount / perTestCount;
-        System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops",
-                tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize));
-
-        // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks
-        int chunkSize = quickEquality ? tests : (int) (100000 / Math.pow(perTestCount, 2));
-        for (int chunk = 0 ; chunk < tests ; chunk += chunkSize)
-        {
-            final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>();
-            for (int i = 0 ; i < chunkSize ; i++)
-            {
-                outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality));
-            }
-
-            final List<ListenableFuture<?>> inner = new ArrayList<>();
-            int complete = 0;
-            int reportInterval = totalCount / 100;
-            int lastReportAt = 0;
-            for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer)
-            {
-                inner.addAll(f.get());
-                complete += perTestCount;
-                if (complete - lastReportAt >= reportInterval)
-                {
-                    System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount));
-                    lastReportAt = complete;
-                }
-            }
-            Futures.allAsList(inner).get();
-        }
-        Snapshot snap = BTREE_TIMER.getSnapshot();
-        System.out.println(String.format("btree   : %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
-        snap = TREE_TIMER.getSnapshot();
-        System.out.println(String.format("snaptree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()));
-        System.out.println("Done");
-    }
-
-    private static ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality)
-    {
-        ListenableFutureTask<List<ListenableFuture<?>>> f = ListenableFutureTask.create(new Callable<List<ListenableFuture<?>>>()
-        {
-            @Override
-            public List<ListenableFuture<?>> call()
-            {
-                final List<ListenableFuture<?>> r = new ArrayList<>();
-                NavigableMap<Integer, Integer> canon = new TreeMap<>();
-                Object[] btree = BTree.empty();
-                final TreeMap<Integer, Integer> buffer = new TreeMap<>();
-                final Random rnd = new Random();
-                for (int i = 0 ; i < iterations ; i++)
-                {
-                    buffer.clear();
-                    int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration);
-                    while (mods > 0)
-                    {
-                        int v = rnd.nextInt(upperBound);
-                        int rc = Math.max(0, Math.min(mods, maxRunLength) - 1);
-                        int c = 1 + (rc <= 0 ? 0 : rnd.nextInt(rc));
-                        for (int j = 0 ; j < c ; j++)
-                        {
-                            buffer.put(v, v);
-                            v++;
-                        }
-                        mods -= c;
-                    }
-                    Timer.Context ctxt;
-                    ctxt = TREE_TIMER.time();
-                    canon.putAll(buffer);
-                    ctxt.stop();
-                    ctxt = BTREE_TIMER.time();
-                    Object[] next = null;
-                    while (next == null)
-                        next = BTree.update(btree, ICMP, buffer.keySet(), true, SPORADIC_ABORT);
-                    btree = next;
-                    ctxt.stop();
-
-                    if (!BTree.isWellFormed(btree, ICMP))
-                    {
-                        System.out.println("ERROR: Not well formed");
-                        throw new AssertionError("Not well formed!");
-                    }
-                    if (quickEquality)
-                        testEqual("", BTree.<Integer>slice(btree, true), canon.keySet().iterator());
-                    else
-                        r.addAll(testAllSlices("RND", btree, new TreeSet<>(canon.keySet())));
-                }
-                return r;
-            }
-        });
-        MODIFY.execute(f);
-        return f;
-    }
-
-    @Test
-    public void testSlicingAllSmallTrees() throws ExecutionException, InterruptedException
-    {
-        Object[] cur = BTree.empty();
-        TreeSet<Integer> canon = new TreeSet<>();
-        // we set FAN_FACTOR to 4, so 128 items is four levels deep, three fully populated
-        for (int i = 0 ; i < 128 ; i++)
-        {
-            String id = String.format("[0..%d)", canon.size());
-            System.out.println("Testing " + id);
-            Futures.allAsList(testAllSlices(id, cur, canon)).get();
-            Object[] next = null;
-            while (next == null)
-                next = BTree.update(cur, ICMP, Arrays.asList(i), true, SPORADIC_ABORT);
-            cur = next;
-            canon.add(i);
-        }
-    }
-
-    static final Comparator<Integer> ICMP = new Comparator<Integer>()
-    {
-        @Override
-        public int compare(Integer o1, Integer o2)
-        {
-            return Integer.compare(o1, o2);
-        }
-    };
-
-    private static List<ListenableFuture<?>> testAllSlices(String id, Object[] btree, NavigableSet<Integer> canon)
-    {
-        List<ListenableFuture<?>> waitFor = new ArrayList<>();
-        testAllSlices(id + " ASC", new BTreeSet<>(btree, ICMP), canon, true, waitFor);
-        testAllSlices(id + " DSC", new BTreeSet<>(btree, ICMP).descendingSet(), canon.descendingSet(), false, waitFor);
-        return waitFor;
-    }
-
-    private static void testAllSlices(String id, NavigableSet<Integer> btree, NavigableSet<Integer> canon, boolean ascending, List<ListenableFuture<?>> results)
-    {
-        testOneSlice(id, btree, canon, results);
-        for (Integer lb : range(canon.size(), Integer.MIN_VALUE, ascending))
-        {
-            // test head/tail sets
-            testOneSlice(String.format("%s->[%d..)", id, lb), btree.headSet(lb, true), canon.headSet(lb, true), results);
-            testOneSlice(String.format("%s->(%d..)", id, lb), btree.headSet(lb, false), canon.headSet(lb, false), results);
-            testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, true), canon.tailSet(lb, true), results);
-            testOneSlice(String.format("%s->(..%d]", id, lb), btree.tailSet(lb, false), canon.tailSet(lb, false), results);
-            for (Integer ub : range(canon.size(), lb, ascending))
-            {
-                // test subsets
-                testOneSlice(String.format("%s->[%d..%d]", id, lb, ub), btree.subSet(lb, true, ub, true), canon.subSet(lb, true, ub, true), results);
-                testOneSlice(String.format("%s->(%d..%d]", id, lb, ub), btree.subSet(lb, false, ub, true), canon.subSet(lb, false, ub, true), results);
-                testOneSlice(String.format("%s->[%d..%d)", id, lb, ub), btree.subSet(lb, true, ub, false), canon.subSet(lb, true, ub, false), results);
-                testOneSlice(String.format("%s->(%d..%d)", id, lb, ub), btree.subSet(lb, false, ub, false), canon.subSet(lb, false, ub, false), results);
-            }
-        }
-    }
-
-    private static void testOneSlice(final String id, final NavigableSet<Integer> test, final NavigableSet<Integer> canon, List<ListenableFuture<?>> results)
-    {
-        ListenableFutureTask<?> f = ListenableFutureTask.create(new Runnable()
-        {
-
-            @Override
-            public void run()
-            {
-                test(id + " Count", test.size(), canon.size());
-                testEqual(id, test.iterator(), canon.iterator());
-                testEqual(id + "->DSCI", test.descendingIterator(), canon.descendingIterator());
-                testEqual(id + "->DSCS", test.descendingSet().iterator(), canon.descendingSet().iterator());
-                testEqual(id + "->DSCS->DSCI", test.descendingSet().descendingIterator(), canon.descendingSet().descendingIterator());
-            }
-        }, null);
-        results.add(f);
-        COMPARE.execute(f);
-    }
-
-    private static void test(String id, int test, int expect)
-    {
-        if (test != expect)
-        {
-            System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test));
-        }
-    }
-
-    private static <V> void testEqual(String id, Iterator<V> btree, Iterator<V> canon)
-    {
-        boolean equal = true;
-        while (btree.hasNext() && canon.hasNext())
-        {
-            Object i = btree.next();
-            Object j = canon.next();
-            if (!i.equals(j))
-            {
-                System.out.println(String.format("%s: Expected %d, Got %d", id, j, i));
-                equal = false;
-            }
-        }
-        while (btree.hasNext())
-        {
-            System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next()));
-            equal = false;
-        }
-        while (canon.hasNext())
-        {
-            System.out.println(String.format("%s: Expected %d, Got Nil", id, canon.next()));
-            equal = false;
-        }
-        if (!equal)
-            throw new AssertionError("Not equal");
-    }
-
-    // should only be called on sets that range from 0->N or N->0
-    private static final Iterable<Integer> range(final int size, final int from, final boolean ascending)
-    {
-        return new Iterable<Integer>()
-        {
-            int cur;
-            int delta;
-            int end;
-            {
-                if (ascending)
-                {
-                    end = size + 1;
-                    cur = from == Integer.MIN_VALUE ? -1 : from;
-                    delta = 1;
-                }
-                else
-                {
-                    end = -2;
-                    cur = from == Integer.MIN_VALUE ? size : from;
-                    delta = -1;
-                }
-            }
-            @Override
-            public Iterator<Integer> iterator()
-            {
-                return new Iterator<Integer>()
-                {
-                    @Override
-                    public boolean hasNext()
-                    {
-                        return cur != end;
-                    }
-
-                    @Override
-                    public Integer next()
-                    {
-                        Integer r = cur;
-                        cur += delta;
-                        return r;
-                    }
-
-                    @Override
-                    public void remove()
-                    {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-
-    private static Object[] randomTree(int maxSize, Random random)
-    {
-        TreeSet<Integer> build = new TreeSet<>();
-        int size = random.nextInt(maxSize);
-        for (int i = 0 ; i < size ; i++)
-        {
-            build.add(random.nextInt());
-        }
-        return BTree.build(build, ICMP, true, UpdateFunction.NoOp.<Integer>instance());
-    }
-
-    private static Iterable<Integer> randomSelection(Object[] iter, final Random rnd)
-    {
-        final float proportion = rnd.nextFloat();
-        return Iterables.filter(new BTreeSet<>(iter, ICMP), new Predicate<Integer>()
-        {
-            public boolean apply(Integer integer)
-            {
-                return rnd.nextFloat() < proportion;
-            }
-        });
-    }
-
-    private static Iterable<Integer> randomMix(Object[] iter, final Random rnd)
-    {
-        final float proportion = rnd.nextFloat();
-        return Iterables.transform(new BTreeSet<>(iter, ICMP), new Function<Integer, Integer>()
-        {
-            long last = Integer.MIN_VALUE;
-
-            public Integer apply(Integer v)
-            {
-                long last = this.last;
-                this.last = v;
-                if (rnd.nextFloat() < proportion)
-                    return v;
-                return (int)((v - last) / 2);
-            }
-        });
-    }
-
-    private static final class RandomAbort<V> implements UpdateFunction<V>
-    {
-        final Random rnd;
-        final float chance;
-        private RandomAbort(Random rnd, float chance)
-        {
-            this.rnd = rnd;
-            this.chance = chance;
-        }
-
-        public V apply(V replacing, V update)
-        {
-            return update;
-        }
-
-        public boolean abortEarly()
-        {
-            return rnd.nextFloat() < chance;
-        }
-
-        public void allocated(long heapSize)
-        {
-
-        }
-
-        public V apply(V v)
-        {
-            return v;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 3d3de84..cf76e75 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,7 +3,7 @@
 # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
 #
 cluster_name: Test Cluster
-memtable_allocation_type: offheap_objects
+memtable_allocation_type: heap_buffers
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/conf/logback-test.xml
----------------------------------------------------------------------
diff --git a/test/conf/logback-test.xml b/test/conf/logback-test.xml
index 8cb2d6f..9facb83 100644
--- a/test/conf/logback-test.xml
+++ b/test/conf/logback-test.xml
@@ -68,7 +68,7 @@
       <appender-ref ref="TEE"/>
   </appender>
 
-  <root level="DEBUG">
+  <root level="INFO">
     <appender-ref ref="ASYNC" />
   </root>
 </configuration>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db
deleted file mode 100644
index 44d2e59..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-CompressionInfo.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db
deleted file mode 100644
index f75c4e6..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Data.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db
deleted file mode 100644
index 8f0a999..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Filter.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db
deleted file mode 100644
index da84fbc..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Index.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db
deleted file mode 100644
index 0762615..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Statistics.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db
deleted file mode 100644
index 376ca9d..0000000
Binary files a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-Summary.db and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt b/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
deleted file mode 100644
index cf6efa8..0000000
--- a/test/data/corrupt-sstables/Keyspace1-Standard3-jb-1-TOC.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-CompressionInfo.db
-TOC.txt
-Filter.db
-Statistics.db
-Data.db
-Summary.db
-Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-CRC.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-CRC.db b/test/data/corrupt-sstables/la-1-big-CRC.db
new file mode 100644
index 0000000..f13b9c7
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-CRC.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Data.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Data.db b/test/data/corrupt-sstables/la-1-big-Data.db
new file mode 100644
index 0000000..dc516d8
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Data.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Digest.adler32
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Digest.adler32 b/test/data/corrupt-sstables/la-1-big-Digest.adler32
new file mode 100644
index 0000000..e447277
--- /dev/null
+++ b/test/data/corrupt-sstables/la-1-big-Digest.adler32
@@ -0,0 +1 @@
+2370519993
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Filter.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Filter.db b/test/data/corrupt-sstables/la-1-big-Filter.db
new file mode 100644
index 0000000..709d2ea
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Filter.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Index.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Index.db b/test/data/corrupt-sstables/la-1-big-Index.db
new file mode 100644
index 0000000..178221e
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Index.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Statistics.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Statistics.db b/test/data/corrupt-sstables/la-1-big-Statistics.db
new file mode 100644
index 0000000..23b76ac
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Statistics.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-Summary.db b/test/data/corrupt-sstables/la-1-big-Summary.db
new file mode 100644
index 0000000..732f27c
Binary files /dev/null and b/test/data/corrupt-sstables/la-1-big-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/data/corrupt-sstables/la-1-big-TOC.txt
----------------------------------------------------------------------
diff --git a/test/data/corrupt-sstables/la-1-big-TOC.txt b/test/data/corrupt-sstables/la-1-big-TOC.txt
new file mode 100644
index 0000000..9cbcd44
--- /dev/null
+++ b/test/data/corrupt-sstables/la-1-big-TOC.txt
@@ -0,0 +1,8 @@
+Statistics.db
+Filter.db
+Data.db
+Summary.db
+Digest.adler32
+CRC.db
+TOC.txt
+Index.db

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java b/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
deleted file mode 100644
index a0bacea..0000000
--- a/test/long/org/apache/cassandra/cql3/DropKeyspaceCommitLogRecycleTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import org.junit.After;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.SchemaLoader;
-
-import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
-
-/**
- * Base class for CQL tests.
- */
-public class DropKeyspaceCommitLogRecycleTest
-{
-    protected static final Logger logger = LoggerFactory.getLogger(DropKeyspaceCommitLogRecycleTest.class);
-
-    private static final String KEYSPACE = "cql_test_keyspace";
-    private static final String KEYSPACE2 = "cql_test_keyspace2";
-
-    static
-    {
-        // Once per-JVM is enough
-        SchemaLoader.prepareServer();
-    }
-
-    private void create(boolean both)
-    {
-        executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE));
-        executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE));
-        
-        if (both)
-        {
-            executeOnceInternal(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}", KEYSPACE2));
-            executeOnceInternal(String.format("CREATE TABLE %s.test (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))", KEYSPACE2));
-        }
-    }
-
-    private void insert()
-    {
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE));
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE));
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE));
-
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (0, 0, 0)", KEYSPACE2));
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (1, 1, 1)", KEYSPACE2));
-        executeOnceInternal(String.format("INSERT INTO %s.test (k1, k2, v) VALUES (2, 2, 2)", KEYSPACE2));       
-    }
-
-    private void drop(boolean both)
-    {
-        executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE));
-        if (both)
-            executeOnceInternal(String.format("DROP KEYSPACE IF EXISTS %s", KEYSPACE2));
-    }
-
-    @Test
-    public void testRecycle()
-    {
-        for (int i = 0 ; i < 1000 ; i++)
-        {
-            create(i == 0);
-            insert();
-            drop(false);
-        }
-    }
-
-    @After
-    public void afterTest() throws Throwable
-    {
-        drop(true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
deleted file mode 100644
index 24993c8..0000000
--- a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class LongFlushMemtableTest
-{
-    public static final String KEYSPACE1 = "LongFlushMemtableTest";
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.loadSchema();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1));
-    }
-
-    @Test
-    public void testFlushMemtables() throws ConfigurationException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        for (int i = 0; i < 100; i++)
-        {
-            CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance);
-            MigrationManager.announceNewColumnFamily(metadata);
-        }
-
-        for (int j = 0; j < 200; j++)
-        {
-            for (int i = 0; i < 100; i++)
-            {
-                Mutation rm = new Mutation(KEYSPACE1, ByteBufferUtil.bytes("key" + j));
-                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "_CF" + i);
-                // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
-                ByteBuffer value = ByteBuffer.allocate(100000);
-                cf.addColumn(new BufferCell(Util.cellname("c"), value));
-                rm.add(cf);
-                rm.applyUnsafe();
-            }
-        }
-
-        int flushes = 0;
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            if (cfs.name.startsWith("_CF"))
-                flushes += cfs.metric.memtableSwitchCount.getCount();
-        }
-        assert flushes > 0;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
deleted file mode 100644
index fe22da8..0000000
--- a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.WrappedRunnable;
-import static org.apache.cassandra.Util.column;
-
-import org.apache.cassandra.Util;
-
-public class LongKeyspaceTest
-{
-    public static final String KEYSPACE1 = "LongKeyspaceTest";
-    public static final String CF_STANDARD = "Standard1";
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
-    }
-
-    @Test
-    public void testGetRowMultiColumn() throws Throwable
-    {
-        final Keyspace keyspace = Keyspace.open(KEYSPACE1);
-        final ColumnFamilyStore cfStore = keyspace.getColumnFamilyStore("Standard1");
-
-        for (int i = 1; i < 5000; i += 100)
-        {
-            Mutation rm = new Mutation(KEYSPACE1, Util.dk("key" + i).getKey());
-            ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
-            for (int j = 0; j < i; j++)
-                cf.addColumn(column("c" + j, "v" + j, 1L));
-            rm.add(cf);
-            rm.applyUnsafe();
-        }
-
-        Runnable verify = new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                ColumnFamily cf;
-                for (int i = 1; i < 5000; i += 100)
-                {
-                    for (int j = 0; j < i; j++)
-                    {
-                        cf = cfStore.getColumnFamily(Util.namesQueryFilter(cfStore, Util.dk("key" + i), "c" + j));
-                        KeyspaceTest.assertColumns(cf, "c" + j);
-                    }
-                }
-
-            }
-        };
-        KeyspaceTest.reTest(keyspace.getColumnFamilyStore("Standard1"), verify);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java b/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
deleted file mode 100644
index b4efd49..0000000
--- a/test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.apache.cassandra.db.commitlog;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
-
-public class ComitLogStress
-{
-
-    public static final String format = "%s,%s,%s,%s,%s,%s";
-
-    public static void main(String[] args) throws Exception {
-        int NUM_THREADS = Runtime.getRuntime().availableProcessors();
-        if (args.length >= 1) {
-            NUM_THREADS = Integer.parseInt(args[0]);
-            System.out.println("Setting num threads to: " + NUM_THREADS);
-        }
-        ExecutorService executor = new JMXEnabledThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60,
-                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10 * NUM_THREADS), new NamedThreadFactory("Stress"), "");
-        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
-
-        org.apache.cassandra.SchemaLoader.loadSchema();
-        org.apache.cassandra.SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
-        final AtomicLong count = new AtomicLong();
-        final long start = System.currentTimeMillis();
-        System.out.println(String.format(format, "seconds", "max_mb", "allocated_mb", "free_mb", "diffrence", "count"));
-        scheduled.scheduleAtFixedRate(new Runnable() {
-            long lastUpdate = 0;
-
-            public void run() {
-                Runtime runtime = Runtime.getRuntime();
-                long maxMemory = mb(runtime.maxMemory());
-                long allocatedMemory = mb(runtime.totalMemory());
-                long freeMemory = mb(runtime.freeMemory());
-                long temp = count.get();
-                System.out.println(String.format(format, ((System.currentTimeMillis() - start) / 1000),
-                        maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate));
-                lastUpdate = temp;
-            }
-        }, 1, 1, TimeUnit.SECONDS);
-
-        while (true) {
-            executor.execute(new CommitlogExecutor());
-            count.incrementAndGet();
-        }
-    }
-
-    private static long mb(long maxMemory) {
-        return maxMemory / (1024 * 1024);
-    }
-
-    static final String keyString = UUIDGen.getTimeUUID().toString();
-    public static class CommitlogExecutor implements Runnable {
-        public void run() {
-            String ks = "Keyspace1";
-            ByteBuffer key = ByteBufferUtil.bytes(keyString);
-            for (int i=0; i<100; ++i) {
-                Mutation mutation = new Mutation(ks, key);
-                mutation.add("Standard1", Util.cellname("name"), ByteBufferUtil.bytes("value" + i),
-                        System.currentTimeMillis());
-                CommitLog.instance.add(mutation);
-            }
-        }
-    }
-}
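
For comparison with the file just removed, a minimal sketch of the same write loop against the post-8099 API, reusing the UpdateBuilder pattern that the CommitLogStressTest changes below adopt; the table, key and column names are illustrative only.

    // Hedged sketch: the old Mutation.add(cf, cellname, value, timestamp) loop,
    // re-expressed with UpdateBuilder and the new Mutation(PartitionUpdate) constructor.
    CFMetaData metadata = Schema.instance.getCFMetaData("Keyspace1", "Standard1");
    ByteBuffer key = ByteBufferUtil.bytes(keyString);
    for (int i = 0; i < 100; ++i)
    {
        UpdateBuilder builder = UpdateBuilder.create(metadata, Util.dk(key))
                                             .withTimestamp(System.currentTimeMillis());
        builder.newRow("name" + i).add("val", ByteBufferUtil.bytes("value" + i));
        CommitLog.instance.add(new Mutation(builder.build()));
    }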

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 5897dec..9b4dde7 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -30,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -48,15 +49,20 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class CommitLogStressTest
 {
@@ -354,8 +360,7 @@ public class CommitLogStressTest
                 }
                 double time = (System.currentTimeMillis() - start) / 1000.0;
                 double avg = (temp / time);
-                System.out
-                        .println(
+                System.out.println(
                         String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
                                       ((System.currentTimeMillis() - start) / 1000),
                                       mb(maxMemory),
@@ -421,20 +426,20 @@ public class CommitLogStressTest
             {
                 if (rl != null)
                     rl.acquire();
-                String ks = "Keyspace1";
                 ByteBuffer key = randomBytes(16, rand);
-                Mutation mutation = new Mutation(ks, key);
 
+                UpdateBuilder builder = UpdateBuilder.create(Schema.instance.getCFMetaData("Keyspace1", "Standard1"), Util.dk(key));
                 for (int ii = 0; ii < numCells; ii++)
                 {
                     int sz = randomSize ? rand.nextInt(cellSize) : cellSize;
                     ByteBuffer bytes = randomBytes(sz, rand);
-                    mutation.add("Standard1", Util.cellname("name" + ii), bytes, System.currentTimeMillis());
+                    builder.newRow("name" + ii).add("val", bytes);
                     hash = hash(hash, bytes);
                     ++cells;
                     dataSize += sz;
                 }
-                rp = commitLog.add(mutation);
+
+                rp = commitLog.add(new Mutation(builder.build()));
                 counter.incrementAndGet();
             }
         }
@@ -468,7 +473,7 @@ public class CommitLogStressTest
             {
                 mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn),
                                                            desc.getMessagingVersion(),
-                                                           ColumnSerializer.Flag.LOCAL);
+                                                           SerializationHelper.Flag.LOCAL);
             }
             catch (IOException e)
             {
@@ -476,18 +481,24 @@ public class CommitLogStressTest
                 throw new AssertionError(e);
             }
 
-            for (ColumnFamily cf : mutation.getColumnFamilies())
+            for (PartitionUpdate cf : mutation.getPartitionUpdates())
             {
-                for (Cell c : cf.getSortedColumns())
+
+                Iterator<Row> rowIterator = cf.iterator();
+
+                while (rowIterator.hasNext())
                 {
-                    if (new String(c.name().toByteBuffer().array(), StandardCharsets.UTF_8).startsWith("name"))
+                    Row row = rowIterator.next();
+                    if (!(UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name")))
+                        continue;
+
+                    for (Cell cell : row)
                     {
-                        hash = hash(hash, c.value());
+                        hash = hash(hash, cell.value());
                         ++cells;
                     }
                 }
             }
         }
-
     }
 }
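
Pulled out as a standalone helper for readability, a sketch of the replay-side traversal the hunk above introduces: a mutation is now walked as PartitionUpdate -> Row -> Cell instead of ColumnFamily -> Cell. The method name is illustrative; every call comes from the hunk above.

    // Hedged sketch: counting the "name*" cells of a replayed mutation with the row-based API.
    private static int countNamedCells(Mutation mutation)
    {
        int cells = 0;
        for (PartitionUpdate update : mutation.getPartitionUpdates())
        {
            Iterator<Row> rows = update.iterator();
            while (rows.hasNext())
            {
                Row row = rows.next();
                // the first clustering component holds what used to be the cell-name prefix
                if (!UTF8Type.instance.compose(row.clustering().get(0)).startsWith("name"))
                    continue;
                for (Cell cell : row)   // a Row is iterable over its cells
                    ++cells;
            }
        }
        return cells;
    }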

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index e6c8f56..ca223ca 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -26,12 +26,14 @@ import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableUtils;
@@ -93,7 +95,7 @@ public class LongCompactionsTest
         testCompaction(100, 800, 5);
     }
 
-    protected void testCompaction(int sstableCount, int rowsPerSSTable, int colsPerRow) throws Exception
+    protected void testCompaction(int sstableCount, int partitionsPerSSTable, int rowsPerPartition) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
 
@@ -103,17 +105,16 @@ public class LongCompactionsTest
         ArrayList<SSTableReader> sstables = new ArrayList<>();
         for (int k = 0; k < sstableCount; k++)
         {
-            SortedMap<String,ColumnFamily> rows = new TreeMap<String,ColumnFamily>();
-            for (int j = 0; j < rowsPerSSTable; j++)
+            SortedMap<String, PartitionUpdate> rows = new TreeMap<>();
+            for (int j = 0; j < partitionsPerSSTable; j++)
             {
                 String key = String.valueOf(j);
-                Cell[] cols = new Cell[colsPerRow];
-                for (int i = 0; i < colsPerRow; i++)
-                {
-                    // last sstable has highest timestamps
-                    cols[i] = Util.column(String.valueOf(i), String.valueOf(i), k);
-                }
-                rows.put(key, SSTableUtils.createCF(KEYSPACE1, CF_STANDARD, Long.MIN_VALUE, Integer.MIN_VALUE, cols));
+                // last sstable has highest timestamps
+                UpdateBuilder builder = UpdateBuilder.create(store.metadata, String.valueOf(j))
+                                                     .withTimestamp(k);
+                for (int i = 0; i < rowsPerPartition; i++)
+                    builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
+                rows.put(key, builder.build());
             }
             SSTableReader sstable = SSTableUtils.prepare().write(rows);
             sstables.add(sstable);
@@ -133,8 +134,8 @@ public class LongCompactionsTest
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,
-                                         rowsPerSSTable,
-                                         colsPerRow,
+                                         partitionsPerSSTable,
+                                         rowsPerPartition,
                                          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)));
     }
 
@@ -157,23 +158,23 @@ public class LongCompactionsTest
         for (int j = 0; j < SSTABLES; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 DecoratedKey key = Util.dk(String.valueOf(i % 2));
-                Mutation rm = new Mutation(KEYSPACE1, key.getKey());
                 long timestamp = j * ROWS_PER_SSTABLE + i;
-                rm.add("Standard1", Util.cellname(String.valueOf(i / 2)),
-                       ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                       timestamp);
                 maxTimestampExpected = Math.max(timestamp, maxTimestampExpected);
-                rm.apply();
+                UpdateBuilder.create(cfs.metadata, key)
+                             .withTimestamp(timestamp)
+                             .newRow(String.valueOf(i / 2)).add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                             .apply();
+
                 inserted.add(key);
             }
             cfs.forceBlockingFlush();
             CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected);
-            assertEquals(inserted.toString(), inserted.size(), Util.getRangeSlice(cfs).size());
+
+            assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size());
         }
 
         forceCompactions(cfs);
-
-        assertEquals(inserted.size(), Util.getRangeSlice(cfs).size());
+        assertEquals(inserted.toString(), inserted.size(), Util.getAll(Util.cmd(cfs).build()).size());
 
         // make sure max timestamp of compacted sstables is recorded properly after compaction.
         CompactionsTest.assertMaxTimestamp(cfs, maxTimestampExpected);
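
Shown in isolation, the per-partition write pattern this test now relies on; the key, timestamp and row count are illustrative.

    // Hedged sketch: build one PartitionUpdate with several rows, either collected
    // for SSTableUtils.prepare().write(...) or applied directly on the live write path.
    UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, "42")   // partition key "42"
                                         .withTimestamp(1L);
    for (int i = 0; i < 5; i++)
        builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
    PartitionUpdate update = builder.build();          // for bulk/SSTable-style writes

    // Live, timestamped write, as in the second hunk above:
    UpdateBuilder.create(cfs.metadata, Util.dk("0"))
                 .withTimestamp(42L)
                 .newRow("0").add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
                 .apply();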

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index cc8203e..fbee72a 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.UpdateBuilder;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -75,11 +76,11 @@ public class LongLeveledCompactionStrategyTest
         for (int r = 0; r < rows; r++)
         {
             DecoratedKey key = Util.dk(String.valueOf(r));
-            Mutation rm = new Mutation(ksname, key.getKey());
+            UpdateBuilder builder = UpdateBuilder.create(store.metadata, key);
             for (int c = 0; c < columns; c++)
-            {
-                rm.add(cfname, Util.cellname("column" + c), value, 0);
-            }
+                builder.newRow("column" + c).add("val", value);
+
+            Mutation rm = new Mutation(builder.build());
             rm.apply();
             store.forceBlockingFlush();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
new file mode 100644
index 0000000..575036f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/AbstractReadCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractReadCommandBuilder
+{
+    protected final ColumnFamilyStore cfs;
+    protected int nowInSeconds;
+
+    private int cqlLimit = -1;
+    private int pagingLimit = -1;
+    private boolean reversed = false;
+
+    private Set<ColumnIdentifier> columns;
+    protected final RowFilter filter = RowFilter.create();
+
+    private Slice.Bound lowerClusteringBound;
+    private Slice.Bound upperClusteringBound;
+
+    private NavigableSet<Clustering> clusterings;
+
+    // Use Util.cmd() instead of this ctor directly
+    AbstractReadCommandBuilder(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+        this.nowInSeconds = FBUtilities.nowInSeconds();
+    }
+
+    public AbstractReadCommandBuilder withNowInSeconds(int nowInSec)
+    {
+        this.nowInSeconds = nowInSec;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder fromIncl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, true, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder fromExcl(Object... values)
+    {
+        assert lowerClusteringBound == null && clusterings == null;
+        this.lowerClusteringBound = Slice.Bound.create(cfs.metadata.comparator, true, false, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder toIncl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, true, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder toExcl(Object... values)
+    {
+        assert upperClusteringBound == null && clusterings == null;
+        this.upperClusteringBound = Slice.Bound.create(cfs.metadata.comparator, false, false, values);
+        return this;
+    }
+
+    public AbstractReadCommandBuilder includeRow(Object... values)
+    {
+        assert lowerClusteringBound == null && upperClusteringBound == null;
+
+        if (this.clusterings == null)
+            this.clusterings = new TreeSet<>(cfs.metadata.comparator);
+
+        this.clusterings.add(cfs.metadata.comparator.make(values));
+        return this;
+    }
+
+    public AbstractReadCommandBuilder reverse()
+    {
+        this.reversed = true;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder withLimit(int newLimit)
+    {
+        this.cqlLimit = newLimit;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder withPagingLimit(int newLimit)
+    {
+        this.pagingLimit = newLimit;
+        return this;
+    }
+
+    public AbstractReadCommandBuilder columns(String... columns)
+    {
+        if (this.columns == null)
+            this.columns = new HashSet<>();
+
+        for (String column : columns)
+            this.columns.add(ColumnIdentifier.getInterned(column, true));
+        return this;
+    }
+
+    private ByteBuffer bb(Object value, AbstractType<?> type)
+    {
+        return value instanceof ByteBuffer ? (ByteBuffer)value : ((AbstractType)type).decompose(value);
+    }
+
+    private AbstractType<?> forValues(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType;
+        CollectionType ct = (CollectionType)collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.valueComparator();
+            case SET:
+                return ct.nameComparator();
+        }
+        throw new AssertionError();
+    }
+
+    private AbstractType<?> forKeys(AbstractType<?> collectionType)
+    {
+        assert collectionType instanceof CollectionType;
+        CollectionType ct = (CollectionType)collectionType;
+        switch (ct.kind)
+        {
+            case LIST:
+            case MAP:
+                return ct.nameComparator();
+        }
+        throw new AssertionError();
+    }
+
+    public AbstractReadCommandBuilder filterOn(String column, Operator op, Object value)
+    {
+        ColumnDefinition def = cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned(column, true));
+        assert def != null;
+
+        AbstractType<?> type = def.type;
+        if (op == Operator.CONTAINS)
+            type = forValues(type);
+        else if (op == Operator.CONTAINS_KEY)
+            type = forKeys(type);
+
+        this.filter.add(def, op, bb(value, type));
+        return this;
+    }
+
+    protected ColumnFilter makeColumnFilter()
+    {
+        if (columns == null)
+            return ColumnFilter.all(cfs.metadata);
+
+        ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(cfs.metadata);
+        for (ColumnIdentifier column : columns)
+            builder.add(cfs.metadata.getColumnDefinition(column));
+        return builder.build();
+    }
+
+    protected ClusteringIndexFilter makeFilter()
+    {
+        if (clusterings != null)
+        {
+            return new ClusteringIndexNamesFilter(clusterings, reversed);
+        }
+        else
+        {
+            Slice slice = Slice.make(lowerClusteringBound == null ? Slice.Bound.BOTTOM : lowerClusteringBound,
+                                     upperClusteringBound == null ? Slice.Bound.TOP : upperClusteringBound);
+            return new ClusteringIndexSliceFilter(Slices.with(cfs.metadata.comparator, slice), reversed);
+        }
+    }
+
+    protected DataLimits makeLimits()
+    {
+        DataLimits limits = cqlLimit < 0 ? DataLimits.NONE : DataLimits.cqlLimits(cqlLimit);
+        if (pagingLimit >= 0)
+            limits = limits.forPaging(pagingLimit);
+        return limits;
+    }
+
+    public Row getOnlyRow()
+    {
+        return Util.getOnlyRow(build());
+    }
+
+    public Row getOnlyRowUnfiltered()
+    {
+        return Util.getOnlyRowUnfiltered(build());
+    }
+
+    public FilteredPartition getOnlyPartition()
+    {
+        return Util.getOnlyPartition(build());
+    }
+
+    public Partition getOnlyPartitionUnfiltered()
+    {
+        return Util.getOnlyPartitionUnfiltered(build());
+    }
+
+    public abstract ReadCommand build();
+
+    public static class SinglePartitionBuilder extends AbstractReadCommandBuilder
+    {
+        private final DecoratedKey partitionKey;
+
+        SinglePartitionBuilder(ColumnFamilyStore cfs, DecoratedKey key)
+        {
+            super(cfs);
+            this.partitionKey = key;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+        }
+    }
+
+    public static class PartitionRangeBuilder extends AbstractReadCommandBuilder
+    {
+        private DecoratedKey startKey;
+        private boolean startInclusive;
+        private DecoratedKey endKey;
+        private boolean endInclusive;
+
+        PartitionRangeBuilder(ColumnFamilyStore cfs)
+        {
+            super(cfs);
+        }
+
+        public PartitionRangeBuilder fromKeyIncl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = true;
+            this.startKey = Util.makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder fromKeyExcl(Object... values)
+        {
+            assert startKey == null;
+            this.startInclusive = false;
+            this.startKey = Util.makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder toKeyIncl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = true;
+            this.endKey = Util.makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        public PartitionRangeBuilder toKeyExcl(Object... values)
+        {
+            assert endKey == null;
+            this.endInclusive = false;
+            this.endKey = Util.makeKey(cfs.metadata, values);
+            return this;
+        }
+
+        @Override
+        public ReadCommand build()
+        {
+            PartitionPosition start = startKey;
+            if (start == null)
+            {
+                start = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                startInclusive = false;
+            }
+            PartitionPosition end = endKey;
+            if (end == null)
+            {
+                end = StorageService.getPartitioner().getMinimumToken().maxKeyBound();
+                endInclusive = true;
+            }
+            
+            AbstractBounds<PartitionPosition> bounds;
+            if (startInclusive && endInclusive)
+                bounds = new Bounds<PartitionPosition>(start, end);
+            else if (startInclusive && !endInclusive)
+                bounds = new IncludingExcludingBounds<PartitionPosition>(start, end);
+            else if (!startInclusive && endInclusive)
+                bounds = new Range<PartitionPosition>(start, end);
+            else
+                bounds = new ExcludingBounds<PartitionPosition>(start, end);
+
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+        }
+    }
+}
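
A brief usage sketch for the builder above, matching the Util.cmd(cfs) entry point the updated tests use; the clustering bounds and column name are illustrative, and Util.cmd is assumed to hand back one of the two concrete builders.

    // Hedged sketch: typical test-side reads built with AbstractReadCommandBuilder.
    // Whole-table count, as in the updated compaction tests:
    int partitions = Util.getAll(Util.cmd(cfs).build()).size();

    // Clustering slice with projection, limit and reversal:
    ReadCommand cmd = Util.cmd(cfs)
                          .fromIncl("a")       // lower clustering bound, inclusive
                          .toExcl("z")         // upper clustering bound, exclusive
                          .columns("val")      // restrict the ColumnFilter
                          .withLimit(100)      // CQL-style DataLimits
                          .reverse()
                          .build();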

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java
deleted file mode 100644
index 25754ea..0000000
--- a/test/unit/org/apache/cassandra/EmbeddedServer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.service.CassandraDaemon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-public class EmbeddedServer extends SchemaLoader
-{
-    protected static CassandraDaemon daemon = null;
-
-    enum GatewayService
-    {
-        Thrift
-    }
-
-    public static GatewayService getDaemonGatewayService()
-    {
-        return GatewayService.Thrift;
-    }
-
-    static ExecutorService executor = Executors.newSingleThreadExecutor();
-
-    @BeforeClass
-    public static void startCassandra()
-
-    {
-        executor.execute(new Runnable()
-        {
-            public void run()
-            {
-                switch (getDaemonGatewayService())
-                {
-                    case Thrift:
-                    default:
-                        daemon = new org.apache.cassandra.service.CassandraDaemon();
-                }
-                daemon.activate();
-            }
-        });
-        try
-        {
-            TimeUnit.SECONDS.sleep(3);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    @AfterClass
-    public static void stopCassandra() throws Exception
-    {
-        if (daemon != null)
-        {
-            daemon.deactivate();
-        }
-        executor.shutdown();
-        executor.shutdownNow();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index c71c98b..f0a849b 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -21,7 +21,7 @@ package org.apache.cassandra;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
@@ -32,7 +32,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.SimpleSparseCellNameType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.sstable.Component;
@@ -119,12 +118,13 @@ public class MockSchema
                 throw new RuntimeException(e);
             }
         }
+        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
         StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
-                                                 .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1)
+                                                 .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
                                                  .get(MetadataType.STATS);
         SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata, Murmur3Partitioner.instance,
                                                           segmentedFile.sharedCopy(), segmentedFile.sharedCopy(), indexSummary.sharedCopy(),
-                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL);
+                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
         reader.first = reader.last = readerBounds(generation);
         if (!keepRef)
             reader.selfRef().release();
@@ -140,10 +140,11 @@ public class MockSchema
 
     private static CFMetaData newCFMetaData(String ksname, String cfname)
     {
-        CFMetaData metadata = new CFMetaData(ksname,
-                                             cfname,
-                                             ColumnFamilyType.Standard,
-                                             new SimpleSparseCellNameType(UTF8Type.instance));
+        CFMetaData metadata = CFMetaData.Builder.create(ksname, cfname)
+                                                .addPartitionKey("key", UTF8Type.instance)
+                                                .addClusteringColumn("col", UTF8Type.instance)
+                                                .addRegularColumn("value", UTF8Type.instance)
+                                                .build();
         metadata.caching(CachingOptions.NONE);
         return metadata;
     }
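
For context, the mocked table now carries a compound primary key; a short sketch of the declaration with a rough CQL rendering as a comment (keyspace and table names are illustrative).

    // Hedged sketch: what MockSchema's builder call describes, roughly
    //   CREATE TABLE mockks.mockcf (key text, col text, value text, PRIMARY KEY (key, col))
    CFMetaData metadata = CFMetaData.Builder.create("mockks", "mockcf")
                                            .addPartitionKey("key", UTF8Type.instance)
                                            .addClusteringColumn("col", UTF8Type.instance)
                                            .addRegularColumn("value", UTF8Type.instance)
                                            .build();
    metadata.caching(CachingOptions.NONE);   // mocked tables bypass row/key caching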