Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/05/04 21:04:34 UTC

cassandra git commit: Faster Streaming

Repository: cassandra
Updated Branches:
  refs/heads/trunk 594183b03 -> 1e92ce43a


Faster Streaming

Patch by tjake; reviewed by Stefania Alborghetti for CASSANDRA-9766


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e92ce43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e92ce43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e92ce43

Branch: refs/heads/trunk
Commit: 1e92ce43a5a730f81d3f6cfd72e7f4b126db788a
Parents: 594183b
Author: T Jake Luciani <ja...@apache.org>
Authored: Sat Apr 9 10:03:32 2016 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed May 4 17:04:12 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../concurrent/NamedThreadFactory.java          |   4 +-
 .../apache/cassandra/concurrent/SEPWorker.java  |   3 +-
 .../org/apache/cassandra/db/ColumnIndex.java    |  69 +++++---
 .../columniterator/SSTableReversedIterator.java |   2 +-
 .../db/commitlog/FileDirectSegment.java         |   3 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  30 ++--
 .../cassandra/db/rows/ComplexColumnData.java    |  10 +-
 .../cassandra/db/rows/UnfilteredSerializer.java |  40 ++++-
 .../io/sstable/format/big/BigTableWriter.java   |  50 +++---
 .../cassandra/io/util/DataOutputBuffer.java     |  38 ++++-
 .../io/util/DataOutputBufferFixed.java          |   4 +-
 .../cassandra/io/util/DataOutputStreamPlus.java |   3 +-
 .../cassandra/io/util/SafeMemoryWriter.java     |   2 +-
 .../cassandra/streaming/ConnectionHandler.java  |   4 +-
 .../compress/CompressedInputStream.java         |  59 +++++--
 .../compress/CompressedStreamReader.java        |  11 +-
 .../cassandra/tools/SSTableMetadataViewer.java  |  10 +-
 .../org/apache/cassandra/utils/BloomFilter.java |   7 +-
 .../apache/cassandra/utils/FilterFactory.java   |   6 +-
 .../cassandra/utils/StreamingHistogram.java     |  84 +++++----
 .../org/apache/cassandra/utils/btree/BTree.java |  67 ++++----
 .../apache/cassandra/utils/btree/BTreeSet.java  |   3 -
 .../cassandra/utils/btree/TreeBuilder.java      |  30 +++-
 .../concurrent/WrappedSharedCloseable.java      |   4 +-
 .../cassandra/utils/memory/BufferPool.java      |  16 +-
 .../apache/cassandra/utils/vint/VIntCoding.java |   3 +-
 .../cassandra/streaming/LongStreamingTest.java  | 171 +++++++++++++++++++
 .../apache/cassandra/db/RowIndexEntryTest.java  |   9 +-
 .../org/apache/cassandra/utils/BTreeTest.java   |  20 ++-
 .../cassandra/utils/StreamingHistogramTest.java |  48 +++++-
 31 files changed, 597 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fe57b2..c2165db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.8
+ * Faster streaming (CASSANDRA-9766)
 
 3.6
  * Enhanced Compaction Logging (CASSANDRA-10805)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..85edf74 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
+
 /**
  * This class is an implementation of the <i>ThreadFactory</i> interface. This
  * is useful to give Java threads meaningful names which is useful when using
@@ -55,7 +57,7 @@ public class NamedThreadFactory implements ThreadFactory
     public Thread newThread(Runnable runnable)
     {
         String name = id + ":" + n.getAndIncrement();
-        Thread thread = new Thread(threadGroup, runnable, name);
+        Thread thread = new FastThreadLocalThread(threadGroup, runnable, name);
         thread.setPriority(priority);
         thread.setDaemon(true);
         if (contextClassLoader != null)
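
Netty's FastThreadLocal only takes its fast, array-indexed path when the calling thread is a FastThreadLocalThread; on an ordinary Thread it falls back to a regular JDK ThreadLocal. That is why the thread factories and the ThreadLocal declarations are swapped in pairs throughout this patch (the same pairing recurs in SEPWorker, FileDirectSegment, DataOutputStreamPlus, ConnectionHandler, CompressedInputStream and BloomFilter below). A minimal sketch of the pattern, assuming a Netty version that provides io.netty.util.concurrent.FastThreadLocal, as trunk's bundled Netty does:

    import io.netty.util.concurrent.FastThreadLocal;
    import io.netty.util.concurrent.FastThreadLocalThread;

    public class FastThreadLocalDemo
    {
        // FastThreadLocal keeps its values in a plain indexed array attached to
        // FastThreadLocalThread, avoiding the JDK ThreadLocalMap hash probe.
        private static final FastThreadLocal<byte[]> SCRATCH = new FastThreadLocal<byte[]>()
        {
            @Override
            protected byte[] initialValue()
            {
                return new byte[4096];
            }
        };

        public static void main(String[] args) throws InterruptedException
        {
            Thread t = new FastThreadLocalThread(() ->
            {
                byte[] buf = SCRATCH.get(); // array-indexed lookup, no hashing
                buf[0] = 1;
            }, "fast-tl-demo");
            t.start();
            t.join();
        }
    }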

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 3b3e7ad..d7c21bc 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.LockSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
@@ -45,7 +46,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
     {
         this.pool = pool;
         this.workerId = workerId;
-        thread = new Thread(this, pool.poolName + "-Worker-" + workerId);
+        thread = new FastThreadLocalThread(this, pool.poolName + "-Worker-" + workerId);
         thread.setDaemon(true);
         set(initialState);
         thread.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 4dcceff..2e7a2ee 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -41,13 +41,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class ColumnIndex
 {
-
     // used, if the row-index-entry reaches config column_index_cache_size_in_kb
     private DataOutputBuffer buffer;
     // used to track the size of the serialized size of row-index-entry (unused for buffer)
     private int indexSamplesSerializedSize;
     // used, until the row-index-entry reaches config column_index_cache_size_in_kb
-    public List<IndexInfo> indexSamples = new ArrayList<>();
+    private final List<IndexInfo> indexSamples = new ArrayList<>();
 
     public int columnIndexCount;
     private int[] indexOffsets;
@@ -55,11 +54,10 @@ public class ColumnIndex
     private final SerializationHeader header;
     private final int version;
     private final SequentialWriter writer;
-    private final long initialPosition;
-    private final ISerializer<IndexInfo> idxSerializer;
-    public long headerLength = -1;
-
-    private long startPosition = -1;
+    private long initialPosition;
+    private final ISerializer<IndexInfo> idxSerializer;
+    public long headerLength;
+    private long startPosition;
 
     private int written;
     private long previousRowStart;
@@ -72,17 +70,32 @@ public class ColumnIndex
     private final Collection<SSTableFlushObserver> observers;
 
     public ColumnIndex(SerializationHeader header,
-                       SequentialWriter writer,
-                       Version version,
-                       Collection<SSTableFlushObserver> observers,
-                       ISerializer<IndexInfo> indexInfoSerializer)
+                        SequentialWriter writer,
+                        Version version,
+                        Collection<SSTableFlushObserver> observers,
+                        ISerializer<IndexInfo> indexInfoSerializer)
     {
         this.header = header;
-        this.idxSerializer = indexInfoSerializer;
         this.writer = writer;
         this.version = version.correspondingMessagingVersion();
         this.observers = observers;
+        this.idxSerializer = indexInfoSerializer;
+    }
+
+    public void reset()
+    {
         this.initialPosition = writer.position();
+        this.headerLength = -1;
+        this.startPosition = -1;
+        this.previousRowStart = 0;
+        this.columnIndexCount = 0;
+        this.written = 0;
+        this.indexSamplesSerializedSize = 0;
+        this.indexSamples.clear();
+        this.firstClustering = null;
+        this.lastClustering = null;
+        this.openMarker = null;
+        this.buffer = null;
     }
 
     public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
@@ -93,7 +106,7 @@ public class ColumnIndex
         while (iterator.hasNext())
             add(iterator.next());
 
-        close();
+        finish();
     }
 
     private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
@@ -120,6 +133,16 @@ public class ColumnIndex
         return buffer != null ? buffer.buffer() : null;
     }
 
+    public List<IndexInfo> indexSamples()
+    {
+        if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) <= DatabaseDescriptor.getColumnIndexCacheSize())
+        {
+            return indexSamples;
+        }
+
+        return null;
+    }
+
     public int[] offsets()
     {
         return indexOffsets != null
@@ -136,7 +159,7 @@ public class ColumnIndex
                                              openMarker);
 
         // indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
-        // For shallow ones, we need it to serialize the offsets in close().
+        // For shallow ones, we need it to serialize the offsets in finish().
         // For non-shallow ones, the offsets are passed into IndexedEntry, so we don't have to
         // calculate the offsets again.
 
@@ -149,10 +172,19 @@ public class ColumnIndex
         {
             if (columnIndexCount >= indexOffsets.length)
                 indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);
-            indexOffsets[columnIndexCount] =
+
+            // the 0th element is always 0
+            if (columnIndexCount == 0)
+            {
+                indexOffsets[columnIndexCount] = 0;
+            }
+            else
+            {
+                indexOffsets[columnIndexCount] =
                 buffer != null
-                    ? Ints.checkedCast(buffer.position())
-                    : indexSamplesSerializedSize;
+                ? Ints.checkedCast(buffer.position())
+                : indexSamplesSerializedSize;
+            }
         }
         columnIndexCount++;
 
@@ -168,7 +200,6 @@ public class ColumnIndex
                 {
                     idxSerializer.serialize(indexSample, buffer);
                 }
-                indexSamples = null;
             }
             else
             {
@@ -216,7 +247,7 @@ public class ColumnIndex
             addIndexBlock();
     }
 
-    private void close() throws IOException
+    private void finish() throws IOException
     {
         UnfilteredSerializer.serializer.writeEndOfPartition(writer);
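
The new indexSamples() accessor makes the two-tier behaviour explicit: as long as the serialized samples fit under column_index_cache_size_in_kb, the IndexInfo objects themselves are handed to the caller; past that budget it returns null and callers fall back to the serialized buffer(). A compilable sketch of that contract, where the threshold constant is an illustrative stand-in for the DatabaseDescriptor lookup:

    import java.util.ArrayList;
    import java.util.List;

    public class IndexSamplesSketch
    {
        // Illustrative stand-in for DatabaseDescriptor.getColumnIndexCacheSize()
        private static final int CACHE_LIMIT_BYTES = 2 * 1024;

        private final List<Object> indexSamples = new ArrayList<>();
        private int indexSamplesSerializedSize;
        private int columnIndexCount;

        public List<Object> indexSamples()
        {
            // each sample also costs one serialized int offset (TypeSizes.sizeof(0))
            if (indexSamplesSerializedSize + columnIndexCount * Integer.BYTES <= CACHE_LIMIT_BYTES)
                return indexSamples;

            return null; // over budget: the caller uses the serialized buffer() instead
        }
    }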
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 7f6b462..6fef70f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -353,7 +353,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         public void reset()
         {
             built = null;
-            rowBuilder.reuse();
+            rowBuilder = BTree.builder(metadata.comparator);
             deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index ec4aa91..50f9efd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -23,6 +23,7 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.BufferType;
@@ -34,7 +35,7 @@ import org.apache.cassandra.io.util.FileUtils;
  */
 public abstract class FileDirectSegment extends CommitLogSegment
 {
-    protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>()
+    protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
     {
         protected ByteBuffer initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 47cfd58..63aa157 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -330,9 +330,10 @@ public class BTreeRow extends AbstractRow
             if (cd.column().isSimple())
                 return false;
 
-            if (!((ComplexColumnData)cd).complexDeletion().isLive())
+            if (!((ComplexColumnData) cd).complexDeletion().isLive())
                 return true;
         }
+
         return false;
     }
 
@@ -620,7 +621,7 @@ public class BTreeRow extends AbstractRow
         protected Deletion deletion = Deletion.LIVE;
 
         private final boolean isSorted;
-        private final BTree.Builder<Cell> cells;
+        private BTree.Builder<Cell> cells_;
         private final CellResolver resolver;
         private boolean hasComplex = false;
 
@@ -633,10 +634,19 @@ public class BTreeRow extends AbstractRow
 
         protected Builder(boolean isSorted, int nowInSecs)
         {
-            this.cells = BTree.builder(ColumnData.comparator);
+            cells_ = null;
             resolver = new CellResolver(nowInSecs);
             this.isSorted = isSorted;
-            this.cells.auto(false);
+        }
+
+        private BTree.Builder<Cell> getCells()
+        {
+            if (cells_ == null)
+            {
+                cells_ = BTree.builder(ColumnData.comparator);
+                cells_.auto(false);
+            }
+            return cells_;
         }
 
         public boolean isSorted()
@@ -660,7 +670,7 @@ public class BTreeRow extends AbstractRow
             this.clustering = null;
             this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
             this.deletion = Deletion.LIVE;
-            this.cells.reuse();
+            this.cells_ = null;
         }
 
         public void addPrimaryKeyLivenessInfo(LivenessInfo info)
@@ -676,25 +686,25 @@ public class BTreeRow extends AbstractRow
         public void addCell(Cell cell)
         {
             assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
-            cells.add(cell);
+            getCells().add(cell);
             hasComplex |= cell.column.isComplex();
         }
 
         public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
         {
-            cells.add(new ComplexColumnDeletion(column, complexDeletion));
+            getCells().add(new ComplexColumnDeletion(column, complexDeletion));
             hasComplex = true;
         }
 
         public Row build()
         {
             if (!isSorted)
-                cells.sort();
+                getCells().sort();
             // we can avoid resolving if we're sorted and have no complex values
             // (because we'll only have unique simple cells, which are already in their final condition)
             if (!isSorted | hasComplex)
-                cells.resolve(resolver);
-            Object[] btree = cells.build();
+                getCells().resolve(resolver);
+            Object[] btree = getCells().build();
 
             if (deletion.isShadowedBy(primaryKeyLivenessInfo))
                 deletion = Deletion.LIVE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index f4678b7..154fc77 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -19,23 +19,22 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.*;
+import java.util.Iterator;
+import java.util.Objects;
 import java.util.function.BiFunction;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
  * The data for a complex column, that is it's cells and potential complex
@@ -240,8 +239,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
         {
             this.column = column;
             this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called
-            if (builder == null) builder = BTree.builder(column.cellComparator());
-            else builder.reuse(column.cellComparator());
+            this.builder = BTree.builder(column.cellComparator());
         }
 
         public void addComplexDeletion(DeletionTime complexDeletion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 8157bbb..bcc65aa 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -21,9 +21,11 @@ import java.io.IOException;
 
 import com.google.common.collect.Collections2;
 
+import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
 /**
@@ -178,11 +180,37 @@ public class UnfilteredSerializer
 
         if (header.isForSSTable())
         {
-            out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version));
-            // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler).
-            // This is currently not used however and using it is tbd.
-            out.writeUnsignedVInt(previousUnfilteredSize);
+            DataOutputBuffer dob = DataOutputBuffer.RECYCLER.get();
+            try
+            {
+                serializeRowBody(row, flags, header, dob);
+
+                out.writeUnsignedVInt(dob.position() + TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize));
+                // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler).
+                // This is currently not used however and using it is tbd.
+                out.writeUnsignedVInt(previousUnfilteredSize);
+                out.write(dob.buffer());
+            }
+            finally
+            {
+                dob.recycle();
+            }
         }
+        else
+        {
+            serializeRowBody(row, flags, header, out);
+        }
+    }
+
+    @Inline
+    private void serializeRowBody(Row row, int flags, SerializationHeader header, DataOutputPlus out)
+    throws IOException
+    {
+        boolean isStatic = row.isStatic();
+
+        Columns headerColumns = header.columns(isStatic);
+        LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
+        Row.Deletion deletion = row.deletion();
 
         if ((flags & HAS_TIMESTAMP) != 0)
             header.writeTimestamp(pkLiveness.timestamp(), out);
@@ -194,7 +222,7 @@ public class UnfilteredSerializer
         if ((flags & HAS_DELETION) != 0)
             header.writeDeletionTime(deletion.time(), out);
 
-        if (!hasAllColumns)
+        if ((flags & HAS_ALL_COLUMNS) == 0)
             Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
 
         for (ColumnData data : row)
@@ -202,7 +230,7 @@ public class UnfilteredSerializer
             if (data.column.isSimple())
                 Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
             else
-                writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
+                writeComplexColumn((ComplexColumnData) data, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
         }
     }
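
The sstable path used to call serializedRowBodySize(...) up front, effectively walking the row once for sizing and again for writing. Serializing the body once into a recycled DataOutputBuffer makes the buffer position the size. A JDK-only sketch of the shape of the new write path (the patch itself uses DataOutputBuffer.RECYCLER rather than a fresh stream):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SizedBodySketch
    {
        static void writeSizedBody(DataOutputStream out) throws IOException
        {
            ByteArrayOutputStream scratch = new ByteArrayOutputStream(128);
            DataOutputStream body = new DataOutputStream(scratch);

            body.writeLong(42L);            // stand-in for serializeRowBody(row, ...)
            body.writeUTF("cell payload");

            out.writeInt(scratch.size());   // body size read off the buffer, no dry run
            scratch.writeTo(out);           // one copy of the already-encoded bytes
        }
    }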
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b6ea1d9..44b1c3a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -17,50 +17,46 @@
  */
 package org.apache.cassandra.io.sstable.format.big;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.ChunkCache;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.FilterFactory;
-import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
-import org.apache.cassandra.utils.SyncUtil;
-
 public class BigTableWriter extends SSTableWriter
 {
     private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 
+    private final ColumnIndex columnIndexWriter;
     private final IndexWriter iwriter;
     private final SegmentedFile.Builder dbuilder;
     protected final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
-
     private DataPosition dataMark;
     private long lastEarlyOpenLength = 0;
 
@@ -90,6 +86,8 @@ public class BigTableWriter extends SSTableWriter
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
         }
         iwriter = new IndexWriter(keyCount, dataFile);
+
+        columnIndexWriter = new ColumnIndex(header, dataFile, descriptor.version, observers, getRowIndexEntrySerializer().indexInfoSerializer());
     }
 
     public void mark()
@@ -153,29 +151,31 @@ public class BigTableWriter extends SSTableWriter
         long startPosition = beforeAppend(key);
         observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
 
+        // Reuse the column index writer for each partition
+        columnIndexWriter.reset();
+
         try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
         {
-            ColumnIndex columnIndex = new ColumnIndex(header, dataFile, descriptor.version, observers,
-                                                      getRowIndexEntrySerializer().indexInfoSerializer());
-
-            columnIndex.buildRowIndex(collecting);
+            columnIndexWriter.buildRowIndex(collecting);
 
             // afterAppend() writes the partition key before the first RowIndexEntry - so we have to add its
             // serialized size to the index-writer position
             long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position();
 
             RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition,
-                                                       collecting.partitionLevelDeletion(), columnIndex.headerLength, columnIndex.columnIndexCount,
-                                                       columnIndex.indexInfoSerializedSize(),
-                                                       columnIndex.indexSamples,
-                                                       columnIndex.offsets(),
+                                                       collecting.partitionLevelDeletion(),
+                                                       columnIndexWriter.headerLength,
+                                                       columnIndexWriter.columnIndexCount,
+                                                       columnIndexWriter.indexInfoSerializedSize(),
+                                                       columnIndexWriter.indexSamples(),
+                                                       columnIndexWriter.offsets(),
                                                        getRowIndexEntrySerializer().indexInfoSerializer());
 
             long endPosition = dataFile.position();
             long rowSize = endPosition - startPosition;
             maybeLogLargePartitionWarning(key, rowSize);
             metadataCollector.addPartitionSizeInBytes(rowSize);
-            afterAppend(key, endPosition, entry, columnIndex.buffer());
+            afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
             return entry;
         }
         catch (IOException e)
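
Together with ColumnIndex.reset() above, this moves the writer from one ColumnIndex allocation per partition to one per sstable. A self-contained toy of the allocate-once/reset-per-partition shape (names are illustrative, not Cassandra's):

    public class ResetPerPartition
    {
        static class IndexBuilder
        {
            private final java.util.List<Integer> offsets = new java.util.ArrayList<>();

            void reset()      { offsets.clear(); }   // clear() keeps the backing array
            void add(int off) { offsets.add(off); }
            int  size()       { return offsets.size(); }
        }

        public static void main(String[] args)
        {
            IndexBuilder builder = new IndexBuilder();   // once per writer
            for (int partition = 0; partition < 3; partition++)
            {
                builder.reset();                         // once per partition
                builder.add(0);
                builder.add(64);
                System.out.println("partition " + partition + ": " + builder.size() + " blocks");
            }
        }
    }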

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 91242b8..8dbad8c 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -21,11 +21,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import org.apache.cassandra.config.Config;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import io.netty.util.Recycler;
+import org.apache.cassandra.config.Config;
+
 /**
  * An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
  * its buffer so copies can be avoided.
@@ -39,6 +40,21 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
      */
     private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
 
+    public static final Recycler<DataOutputBuffer> RECYCLER = new Recycler<DataOutputBuffer>()
+    {
+        protected DataOutputBuffer newObject(Handle handle)
+        {
+            return new DataOutputBuffer(handle);
+        }
+    };
+
+    private final Recycler.Handle handle;
+
+    private DataOutputBuffer(Recycler.Handle handle)
+    {
+        this(128, handle);
+    }
+
     public DataOutputBuffer()
     {
         this(128);
@@ -46,12 +62,26 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
 
     public DataOutputBuffer(int size)
     {
-        super(ByteBuffer.allocate(size));
+        this(size, null);
+    }
+
+    protected DataOutputBuffer(int size, Recycler.Handle handle)
+    {
+        this(ByteBuffer.allocate(size), handle);
     }
 
-    protected DataOutputBuffer(ByteBuffer buffer)
+    protected DataOutputBuffer(ByteBuffer buffer, Recycler.Handle handle)
     {
         super(buffer);
+        this.handle = handle;
+    }
+
+    public void recycle()
+    {
+        assert handle != null;
+        buffer.rewind();
+
+        RECYCLER.recycle(this, handle);
     }
 
     @Override
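
DataOutputBuffer instances handed out by RECYCLER come from Netty's Recycler, a lightweight thread-local object pool. A minimal sketch of the pattern, mirroring the raw (non-generic) Recycler.Handle API the diff itself uses:

    import io.netty.util.Recycler;

    public class PooledBuffer
    {
        private static final Recycler<PooledBuffer> RECYCLER = new Recycler<PooledBuffer>()
        {
            protected PooledBuffer newObject(Handle handle)
            {
                return new PooledBuffer(handle);
            }
        };

        private final Recycler.Handle handle;
        final byte[] bytes = new byte[128];

        private PooledBuffer(Recycler.Handle handle)
        {
            this.handle = handle;
        }

        public static PooledBuffer get()
        {
            return RECYCLER.get();          // thread-local pool; allocates only on a miss
        }

        public void recycle()
        {
            RECYCLER.recycle(this, handle); // hand the instance back for reuse
        }
    }

As in the UnfilteredSerializer hunk above, get() should be paired with recycle() in a try/finally so buffers return to the pool even when serialization throws.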

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index c2cc549..5193401 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -38,12 +38,12 @@ public class DataOutputBufferFixed extends DataOutputBuffer
 
     public DataOutputBufferFixed(int size)
     {
-        super(ByteBuffer.allocate(size));
+        super(size, null);
     }
 
     public DataOutputBufferFixed(ByteBuffer buffer)
     {
-        super(buffer);
+        super(buffer, null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index a846384..755783b 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -64,7 +65,7 @@ public abstract class DataOutputStreamPlus extends OutputStream implements DataO
         return bytes;
     }
 
-    private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>()
+    private static final FastThreadLocal<byte[]> tempBuffer = new FastThreadLocal<byte[]>()
     {
         @Override
         public byte[] initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 24eb93c..88912f9 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -33,7 +33,7 @@ public class SafeMemoryWriter extends DataOutputBuffer
 
     private SafeMemoryWriter(SafeMemory memory)
     {
-        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
+        super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN), null);
         this.memory = memory;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 960b531..3600d5e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -206,7 +208,7 @@ public class ConnectionHandler
             this.socket = socket;
             this.protocolVersion = protocolVersion;
 
-            new Thread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
+            new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
         }
 
         public ListenableFuture<?> close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index c7d5f98..3aaa1a3 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -25,14 +25,13 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
-import java.util.zip.Checksum;
 
 import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.utils.ChecksumType;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -82,31 +81,61 @@ public class CompressedInputStream extends InputStream
         this.crcCheckChanceSupplier = crcCheckChanceSupplier;
         this.checksumType = checksumType;
 
-        new Thread(new Reader(source, info, dataBuffer)).start();
+        new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
     }
 
-    public int read() throws IOException
+    private void decompressNextChunk() throws IOException
     {
-        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+        try
         {
-            try
-            {
-                byte[] compressedWithCRC = dataBuffer.take();
-                if (compressedWithCRC == POISON_PILL)
-                    throw new EOFException("No chunk available");
-                decompress(compressedWithCRC);
-            }
-            catch (InterruptedException e)
-            {
+            byte[] compressedWithCRC = dataBuffer.take();
+            if (compressedWithCRC == POISON_PILL)
                 throw new EOFException("No chunk available");
-            }
+            decompress(compressedWithCRC);
         }
+        catch (InterruptedException e)
+        {
+            throw new EOFException("No chunk available");
+        }
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+            decompressNextChunk();
 
         assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
 
         return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
     }
 
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        long nextCurrent = current + len;
+
+        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+            decompressNextChunk();
+
+        assert nextCurrent >= bufferOffset;
+
+        int read = 0;
+        while (read < len)
+        {
+            int nextLen = Math.min((len - read), (int)((bufferOffset + validBufferBytes) - current));
+
+            System.arraycopy(buffer, (int)(current - bufferOffset), b, off + read, nextLen);
+            read += nextLen;
+
+            current += nextLen;
+            if (read != len)
+                decompressNextChunk();
+        }
+
+        return len;
+    }
+
     public void position(long position)
     {
         assert position >= current : "stream can only read forward.";
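
The bulk read override matters because InputStream's default read(byte[], int, int) calls the single-byte read() once per byte; copying straight out of the decompressed chunk replaces per-byte virtual calls with one System.arraycopy per chunk. A simplified, compilable shape of the idea (the real stream additionally verifies checksums and tracks a forward-only position):

    import java.io.IOException;
    import java.io.InputStream;

    class ChunkedInput extends InputStream
    {
        private byte[] chunk = new byte[0];
        private int pos;

        // stand-in for dataBuffer.take() + decompress(...)
        private void refill() throws IOException
        {
            chunk = new byte[64 * 1024];
            pos = 0;
        }

        @Override
        public int read() throws IOException
        {
            if (pos == chunk.length)
                refill();
            return chunk[pos++] & 0xff;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException
        {
            int read = 0;
            while (read < len)
            {
                if (pos == chunk.length)
                    refill();
                int n = Math.min(len - read, chunk.length - pos);
                System.arraycopy(chunk, pos, b, off + read, n); // bulk copy per chunk
                pos += n;
                read += n;
            }
            return len;
        }
    }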

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5aa393e..8dafa9c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -17,16 +17,11 @@
  */
 package org.apache.cassandra.streaming.compress;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
-
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,11 +29,12 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamReader;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.io.util.TrackedInputStream;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -92,6 +88,7 @@ public class CompressedStreamReader extends StreamReader
         try
         {
             writer = createWriter(cfs, totalSize, repairedAt, format);
+            String filename = writer.getFilename();
             int sectionIdx = 0;
             for (Pair<Long, Long> section : sections)
             {
@@ -107,7 +104,7 @@ public class CompressedStreamReader extends StreamReader
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
             logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index d2e0513..de43c2f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -17,11 +17,7 @@
  */
 package org.apache.cassandra.tools;
 
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.PrintStream;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -121,9 +117,9 @@ public class SSTableMetadataViewer
                     out.printf("totalRows: %s%n", stats.totalRows);
                     out.println("Estimated tombstone drop times:");
 
-                    for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
+                    for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
                     {
-                        out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());
+                        out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]);
                     }
                     printHistograms(stats, out);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index ce6c638..4ff07b7 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -19,13 +19,15 @@ package org.apache.cassandra.utils;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.obs.IBitSet;
 
 public class BloomFilter extends WrappedSharedCloseable implements IFilter
 {
-    private static final ThreadLocal<long[]> reusableIndexes = new ThreadLocal<long[]>()
+    private static final FastThreadLocal<long[]> reusableIndexes = new FastThreadLocal<long[]>()
     {
         protected long[] initialValue()
         {
@@ -84,16 +86,19 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter
     // to avoid generating a lot of garbage since stack allocation currently does not support stores
     // (CASSANDRA-6609).  it returns the array so that the caller does not need to perform
     // a second threadlocal lookup.
+    @Inline
     private long[] indexes(FilterKey key)
     {
         // we use the same array both for storing the hash result, and for storing the indexes we return,
         // so that we do not need to allocate two arrays.
         long[] indexes = reusableIndexes.get();
+
         key.filterHash(indexes);
         setIndexes(indexes[1], indexes[0], hashCount, bitset.capacity(), indexes);
         return indexes;
     }
 
+    @Inline
     private void setIndexes(long base, long inc, int count, long max, long[] results)
     {
         if (oldBfHashOrder)
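
As the comment above notes, a single thread-local long[] doubles as the hash output and the returned index array, so a Bloom filter lookup allocates nothing. A sketch of that scratch-array idiom under the same FastThreadLocal, with an illustrative double-hashing scheme (the real setIndexes() also handles the legacy hash order):

    import io.netty.util.concurrent.FastThreadLocal;

    public class ScratchIndexes
    {
        private static final int HASH_COUNT = 5; // illustrative; the real value is per-filter

        private static final FastThreadLocal<long[]> reusable = new FastThreadLocal<long[]>()
        {
            @Override
            protected long[] initialValue()
            {
                return new long[Math.max(2, HASH_COUNT)];
            }
        };

        static long[] indexes(long hash1, long hash2, long max)
        {
            long[] indexes = reusable.get();  // no allocation on the lookup path
            for (int i = 0; i < HASH_COUNT; i++)
                indexes[i] = Math.floorMod(hash1 + i * hash2, max); // simple double hashing
            return indexes;
        }
    }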

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 869f3fa..298e734 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -20,14 +20,14 @@ package org.apache.cassandra.utils;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.obs.IBitSet;
 import org.apache.cassandra.utils.obs.OffHeapBitSet;
 import org.apache.cassandra.utils.obs.OpenBitSet;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class FilterFactory
 {
     public static final IFilter AlwaysPresent = new AlwaysPresentFilter();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index b925395..7ea97e5 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -39,7 +39,10 @@ public class StreamingHistogram
     public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
 
     // TreeMap to hold bins of histogram.
-    private final TreeMap<Double, Long> bin;
+    // The key is a numeric type so we can avoid boxing/unboxing streams of different key types
+    // The value is an unboxed long array, always of length == 1
+    // Serialized histograms are always written with double keys, for backwards compatibility
+    private final TreeMap<Number, long[]> bin;
 
     // maximum bin size for this histogram
     private final int maxBinSize;
@@ -51,22 +54,28 @@ public class StreamingHistogram
     public StreamingHistogram(int maxBinSize)
     {
         this.maxBinSize = maxBinSize;
-        bin = new TreeMap<>();
+        bin = new TreeMap<>((o1, o2) -> {
+            if (o1.getClass().equals(o2.getClass()))
+                return ((Comparable)o1).compareTo(o2);
+            else
+                return ((Double)o1.doubleValue()).compareTo(o2.doubleValue());
+        });
     }
 
     private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
     {
-        this.maxBinSize = maxBinSize;
-        this.bin = new TreeMap<>(bin);
+        this(maxBinSize);
+        for (Map.Entry<Double, Long> entry : bin.entrySet())
+            this.bin.put(entry.getKey(), new long[]{entry.getValue()});
     }
 
     /**
      * Adds new point p to this histogram.
      * @param p
      */
-    public void update(double p)
+    public void update(Number p)
     {
-        update(p, 1);
+        update(p, 1L);
     }
 
     /**
@@ -74,30 +83,31 @@ public class StreamingHistogram
      * @param p
      * @param m
      */
-    public void update(double p, long m)
+    public void update(Number p, long m)
     {
-        Long mi = bin.get(p);
+        long[] mi = bin.get(p);
         if (mi != null)
         {
             // we found the same p so increment that counter
-            bin.put(p, mi + m);
+            mi[0] += m;
         }
         else
         {
-            bin.put(p, m);
+            mi = new long[]{m};
+            bin.put(p, mi);
             // if bin size exceeds maximum bin size then trim down to max size
             while (bin.size() > maxBinSize)
             {
                 // find points p1, p2 which have smallest difference
-                Iterator<Double> keys = bin.keySet().iterator();
-                double p1 = keys.next();
-                double p2 = keys.next();
+                Iterator<Number> keys = bin.keySet().iterator();
+                double p1 = keys.next().doubleValue();
+                double p2 = keys.next().doubleValue();
                 double smallestDiff = p2 - p1;
                 double q1 = p1, q2 = p2;
                 while (keys.hasNext())
                 {
                     p1 = p2;
-                    p2 = keys.next();
+                    p2 = keys.next().doubleValue();
                     double diff = p2 - p1;
                     if (diff < smallestDiff)
                     {
@@ -107,9 +117,13 @@ public class StreamingHistogram
                     }
                 }
                 // merge those two
-                long k1 = bin.remove(q1);
-                long k2 = bin.remove(q2);
-                bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
+                long[] a1 = bin.remove(q1);
+                long[] a2 = bin.remove(q2);
+                long k1 = a1[0];
+                long k2 = a2[0];
+
+                a1[0] += k2;
+                bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1);
             }
         }
     }
@@ -124,8 +138,8 @@ public class StreamingHistogram
         if (other == null)
             return;
 
-        for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet())
-            update(entry.getKey(), entry.getValue());
+        for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet())
+            update(entry.getKey(), entry.getValue()[0]);
     }
 
     /**
@@ -138,32 +152,32 @@ public class StreamingHistogram
     {
         double sum = 0;
         // find the points pi, pnext which satisfy pi <= b < pnext
-        Map.Entry<Double, Long> pnext = bin.higherEntry(b);
+        Map.Entry<Number, long[]> pnext = bin.higherEntry(b);
         if (pnext == null)
         {
             // if b is greater than any key in this histogram,
             // just count all appearance and return
-            for (Long value : bin.values())
-                sum += value;
+            for (long[] value : bin.values())
+                sum += value[0];
         }
         else
         {
-            Map.Entry<Double, Long> pi = bin.floorEntry(b);
+            Map.Entry<Number, long[]> pi = bin.floorEntry(b);
             if (pi == null)
                 return 0;
             // calculate estimated count mb for point b
-            double weight = (b - pi.getKey()) / (pnext.getKey() - pi.getKey());
-            double mb = pi.getValue() + (pnext.getValue() - pi.getValue()) * weight;
-            sum += (pi.getValue() + mb) * weight / 2;
+            double weight = (b - pi.getKey().doubleValue()) / (pnext.getKey().doubleValue() - pi.getKey().doubleValue());
+            double mb = pi.getValue()[0] + (pnext.getValue()[0] - pi.getValue()[0]) * weight;
+            sum += (pi.getValue()[0] + mb) * weight / 2;
 
-            sum += pi.getValue() / 2.0;
-            for (Long value : bin.headMap(pi.getKey(), false).values())
-                sum += value;
+            sum += pi.getValue()[0] / 2.0;
+            for (long[] value : bin.headMap(pi.getKey(), false).values())
+                sum += value[0];
         }
         return sum;
     }
 
-    public Map<Double, Long> getAsMap()
+    public Map<Number, long[]> getAsMap()
     {
         return Collections.unmodifiableMap(bin);
     }
@@ -173,12 +187,12 @@ public class StreamingHistogram
         public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException
         {
             out.writeInt(histogram.maxBinSize);
-            Map<Double, Long> entries = histogram.getAsMap();
+            Map<Number, long[]> entries = histogram.getAsMap();
             out.writeInt(entries.size());
-            for (Map.Entry<Double, Long> entry : entries.entrySet())
+            for (Map.Entry<Number, long[]> entry : entries.entrySet())
             {
-                out.writeDouble(entry.getKey());
-                out.writeLong(entry.getValue());
+                out.writeDouble(entry.getKey().doubleValue());
+                out.writeLong(entry.getValue()[0]);
             }
         }
 
@@ -198,7 +212,7 @@ public class StreamingHistogram
         public long serializedSize(StreamingHistogram histogram)
         {
             long size = TypeSizes.sizeof(histogram.maxBinSize);
-            Map<Double, Long> entries = histogram.getAsMap();
+            Map<Number, long[]> entries = histogram.getAsMap();
             size += TypeSizes.sizeof(entries.size());
             // size of entries = size * (8(double) + 8(long))
             size += entries.size() * (8L + 8L);
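
The switch from TreeMap<Double, Long> to TreeMap<Number, long[]> works because a long[1] is a mutable cell: incrementing an existing bin mutates the array in place instead of unboxing a Long, boxing the sum, and re-inserting it into the map. A small runnable illustration:

    import java.util.TreeMap;

    public class MutableBinDemo
    {
        public static void main(String[] args)
        {
            TreeMap<Double, long[]> bin = new TreeMap<>();
            double p = 42.0;

            for (int i = 0; i < 3; i++)
            {
                long[] cell = bin.get(p);
                if (cell != null)
                    cell[0]++;                    // in-place increment, no boxing, no re-put
                else
                    bin.put(p, new long[]{ 1L }); // one allocation per distinct point
            }

            System.out.println(bin.get(p)[0]);    // prints 3
        }
    }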

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 1c3d2e2..4f21d26 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 
+import io.netty.util.Recycler;
 import org.apache.cassandra.utils.ObjectSizes;
 
 import static com.google.common.collect.Iterables.concat;
@@ -139,12 +140,9 @@ public class BTree
             return values;
         }
 
-        Queue<TreeBuilder> queue = modifier.get();
-        TreeBuilder builder = queue.poll();
-        if (builder == null)
-            builder = new TreeBuilder();
+        TreeBuilder builder = TreeBuilder.newInstance();
         Object[] btree = builder.build(source, updateF, size);
-        queue.add(builder);
+
         return btree;
     }
 
@@ -176,12 +174,9 @@ public class BTree
         if (isEmpty(btree))
             return build(updateWith, updateWithLength, updateF);
 
-        Queue<TreeBuilder> queue = modifier.get();
-        TreeBuilder builder = queue.poll();
-        if (builder == null)
-            builder = new TreeBuilder();
+
+        TreeBuilder builder = TreeBuilder.newInstance();
         btree = builder.update(btree, comparator, updateWith, updateF);
-        queue.add(builder);
         return btree;
     }
 
@@ -203,12 +198,12 @@ public class BTree
 
     public static <V> Iterator<V> iterator(Object[] btree, Dir dir)
     {
-        return new BTreeSearchIterator<V, V>(btree, null, dir);
+        return new BTreeSearchIterator<>(btree, null, dir);
     }
 
     public static <V> Iterator<V> iterator(Object[] btree, int lb, int ub, Dir dir)
     {
-        return new BTreeSearchIterator<V, V>(btree, null, dir, lb, ub);
+        return new BTreeSearchIterator<>(btree, null, dir, lb, ub);
     }
 
     public static <V> Iterable<V> iterable(Object[] btree)
@@ -771,28 +766,29 @@ public class BTree
         return 1 + lookupSizeMap(root, childIndex - 1);
     }
 
-    private static final ThreadLocal<Queue<TreeBuilder>> modifier = new ThreadLocal<Queue<TreeBuilder>>()
+    static final Recycler<Builder> builderRecycler = new Recycler<Builder>()
     {
-        @Override
-        protected Queue<TreeBuilder> initialValue()
+        protected Builder newObject(Handle handle)
         {
-            return new ArrayDeque<>();
+            return new Builder(handle);
         }
     };
 
     public static <V> Builder<V> builder(Comparator<? super V> comparator)
     {
-        return new Builder<>(comparator);
+        Builder<V> builder = builderRecycler.get();
+        builder.reuse(comparator);
+
+        return builder;
     }
 
     public static <V> Builder<V> builder(Comparator<? super V> comparator, int initialCapacity)
     {
-        return new Builder<>(comparator);
+        return builder(comparator);
     }
 
     public static class Builder<V>
     {
-
         // a user-defined bulk resolution, to be applied manually via resolve()
         public static interface Resolver
         {
@@ -817,16 +813,13 @@ public class BTree
         boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
         boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
         QuickResolver<V> quickResolver;
+        final Recycler.Handle recycleHandle;
 
-        protected Builder(Comparator<? super V> comparator)
-        {
-            this(comparator, 16);
-        }
 
-        protected Builder(Comparator<? super V> comparator, int initialCapacity)
+        private Builder(Recycler.Handle handle)
         {
-            this.comparator = comparator;
-            this.values = new Object[initialCapacity];
+            this.recycleHandle = handle;
+            this.values = new Object[16];
         }
 
         public Builder<V> setQuickResolver(QuickResolver<V> quickResolver)
@@ -835,16 +828,19 @@ public class BTree
             return this;
         }
 
-        public void reuse()
+        public void recycle()
         {
-            reuse(comparator);
+            if (recycleHandle != null)
+                builderRecycler.recycle(this, recycleHandle);
         }
 
-        public void reuse(Comparator<? super V> comparator)
+        private void reuse(Comparator<? super V> comparator)
         {
             this.comparator = comparator;
+            quickResolver = null;
             count = 0;
             detected = true;
+            auto = true;
         }
 
         public Builder<V> auto(boolean auto)
@@ -1071,9 +1067,16 @@ public class BTree
 
         public Object[] build()
         {
-            if (auto)
-                autoEnforce();
-            return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+            try
+            {
+                if (auto)
+                    autoEnforce();
+                return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+            }
+            finally
+            {
+                this.recycle();
+            }
         }
     }
 

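Note on the BTree hunks above: the unbounded per-thread ArrayDeque of builders is gone, and Builder instances are now pooled through Netty's Recycler, with reuse(comparator) wiping the pooled object's state (comparator, quickResolver, count, detected, auto) before it is handed out again. A minimal standalone sketch of the same pattern, written against the Recycler API exactly as this patch uses it (the Netty 4.0-era newObject(Handle) plus recycle(obj, handle); newer Netty recycles via the handle itself); PooledThing is illustrative:

    import io.netty.util.Recycler;

    public final class PooledThing
    {
        private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>()
        {
            protected PooledThing newObject(Handle handle)
            {
                return new PooledThing(handle);
            }
        };

        private final Recycler.Handle handle;
        private int state;

        private PooledThing(Recycler.Handle handle)
        {
            this.handle = handle;
        }

        public static PooledThing newInstance()
        {
            PooledThing t = RECYCLER.get();   // pops from the pool, or calls newObject()
            t.state = 0;                      // reset reused state, like Builder.reuse()
            return t;
        }

        public void recycle()
        {
            RECYCLER.recycle(this, handle);   // return this instance to the pool
        }
    }
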
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index 9517009..a59e481 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -21,14 +21,11 @@ package org.apache.cassandra.utils.btree;
 import java.util.*;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 
 import org.apache.cassandra.utils.btree.BTree.Dir;
 
 import static org.apache.cassandra.utils.btree.BTree.findIndex;
-import static org.apache.cassandra.utils.btree.BTree.lower;
-import static org.apache.cassandra.utils.btree.BTree.toArray;
 
 public class BTreeSet<V> implements NavigableSet<V>, List<V>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
index 024902e..f42de0f 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.utils.btree;
 
 import java.util.Comparator;
 
+import io.netty.util.Recycler;
+
 import static org.apache.cassandra.utils.btree.BTree.EMPTY_LEAF;
 import static org.apache.cassandra.utils.btree.BTree.FAN_SHIFT;
 import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
@@ -28,12 +30,32 @@ import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
  * A class for constructing a new BTree, either from an existing one and some set of modifications
  * or a new tree from a sorted collection of items.
  * <p/>
- * This is a fairly heavy-weight object, so a ThreadLocal instance is created for making modifications to a tree
+ * This is a fairly heavy-weight object, so instances are pooled and reused via a Netty Recycler when making modifications to a tree
  */
 final class TreeBuilder
 {
+
+    private final static Recycler<TreeBuilder> builderRecycler = new Recycler<TreeBuilder>()
+    {
+        protected TreeBuilder newObject(Handle handle)
+        {
+            return new TreeBuilder(handle);
+        }
+    };
+
+    public static TreeBuilder newInstance()
+    {
+        return builderRecycler.get();
+    }
+
+    private final Recycler.Handle recycleHandle;
     private final NodeBuilder rootBuilder = new NodeBuilder();
 
+    private TreeBuilder(Recycler.Handle handle)
+    {
+        this.recycleHandle = handle;
+    }
+
     /**
      * At the highest level, we adhere to the classic b-tree insertion algorithm:
      *
@@ -93,6 +115,9 @@ final class TreeBuilder
 
         Object[] r = current.toNode();
         current.clear();
+
+        builderRecycler.recycle(this, recycleHandle);
+
         return r;
     }
 
@@ -114,6 +139,9 @@ final class TreeBuilder
 
         Object[] r = current.toNode();
         current.clear();
+
+        builderRecycler.recycle(this, recycleHandle);
+
         return r;
     }
 }

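Note on TreeBuilder above: build() and update() both recycle the builder before returning, and BTree.Builder.build() does the same in its try/finally, so a builder taken from the pool is strictly single-use; this is also why the old reuse() entry point disappears and the BTreeTest changes further down request a fresh builder per tree. A short usage sketch against the public BTree API as it stands after this patch:

    import java.util.Comparator;
    import org.apache.cassandra.utils.btree.BTree;

    public class BuilderUsage
    {
        public static void main(String[] args)
        {
            // one builder per tree: build() hands the builder back to the pool
            BTree.Builder<Integer> builder = BTree.builder(Comparator.naturalOrder());
            builder.add(1);
            builder.add(2);
            Object[] tree = builder.build();   // builder is recycled right here

            // do not touch 'builder' again; take a fresh one for the next tree
            builder = BTree.builder(Comparator.naturalOrder());
        }
    }
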
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index 0eefae3..31894b1 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.util.Arrays;
 
-import org.apache.cassandra.utils.Throwables;
-
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -35,7 +33,7 @@ public abstract class WrappedSharedCloseable extends SharedCloseableImpl
 
     public WrappedSharedCloseable(final AutoCloseable closeable)
     {
-        this(new AutoCloseable[] { closeable});
+        this(new AutoCloseable[] {closeable});
     }
 
     public WrappedSharedCloseable(final AutoCloseable[] closeable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index ad2404f..68f8911 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -21,23 +21,23 @@ package org.apache.cassandra.utils.memory;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Queue;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.BufferPoolMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.Ref;
 
 /**
@@ -68,7 +68,7 @@ public class BufferPool
     private static final GlobalPool globalPool = new GlobalPool();
 
     /** A thread local pool of chunks, where chunks come from the global pool */
-    private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() {
+    private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>() {
         @Override
         protected LocalPool initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index daf5006..3872424 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -50,6 +50,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
 /**
@@ -103,7 +104,7 @@ public class VIntCoding
         return Integer.numberOfLeadingZeros(~firstByte) - 24;
     }
 
-    protected static final ThreadLocal<byte[]> encodingBuffer = new ThreadLocal<byte[]>()
+    protected static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>()
     {
         @Override
         public byte[] initialValue()

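Note on the two ThreadLocal hunks above: BufferPool.localPool and VIntCoding.encodingBuffer both switch to Netty's FastThreadLocal, which resolves values by a constant array index instead of a hash probe into a per-thread map; the fast path applies only on a FastThreadLocalThread (for example, threads built by Netty's DefaultThreadFactory), and on ordinary threads it falls back to a regular ThreadLocal lookup. A minimal sketch of the drop-in shape (the 9-byte scratch size mirrors a max-length encoded vint, but the names here are illustrative):

    import io.netty.util.concurrent.FastThreadLocal;

    public class FastTlsSketch
    {
        // same initialValue() override shape as the hunks above
        private static final FastThreadLocal<byte[]> SCRATCH = new FastThreadLocal<byte[]>()
        {
            @Override
            protected byte[] initialValue()
            {
                return new byte[9];
            }
        };

        public static void main(String[] args)
        {
            byte[] buf = SCRATCH.get();   // constant-index lookup on FastThreadLocalThread
            buf[0] = 1;
        }
    }
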
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
new file mode 100644
index 0000000..7e53ba2
--- /dev/null
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongStreamingTest
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        SchemaLoader.cleanupAndLeaveDirs();
+        Keyspace.setInitialized();
+        StorageService.instance.initServer();
+
+        StorageService.instance.setCompactionThroughputMbPerSec(0);
+        StorageService.instance.setStreamThroughputMbPerSec(0);
+        StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        Config.setClientMode(false);
+    }
+
+    @Test
+    public void testCompressedStream() throws InvalidRequestException, IOException, ExecutionException, InterruptedException
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table1";
+
+        File tempdir = Files.createTempDir();
+        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+        assert dataDir.mkdirs();
+
+        String schema = "CREATE TABLE cql_keyspace.table1 ("
+                        + "  k int PRIMARY KEY,"
+                        + "  v1 text,"
+                        + "  v2 int"
+                        + ");";// with compression = {};";
+        String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .sorted()
+                                                  .inDirectory(dataDir)
+                                                  .forTable(schema)
+                                                  .using(insert).build();
+        long start = System.nanoTime();
+
+        for (int i = 0; i < 10_000_000; i++)
+            writer.addRow(i, "test1", 24);
+
+        writer.close();
+        System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
+
+        File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db"));
+        long dataSize = 0L;
+        for (File file : dataFiles)
+        {
+            System.err.println("File : "+file.getAbsolutePath());
+            dataSize += file.length();
+        }
+
+        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        {
+            private String ks;
+            public void init(String keyspace)
+            {
+                for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+
+                this.ks = keyspace;
+            }
+
+            public CFMetaData getTableMetadata(String cfName)
+            {
+                return Schema.instance.getCFMetaData(ks, cfName);
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        start = System.nanoTime();
+        loader.stream().get();
+
+        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec",
+                                         millis/1000d,
+                                         (dataSize / (double) (1 << 20) / (millis / 1000d)) * 8));
+
+
+        //Stream again
+        loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+        {
+            private String ks;
+            public void init(String keyspace)
+            {
+                for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+
+                this.ks = keyspace;
+            }
+
+            public CFMetaData getTableMetadata(String cfName)
+            {
+                return Schema.instance.getCFMetaData(ks, cfName);
+            }
+        }, new OutputHandler.SystemOutput(false, false));
+
+        start = System.nanoTime();
+        loader.stream().get();
+
+        millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec",
+                                         millis/1000d,
+                                         (dataSize / (double) (1 << 20) / (millis / 1000d)) * 8));
+
+
+        //Compact them both
+        start = System.nanoTime();
+        Keyspace.open(KS).getColumnFamilyStore(TABLE).forceMajorCompaction();
+        millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+        System.err.println(String.format("Finished Compacting in %.2f seconds: %.2f Mb/sec",
+                                         millis / 1000d,
+                                         (dataSize * 2 / (double) (1 << 20) / (millis / 1000d)) * 8));
+
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1 limit 100;");
+        assertEquals(100, rs.size());
+    }
+}

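Note on the throughput lines in the new long test: dataSize / (1 << 20) converts bytes to mebibytes, dividing by millis / 1000d gives MiB per second, and the final * 8 yields the megabits per second the "Mb/sec" label advertises. A worked instance of the same arithmetic (the numbers are made up):

    public class ThroughputMath
    {
        public static void main(String[] args)
        {
            long dataSize = 512L * 1024 * 1024;   // 512 MiB streamed
            long millis   = 4_000;                // in 4 seconds

            // cast before dividing so sub-MiB remainders are not truncated
            double mbPerSec = (dataSize / (double) (1 << 20) / (millis / 1000d)) * 8;
            System.out.printf("%.2f Mb/sec%n", mbPerSec);   // 1024.00 Mb/sec
        }
    }
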
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index ebacf34..244018e 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.primitives.Ints;
+import org.junit.Assert;
+import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.IMeasurableMemory;
@@ -37,12 +39,12 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
 import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
@@ -67,9 +69,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 
-import org.junit.Assert;
-import org.junit.Test;
-
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 
@@ -191,7 +190,7 @@ public class RowIndexEntryTest extends CQLTester
             rieNew = RowIndexEntry.create(startPosition, 0L,
                                           deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount,
                                           columnIndex.indexInfoSerializedSize(),
-                                          columnIndex.indexSamples, columnIndex.offsets(),
+                                          columnIndex.indexSamples(), columnIndex.offsets(),
                                           rieSerializer.indexInfoSerializer());
             rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer());
             rieNewSerialized = rieOutput.buffer().duplicate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/BTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java
index ffd7315..a01ad2e 100644
--- a/test/unit/org/apache/cassandra/utils/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java
@@ -214,14 +214,16 @@ public class BTreeTest
                 builder.add(i);
             // for sorted input, check non-resolve path works before checking resolution path
             checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
-            builder.reuse();
+            builder = BTree.builder(Comparator.naturalOrder());
+            builder.setQuickResolver(resolver);
             for (int i = 0 ; i < 10 ; i++)
             {
                 // now do a few runs of randomized inputs
                 for (Accumulator j : resolverInput(count, true))
                     builder.add(j);
                 checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
-                builder.reuse();
+                builder = BTree.builder(Comparator.naturalOrder());
+                builder.setQuickResolver(resolver);
             }
             for (List<Accumulator> add : splitResolverInput(count))
             {
@@ -231,7 +233,6 @@ public class BTreeTest
                     builder.addAll(new TreeSet<>(add));
             }
             checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
-            builder.reuse();
         }
     }
 
@@ -278,7 +279,14 @@ public class BTreeTest
                 builder.add(i);
             // for sorted input, check non-resolve path works before checking resolution path
             Assert.assertTrue(Iterables.elementsEqual(sorted, BTree.iterable(builder.build())));
+
+            builder = BTree.builder(Comparator.naturalOrder());
+            builder.auto(false);
+            for (Accumulator i : sorted)
+                builder.add(i);
+            // check resolution path
             checkResolverOutput(count, builder.resolve(resolver).build(), BTree.Dir.ASC);
+
             builder = BTree.builder(Comparator.naturalOrder());
             builder.auto(false);
             for (int i = 0 ; i < 10 ; i++)
@@ -287,11 +295,13 @@ public class BTreeTest
                 for (Accumulator j : resolverInput(count, true))
                     builder.add(j);
                 checkResolverOutput(count, builder.sort().resolve(resolver).build(), BTree.Dir.ASC);
-                builder.reuse();
+                builder = BTree.builder(Comparator.naturalOrder());
+                builder.auto(false);
                 for (Accumulator j : resolverInput(count, true))
                     builder.add(j);
                 checkResolverOutput(count, builder.sort().reverse().resolve(resolver).build(), BTree.Dir.DESC);
-                builder.reuse();
+                builder = BTree.builder(Comparator.naturalOrder());
+                builder.auto(false);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index b6b1882..94aac9e 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.junit.Test;
+
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 
@@ -50,11 +51,11 @@ public class StreamingHistogramTest
         expected1.put(36.0, 1L);
 
         Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
-        for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+        for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
         {
             Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey(), 0.01);
-            assertEquals(entry.getValue(), actual.getValue());
+            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
         }
 
         // merge test
@@ -72,11 +73,11 @@ public class StreamingHistogramTest
         expected2.put(32.67, 3L);
         expected2.put(45.0, 1L);
         expectedItr = expected2.entrySet().iterator();
-        for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+        for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
         {
             Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey(), 0.01);
-            assertEquals(entry.getValue(), actual.getValue());
+            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
         }
 
         // sum test
@@ -112,11 +113,40 @@ public class StreamingHistogramTest
         expected1.put(36.0, 1L);
 
         Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
-        for (Map.Entry<Double, Long> actual : deserialized.getAsMap().entrySet())
+        for (Map.Entry<Number, long[]> actual : deserialized.getAsMap().entrySet())
         {
             Map.Entry<Double, Long> entry = expectedItr.next();
-            assertEquals(entry.getKey(), actual.getKey(), 0.01);
-            assertEquals(entry.getValue(), actual.getValue());
+            assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+            assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
         }
     }
+
+
+    @Test
+    public void testNumericTypes() throws Exception
+    {
+        StreamingHistogram hist = new StreamingHistogram(5);
+
+        hist.update(2);
+        hist.update(2.0);
+        hist.update(2L);
+
+        Map<Number, long[]> asMap = hist.getAsMap();
+
+        assertEquals(1, asMap.size());
+        assertEquals(3L, asMap.get(2)[0]);
+
+        // Make sure it survives a serialize/deserialize round trip
+        DataOutputBuffer out = new DataOutputBuffer();
+        StreamingHistogram.serializer.serialize(hist, out);
+        byte[] bytes = out.toByteArray();
+
+        StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
+
+        deserialized.update(2L);
+
+        asMap = deserialized.getAsMap();
+        assertEquals(1, asMap.size());
+        assertEquals(4L, asMap.get(2)[0]);
+    }
 }
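
Note on testNumericTypes: the test expects update(2), update(2.0) and update(2L) to land in a single bin and asMap.get(2) to find it, which can only hold if the bin map orders keys by their numeric value; plain Integer/Double equality would never match across the boxed types. A minimal sketch of that keying assumption (the comparator here is an illustration, not lifted from the patch):

    import java.util.TreeMap;

    public class NumberKeyedBins
    {
        public static void main(String[] args)
        {
            // assumed: bins compared by doubleValue(), as the test implies
            TreeMap<Number, long[]> bins =
                new TreeMap<>((a, b) -> Double.compare(a.doubleValue(), b.doubleValue()));

            bins.put(2.0, new long[]{ 2L });   // Double key
            bins.get(2)[0]++;                  // Integer 2 finds the same bin
            bins.get(2L)[0]++;                 // and so does Long 2

            System.out.println(bins.get(2.0)[0]);   // 4
        }
    }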