You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/05/04 21:04:34 UTC
cassandra git commit: Faster Streaming
Repository: cassandra
Updated Branches:
refs/heads/trunk 594183b03 -> 1e92ce43a
Faster Streaming
Patch by tjake; reviewed by Stefania Alborghetti for CASSANDRA-9766
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e92ce43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e92ce43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e92ce43
Branch: refs/heads/trunk
Commit: 1e92ce43a5a730f81d3f6cfd72e7f4b126db788a
Parents: 594183b
Author: T Jake Luciani <ja...@apache.org>
Authored: Sat Apr 9 10:03:32 2016 -0400
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed May 4 17:04:12 2016 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 4 +-
.../apache/cassandra/concurrent/SEPWorker.java | 3 +-
.../org/apache/cassandra/db/ColumnIndex.java | 69 +++++---
.../columniterator/SSTableReversedIterator.java | 2 +-
.../db/commitlog/FileDirectSegment.java | 3 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 30 ++--
.../cassandra/db/rows/ComplexColumnData.java | 10 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 40 ++++-
.../io/sstable/format/big/BigTableWriter.java | 50 +++---
.../cassandra/io/util/DataOutputBuffer.java | 38 ++++-
.../io/util/DataOutputBufferFixed.java | 4 +-
.../cassandra/io/util/DataOutputStreamPlus.java | 3 +-
.../cassandra/io/util/SafeMemoryWriter.java | 2 +-
.../cassandra/streaming/ConnectionHandler.java | 4 +-
.../compress/CompressedInputStream.java | 59 +++++--
.../compress/CompressedStreamReader.java | 11 +-
.../cassandra/tools/SSTableMetadataViewer.java | 10 +-
.../org/apache/cassandra/utils/BloomFilter.java | 7 +-
.../apache/cassandra/utils/FilterFactory.java | 6 +-
.../cassandra/utils/StreamingHistogram.java | 84 +++++----
.../org/apache/cassandra/utils/btree/BTree.java | 67 ++++----
.../apache/cassandra/utils/btree/BTreeSet.java | 3 -
.../cassandra/utils/btree/TreeBuilder.java | 30 +++-
.../concurrent/WrappedSharedCloseable.java | 4 +-
.../cassandra/utils/memory/BufferPool.java | 16 +-
.../apache/cassandra/utils/vint/VIntCoding.java | 3 +-
.../cassandra/streaming/LongStreamingTest.java | 171 +++++++++++++++++++
.../apache/cassandra/db/RowIndexEntryTest.java | 9 +-
.../org/apache/cassandra/utils/BTreeTest.java | 20 ++-
.../cassandra/utils/StreamingHistogramTest.java | 48 +++++-
31 files changed, 597 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6fe57b2..c2165db 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.8
+ * Faster streaming (CASSANDRA-9766)
3.6
* Enhanced Compaction Logging (CASSANDRA-10805)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..85edf74 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -55,7 +57,7 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
String name = id + ":" + n.getAndIncrement();
- Thread thread = new Thread(threadGroup, runnable, name);
+ Thread thread = new FastThreadLocalThread(threadGroup, runnable, name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/concurrent/SEPWorker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 3b3e7ad..d7c21bc 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.utils.JVMStabilityInspector;
final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnable
@@ -45,7 +46,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
{
this.pool = pool;
this.workerId = workerId;
- thread = new Thread(this, pool.poolName + "-Worker-" + workerId);
+ thread = new FastThreadLocalThread(this, pool.poolName + "-Worker-" + workerId);
thread.setDaemon(true);
set(initialState);
thread.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 4dcceff..2e7a2ee 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -41,13 +41,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public class ColumnIndex
{
-
// used, if the row-index-entry reaches config column_index_cache_size_in_kb
private DataOutputBuffer buffer;
// used to track the size of the serialized size of row-index-entry (unused for buffer)
private int indexSamplesSerializedSize;
// used, until the row-index-entry reaches config column_index_cache_size_in_kb
- public List<IndexInfo> indexSamples = new ArrayList<>();
+ private final List<IndexInfo> indexSamples = new ArrayList<>();
public int columnIndexCount;
private int[] indexOffsets;
@@ -55,11 +54,10 @@ public class ColumnIndex
private final SerializationHeader header;
private final int version;
private final SequentialWriter writer;
- private final long initialPosition;
- private final ISerializer<IndexInfo> idxSerializer;
- public long headerLength = -1;
-
- private long startPosition = -1;
+ private long initialPosition;
+ private final ISerializer<IndexInfo> idxSerializer;
+ public long headerLength;
+ private long startPosition;
private int written;
private long previousRowStart;
@@ -72,17 +70,32 @@ public class ColumnIndex
private final Collection<SSTableFlushObserver> observers;
public ColumnIndex(SerializationHeader header,
- SequentialWriter writer,
- Version version,
- Collection<SSTableFlushObserver> observers,
- ISerializer<IndexInfo> indexInfoSerializer)
+ SequentialWriter writer,
+ Version version,
+ Collection<SSTableFlushObserver> observers,
+ ISerializer<IndexInfo> indexInfoSerializer)
{
this.header = header;
- this.idxSerializer = indexInfoSerializer;
this.writer = writer;
this.version = version.correspondingMessagingVersion();
this.observers = observers;
+ this.idxSerializer = indexInfoSerializer;
+ }
+
+ public void reset()
+ {
this.initialPosition = writer.position();
+ this.headerLength = -1;
+ this.startPosition = -1;
+ this.previousRowStart = 0;
+ this.columnIndexCount = 0;
+ this.written = 0;
+ this.indexSamplesSerializedSize = 0;
+ this.indexSamples.clear();
+ this.firstClustering = null;
+ this.lastClustering = null;
+ this.openMarker = null;
+ this.buffer = null;
}
public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
@@ -93,7 +106,7 @@ public class ColumnIndex
while (iterator.hasNext())
add(iterator.next());
- close();
+ finish();
}
private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
@@ -120,6 +133,16 @@ public class ColumnIndex
return buffer != null ? buffer.buffer() : null;
}
+ public List<IndexInfo> indexSamples()
+ {
+ if (indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0) <= DatabaseDescriptor.getColumnIndexCacheSize())
+ {
+ return indexSamples;
+ }
+
+ return null;
+ }
+
public int[] offsets()
{
return indexOffsets != null
@@ -136,7 +159,7 @@ public class ColumnIndex
openMarker);
// indexOffsets is used for both shallow (ShallowIndexedEntry) and non-shallow IndexedEntry.
- // For shallow ones, we need it to serialize the offsts in close().
+ // For shallow ones, we need it to serialize the offsts in finish().
// For non-shallow ones, the offsts are passed into IndexedEntry, so we don't have to
// calculate the offsets again.
@@ -149,10 +172,19 @@ public class ColumnIndex
{
if (columnIndexCount >= indexOffsets.length)
indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);
- indexOffsets[columnIndexCount] =
+
+ //the 0th element is always 0
+ if (columnIndexCount == 0)
+ {
+ indexOffsets[columnIndexCount] = 0;
+ }
+ else
+ {
+ indexOffsets[columnIndexCount] =
buffer != null
- ? Ints.checkedCast(buffer.position())
- : indexSamplesSerializedSize;
+ ? Ints.checkedCast(buffer.position())
+ : indexSamplesSerializedSize;
+ }
}
columnIndexCount++;
@@ -168,7 +200,6 @@ public class ColumnIndex
{
idxSerializer.serialize(indexSample, buffer);
}
- indexSamples = null;
}
else
{
@@ -216,7 +247,7 @@ public class ColumnIndex
addIndexBlock();
}
- private void close() throws IOException
+ private void finish() throws IOException
{
UnfilteredSerializer.serializer.writeEndOfPartition(writer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 7f6b462..6fef70f 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -353,7 +353,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
public void reset()
{
built = null;
- rowBuilder.reuse();
+ rowBuilder = BTree.builder(metadata.comparator);
deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
index ec4aa91..50f9efd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/FileDirectSegment.java
@@ -23,6 +23,7 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.BufferType;
@@ -34,7 +35,7 @@ import org.apache.cassandra.io.util.FileUtils;
*/
public abstract class FileDirectSegment extends CommitLogSegment
{
- protected static final ThreadLocal<ByteBuffer> reusableBufferHolder = new ThreadLocal<ByteBuffer>()
+ protected static final FastThreadLocal<ByteBuffer> reusableBufferHolder = new FastThreadLocal<ByteBuffer>()
{
protected ByteBuffer initialValue()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 47cfd58..63aa157 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -330,9 +330,10 @@ public class BTreeRow extends AbstractRow
if (cd.column().isSimple())
return false;
- if (!((ComplexColumnData)cd).complexDeletion().isLive())
+ if (!((ComplexColumnData) cd).complexDeletion().isLive())
return true;
}
+
return false;
}
@@ -620,7 +621,7 @@ public class BTreeRow extends AbstractRow
protected Deletion deletion = Deletion.LIVE;
private final boolean isSorted;
- private final BTree.Builder<Cell> cells;
+ private BTree.Builder<Cell> cells_;
private final CellResolver resolver;
private boolean hasComplex = false;
@@ -633,10 +634,19 @@ public class BTreeRow extends AbstractRow
protected Builder(boolean isSorted, int nowInSecs)
{
- this.cells = BTree.builder(ColumnData.comparator);
+ cells_ = null;
resolver = new CellResolver(nowInSecs);
this.isSorted = isSorted;
- this.cells.auto(false);
+ }
+
+ private BTree.Builder<Cell> getCells()
+ {
+ if (cells_ == null)
+ {
+ cells_ = BTree.builder(ColumnData.comparator);
+ cells_.auto(false);
+ }
+ return cells_;
}
public boolean isSorted()
@@ -660,7 +670,7 @@ public class BTreeRow extends AbstractRow
this.clustering = null;
this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
this.deletion = Deletion.LIVE;
- this.cells.reuse();
+ this.cells_ = null;
}
public void addPrimaryKeyLivenessInfo(LivenessInfo info)
@@ -676,25 +686,25 @@ public class BTreeRow extends AbstractRow
public void addCell(Cell cell)
{
assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering;
- cells.add(cell);
+ getCells().add(cell);
hasComplex |= cell.column.isComplex();
}
public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion)
{
- cells.add(new ComplexColumnDeletion(column, complexDeletion));
+ getCells().add(new ComplexColumnDeletion(column, complexDeletion));
hasComplex = true;
}
public Row build()
{
if (!isSorted)
- cells.sort();
+ getCells().sort();
// we can avoid resolving if we're sorted and have no complex values
// (because we'll only have unique simple cells, which are already in their final condition)
if (!isSorted | hasComplex)
- cells.resolve(resolver);
- Object[] btree = cells.build();
+ getCells().resolve(resolver);
+ Object[] btree = getCells().build();
if (deletion.isShadowedBy(primaryKeyLivenessInfo))
deletion = Deletion.LIVE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index f4678b7..154fc77 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -19,23 +19,22 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
-import java.util.*;
+import java.util.Iterator;
+import java.util.Objects;
import java.util.function.BiFunction;
import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.UpdateFunction;
/**
* The data for a complex column, that is it's cells and potential complex
@@ -240,8 +239,7 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
{
this.column = column;
this.complexDeletion = DeletionTime.LIVE; // default if writeComplexDeletion is not called
- if (builder == null) builder = BTree.builder(column.cellComparator());
- else builder.reuse(column.cellComparator());
+ this.builder = BTree.builder(column.cellComparator());
}
public void addComplexDeletion(DeletionTime complexDeletion)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 8157bbb..bcc65aa 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -21,9 +21,11 @@ import java.io.IOException;
import com.google.common.collect.Collections2;
+import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -178,11 +180,37 @@ public class UnfilteredSerializer
if (header.isForSSTable())
{
- out.writeUnsignedVInt(serializedRowBodySize(row, header, previousUnfilteredSize, version));
- // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler).
- // This is currently not used however and using it is tbd.
- out.writeUnsignedVInt(previousUnfilteredSize);
+ DataOutputBuffer dob = DataOutputBuffer.RECYCLER.get();
+ try
+ {
+ serializeRowBody(row, flags, header, dob);
+
+ out.writeUnsignedVInt(dob.position() + TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize));
+ // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler).
+ // This is currently not used however and using it is tbd.
+ out.writeUnsignedVInt(previousUnfilteredSize);
+ out.write(dob.buffer());
+ }
+ finally
+ {
+ dob.recycle();
+ }
}
+ else
+ {
+ serializeRowBody(row, flags, header, out);
+ }
+ }
+
+ @Inline
+ private void serializeRowBody(Row row, int flags, SerializationHeader header, DataOutputPlus out)
+ throws IOException
+ {
+ boolean isStatic = row.isStatic();
+
+ Columns headerColumns = header.columns(isStatic);
+ LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
+ Row.Deletion deletion = row.deletion();
if ((flags & HAS_TIMESTAMP) != 0)
header.writeTimestamp(pkLiveness.timestamp(), out);
@@ -194,7 +222,7 @@ public class UnfilteredSerializer
if ((flags & HAS_DELETION) != 0)
header.writeDeletionTime(deletion.time(), out);
- if (!hasAllColumns)
+ if ((flags & HAS_ALL_COLUMNS) == 0)
Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
for (ColumnData data : row)
@@ -202,7 +230,7 @@ public class UnfilteredSerializer
if (data.column.isSimple())
Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
else
- writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
+ writeComplexColumn((ComplexColumnData) data, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b6ea1d9..44b1c3a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -17,50 +17,46 @@
*/
package org.apache.cassandra.io.sstable.format.big;
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.transform.Transformation;
-import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.ChunkCache;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.FilterFactory;
-import org.apache.cassandra.utils.IFilter;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.Transactional;
-import org.apache.cassandra.utils.SyncUtil;
-
public class BigTableWriter extends SSTableWriter
{
private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
+ private final ColumnIndex columnIndexWriter;
private final IndexWriter iwriter;
private final SegmentedFile.Builder dbuilder;
protected final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
-
private DataPosition dataMark;
private long lastEarlyOpenLength = 0;
@@ -90,6 +86,8 @@ public class BigTableWriter extends SSTableWriter
dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
}
iwriter = new IndexWriter(keyCount, dataFile);
+
+ columnIndexWriter = new ColumnIndex(header, dataFile, descriptor.version, observers, getRowIndexEntrySerializer().indexInfoSerializer());
}
public void mark()
@@ -153,29 +151,31 @@ public class BigTableWriter extends SSTableWriter
long startPosition = beforeAppend(key);
observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
+ //Reuse the writer for each row
+ columnIndexWriter.reset();
+
try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
{
- ColumnIndex columnIndex = new ColumnIndex(header, dataFile, descriptor.version, observers,
- getRowIndexEntrySerializer().indexInfoSerializer());
-
- columnIndex.buildRowIndex(collecting);
+ columnIndexWriter.buildRowIndex(collecting);
// afterAppend() writes the partition key before the first RowIndexEntry - so we have to add it's
// serialized size to the index-writer position
long indexFilePosition = ByteBufferUtil.serializedSizeWithShortLength(key.getKey()) + iwriter.indexFile.position();
RowIndexEntry entry = RowIndexEntry.create(startPosition, indexFilePosition,
- collecting.partitionLevelDeletion(), columnIndex.headerLength, columnIndex.columnIndexCount,
- columnIndex.indexInfoSerializedSize(),
- columnIndex.indexSamples,
- columnIndex.offsets(),
+ collecting.partitionLevelDeletion(),
+ columnIndexWriter.headerLength,
+ columnIndexWriter.columnIndexCount,
+ columnIndexWriter.indexInfoSerializedSize(),
+ columnIndexWriter.indexSamples(),
+ columnIndexWriter.offsets(),
getRowIndexEntrySerializer().indexInfoSerializer());
long endPosition = dataFile.position();
long rowSize = endPosition - startPosition;
maybeLogLargePartitionWarning(key, rowSize);
metadataCollector.addPartitionSizeInBytes(rowSize);
- afterAppend(key, endPosition, entry, columnIndex.buffer());
+ afterAppend(key, endPosition, entry, columnIndexWriter.buffer());
return entry;
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 91242b8..8dbad8c 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -21,11 +21,12 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-import org.apache.cassandra.config.Config;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import io.netty.util.Recycler;
+import org.apache.cassandra.config.Config;
+
/**
* An implementation of the DataOutputStream interface using a FastByteArrayOutputStream and exposing
* its buffer so copies can be avoided.
@@ -39,6 +40,21 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
*/
private static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);
+ public static final Recycler<DataOutputBuffer> RECYCLER = new Recycler<DataOutputBuffer>()
+ {
+ protected DataOutputBuffer newObject(Handle handle)
+ {
+ return new DataOutputBuffer(handle);
+ }
+ };
+
+ private final Recycler.Handle handle;
+
+ private DataOutputBuffer(Recycler.Handle handle)
+ {
+ this(128, handle);
+ }
+
public DataOutputBuffer()
{
this(128);
@@ -46,12 +62,26 @@ public class DataOutputBuffer extends BufferedDataOutputStreamPlus
public DataOutputBuffer(int size)
{
- super(ByteBuffer.allocate(size));
+ this(size, null);
+ }
+
+ protected DataOutputBuffer(int size, Recycler.Handle handle)
+ {
+ this(ByteBuffer.allocate(size), handle);
}
- protected DataOutputBuffer(ByteBuffer buffer)
+ protected DataOutputBuffer(ByteBuffer buffer, Recycler.Handle handle)
{
super(buffer);
+ this.handle = handle;
+ }
+
+ public void recycle()
+ {
+ assert handle != null;
+ buffer.rewind();
+
+ RECYCLER.recycle(this, handle);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
index c2cc549..5193401 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBufferFixed.java
@@ -38,12 +38,12 @@ public class DataOutputBufferFixed extends DataOutputBuffer
public DataOutputBufferFixed(int size)
{
- super(ByteBuffer.allocate(size));
+ super(size, null);
}
public DataOutputBufferFixed(ByteBuffer buffer)
{
- super(buffer);
+ super(buffer, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
index a846384..755783b 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputStreamPlus.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -64,7 +65,7 @@ public abstract class DataOutputStreamPlus extends OutputStream implements DataO
return bytes;
}
- private static final ThreadLocal<byte[]> tempBuffer = new ThreadLocal<byte[]>()
+ private static final FastThreadLocal<byte[]> tempBuffer = new FastThreadLocal<byte[]>()
{
@Override
public byte[] initialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 24eb93c..88912f9 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -33,7 +33,7 @@ public class SafeMemoryWriter extends DataOutputBuffer
private SafeMemoryWriter(SafeMemory memory)
{
- super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN));
+ super(tailBuffer(memory).order(ByteOrder.BIG_ENDIAN), null);
this.memory = memory;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 960b531..3600d5e 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -206,7 +208,7 @@ public class ConnectionHandler
this.socket = socket;
this.protocolVersion = protocolVersion;
- new Thread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
+ new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
}
public ListenableFuture<?> close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index c7d5f98..3aaa1a3 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -25,14 +25,13 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
-import java.util.zip.Checksum;
import com.google.common.collect.Iterators;
import com.google.common.primitives.Ints;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -82,31 +81,61 @@ public class CompressedInputStream extends InputStream
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
this.checksumType = checksumType;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ new FastThreadLocalThread(new Reader(source, info, dataBuffer)).start();
}
- public int read() throws IOException
+ private void decompressNextChunk() throws IOException
{
- if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ try
{
- try
- {
- byte[] compressedWithCRC = dataBuffer.take();
- if (compressedWithCRC == POISON_PILL)
- throw new EOFException("No chunk available");
- decompress(compressedWithCRC);
- }
- catch (InterruptedException e)
- {
+ byte[] compressedWithCRC = dataBuffer.take();
+ if (compressedWithCRC == POISON_PILL)
throw new EOFException("No chunk available");
- }
+ decompress(compressedWithCRC);
}
+ catch (InterruptedException e)
+ {
+ throw new EOFException("No chunk available");
+ }
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ decompressNextChunk();
assert current >= bufferOffset && current < bufferOffset + validBufferBytes;
return ((int) buffer[(int) (current++ - bufferOffset)]) & 0xff;
}
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ long nextCurrent = current + len;
+
+ if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
+ decompressNextChunk();
+
+ assert nextCurrent >= bufferOffset;
+
+ int read = 0;
+ while (read < len)
+ {
+ int nextLen = Math.min((len - read), (int)((bufferOffset + validBufferBytes) - current));
+
+ System.arraycopy(buffer, (int)(current - bufferOffset), b, off + read, nextLen);
+ read += nextLen;
+
+ current += nextLen;
+ if (read != len)
+ decompressNextChunk();
+ }
+
+ return len;
+ }
+
public void position(long position)
{
assert position >= current : "stream can only read forward.";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 5aa393e..8dafa9c 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -17,16 +17,11 @@
*/
package org.apache.cassandra.streaming.compress;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import com.google.common.base.Throwables;
-
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +29,12 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.io.util.TrackedInputStream;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -92,6 +88,7 @@ public class CompressedStreamReader extends StreamReader
try
{
writer = createWriter(cfs, totalSize, repairedAt, format);
+ String filename = writer.getFilename();
int sectionIdx = 0;
for (Pair<Long, Long> section : sections)
{
@@ -107,7 +104,7 @@ public class CompressedStreamReader extends StreamReader
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+ session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
}
}
logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
index d2e0513..de43c2f 100644
--- a/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
+++ b/src/java/org/apache/cassandra/tools/SSTableMetadataViewer.java
@@ -17,11 +17,7 @@
*/
package org.apache.cassandra.tools;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.PrintStream;
+import java.io.*;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
@@ -121,9 +117,9 @@ public class SSTableMetadataViewer
out.printf("totalRows: %s%n", stats.totalRows);
out.println("Estimated tombstone drop times:");
- for (Map.Entry<Double, Long> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
+ for (Map.Entry<Number, long[]> entry : stats.estimatedTombstoneDropTime.getAsMap().entrySet())
{
- out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue());
+ out.printf("%-10s:%10s%n",entry.getKey().intValue(), entry.getValue()[0]);
}
printHistograms(stats, out);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/BloomFilter.java b/src/java/org/apache/cassandra/utils/BloomFilter.java
index ce6c638..4ff07b7 100644
--- a/src/java/org/apache/cassandra/utils/BloomFilter.java
+++ b/src/java/org/apache/cassandra/utils/BloomFilter.java
@@ -19,13 +19,15 @@ package org.apache.cassandra.utils;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.concurrent.FastThreadLocal;
+import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
import org.apache.cassandra.utils.obs.IBitSet;
public class BloomFilter extends WrappedSharedCloseable implements IFilter
{
- private static final ThreadLocal<long[]> reusableIndexes = new ThreadLocal<long[]>()
+ private final static FastThreadLocal<long[]> reusableIndexes = new FastThreadLocal<long[]>()
{
protected long[] initialValue()
{
@@ -84,16 +86,19 @@ public class BloomFilter extends WrappedSharedCloseable implements IFilter
// to avoid generating a lot of garbage since stack allocation currently does not support stores
// (CASSANDRA-6609). it returns the array so that the caller does not need to perform
// a second threadlocal lookup.
+ @Inline
private long[] indexes(FilterKey key)
{
// we use the same array both for storing the hash result, and for storing the indexes we return,
// so that we do not need to allocate two arrays.
long[] indexes = reusableIndexes.get();
+
key.filterHash(indexes);
setIndexes(indexes[1], indexes[0], hashCount, bitset.capacity(), indexes);
return indexes;
}
+ @Inline
private void setIndexes(long base, long inc, int count, long max, long[] results)
{
if (oldBfHashOrder)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/FilterFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FilterFactory.java b/src/java/org/apache/cassandra/utils/FilterFactory.java
index 869f3fa..298e734 100644
--- a/src/java/org/apache/cassandra/utils/FilterFactory.java
+++ b/src/java/org/apache/cassandra/utils/FilterFactory.java
@@ -20,14 +20,14 @@ package org.apache.cassandra.utils;
import java.io.DataInput;
import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.obs.IBitSet;
import org.apache.cassandra.utils.obs.OffHeapBitSet;
import org.apache.cassandra.utils.obs.OpenBitSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class FilterFactory
{
public static final IFilter AlwaysPresent = new AlwaysPresentFilter();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/StreamingHistogram.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StreamingHistogram.java b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
index b925395..7ea97e5 100644
--- a/src/java/org/apache/cassandra/utils/StreamingHistogram.java
+++ b/src/java/org/apache/cassandra/utils/StreamingHistogram.java
@@ -39,7 +39,10 @@ public class StreamingHistogram
public static final StreamingHistogramSerializer serializer = new StreamingHistogramSerializer();
// TreeMap to hold bins of histogram.
- private final TreeMap<Double, Long> bin;
+ // The key is a numeric type so we can avoid boxing/unboxing streams of different key types
+ // The value is a unboxed long array always of length == 1
+ // Serialized Histograms always writes with double keys for backwards compatibility
+ private final TreeMap<Number, long[]> bin;
// maximum bin size for this histogram
private final int maxBinSize;
@@ -51,22 +54,28 @@ public class StreamingHistogram
public StreamingHistogram(int maxBinSize)
{
this.maxBinSize = maxBinSize;
- bin = new TreeMap<>();
+ bin = new TreeMap<>((o1, o2) -> {
+ if (o1.getClass().equals(o2.getClass()))
+ return ((Comparable)o1).compareTo(o2);
+ else
+ return ((Double)o1.doubleValue()).compareTo(o2.doubleValue());
+ });
}
private StreamingHistogram(int maxBinSize, Map<Double, Long> bin)
{
- this.maxBinSize = maxBinSize;
- this.bin = new TreeMap<>(bin);
+ this(maxBinSize);
+ for (Map.Entry<Double, Long> entry : bin.entrySet())
+ this.bin.put(entry.getKey(), new long[]{entry.getValue()});
}
/**
* Adds new point p to this histogram.
* @param p
*/
- public void update(double p)
+ public void update(Number p)
{
- update(p, 1);
+ update(p, 1L);
}
/**
@@ -74,30 +83,31 @@ public class StreamingHistogram
* @param p
* @param m
*/
- public void update(double p, long m)
+ public void update(Number p, long m)
{
- Long mi = bin.get(p);
+ long[] mi = bin.get(p);
if (mi != null)
{
// we found the same p so increment that counter
- bin.put(p, mi + m);
+ mi[0] += m;
}
else
{
- bin.put(p, m);
+ mi = new long[]{m};
+ bin.put(p, mi);
// if bin size exceeds maximum bin size then trim down to max size
while (bin.size() > maxBinSize)
{
// find points p1, p2 which have smallest difference
- Iterator<Double> keys = bin.keySet().iterator();
- double p1 = keys.next();
- double p2 = keys.next();
+ Iterator<Number> keys = bin.keySet().iterator();
+ double p1 = keys.next().doubleValue();
+ double p2 = keys.next().doubleValue();
double smallestDiff = p2 - p1;
double q1 = p1, q2 = p2;
while (keys.hasNext())
{
p1 = p2;
- p2 = keys.next();
+ p2 = keys.next().doubleValue();
double diff = p2 - p1;
if (diff < smallestDiff)
{
@@ -107,9 +117,13 @@ public class StreamingHistogram
}
}
// merge those two
- long k1 = bin.remove(q1);
- long k2 = bin.remove(q2);
- bin.put((q1 * k1 + q2 * k2) / (k1 + k2), k1 + k2);
+ long[] a1 = bin.remove(q1);
+ long[] a2 = bin.remove(q2);
+ long k1 = a1[0];
+ long k2 = a2[0];
+
+ a1[0] += k2;
+ bin.put((q1 * k1 + q2 * k2) / (k1 + k2), a1);
}
}
}
@@ -124,8 +138,8 @@ public class StreamingHistogram
if (other == null)
return;
- for (Map.Entry<Double, Long> entry : other.getAsMap().entrySet())
- update(entry.getKey(), entry.getValue());
+ for (Map.Entry<Number, long[]> entry : other.getAsMap().entrySet())
+ update(entry.getKey(), entry.getValue()[0]);
}
/**
@@ -138,32 +152,32 @@ public class StreamingHistogram
{
double sum = 0;
// find the points pi, pnext which satisfy pi <= b < pnext
- Map.Entry<Double, Long> pnext = bin.higherEntry(b);
+ Map.Entry<Number, long[]> pnext = bin.higherEntry(b);
if (pnext == null)
{
// if b is greater than any key in this histogram,
// just count all appearance and return
- for (Long value : bin.values())
- sum += value;
+ for (long[] value : bin.values())
+ sum += value[0];
}
else
{
- Map.Entry<Double, Long> pi = bin.floorEntry(b);
+ Map.Entry<Number, long[]> pi = bin.floorEntry(b);
if (pi == null)
return 0;
// calculate estimated count mb for point b
- double weight = (b - pi.getKey()) / (pnext.getKey() - pi.getKey());
- double mb = pi.getValue() + (pnext.getValue() - pi.getValue()) * weight;
- sum += (pi.getValue() + mb) * weight / 2;
+ double weight = (b - pi.getKey().doubleValue()) / (pnext.getKey().doubleValue() - pi.getKey().doubleValue());
+ double mb = pi.getValue()[0] + (pnext.getValue()[0] - pi.getValue()[0]) * weight;
+ sum += (pi.getValue()[0] + mb) * weight / 2;
- sum += pi.getValue() / 2.0;
- for (Long value : bin.headMap(pi.getKey(), false).values())
- sum += value;
+ sum += pi.getValue()[0] / 2.0;
+ for (long[] value : bin.headMap(pi.getKey(), false).values())
+ sum += value[0];
}
return sum;
}
- public Map<Double, Long> getAsMap()
+ public Map<Number, long[]> getAsMap()
{
return Collections.unmodifiableMap(bin);
}
@@ -173,12 +187,12 @@ public class StreamingHistogram
public void serialize(StreamingHistogram histogram, DataOutputPlus out) throws IOException
{
out.writeInt(histogram.maxBinSize);
- Map<Double, Long> entries = histogram.getAsMap();
+ Map<Number, long[]> entries = histogram.getAsMap();
out.writeInt(entries.size());
- for (Map.Entry<Double, Long> entry : entries.entrySet())
+ for (Map.Entry<Number, long[]> entry : entries.entrySet())
{
- out.writeDouble(entry.getKey());
- out.writeLong(entry.getValue());
+ out.writeDouble(entry.getKey().doubleValue());
+ out.writeLong(entry.getValue()[0]);
}
}
@@ -198,7 +212,7 @@ public class StreamingHistogram
public long serializedSize(StreamingHistogram histogram)
{
long size = TypeSizes.sizeof(histogram.maxBinSize);
- Map<Double, Long> entries = histogram.getAsMap();
+ Map<Number, long[]> entries = histogram.getAsMap();
size += TypeSizes.sizeof(entries.size());
// size of entries = size * (8(double) + 8(long))
size += entries.size() * (8L + 8L);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 1c3d2e2..4f21d26 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
+import io.netty.util.Recycler;
import org.apache.cassandra.utils.ObjectSizes;
import static com.google.common.collect.Iterables.concat;
@@ -139,12 +140,9 @@ public class BTree
return values;
}
- Queue<TreeBuilder> queue = modifier.get();
- TreeBuilder builder = queue.poll();
- if (builder == null)
- builder = new TreeBuilder();
+ TreeBuilder builder = TreeBuilder.newInstance();
Object[] btree = builder.build(source, updateF, size);
- queue.add(builder);
+
return btree;
}
@@ -176,12 +174,9 @@ public class BTree
if (isEmpty(btree))
return build(updateWith, updateWithLength, updateF);
- Queue<TreeBuilder> queue = modifier.get();
- TreeBuilder builder = queue.poll();
- if (builder == null)
- builder = new TreeBuilder();
+
+ TreeBuilder builder = TreeBuilder.newInstance();
btree = builder.update(btree, comparator, updateWith, updateF);
- queue.add(builder);
return btree;
}
@@ -203,12 +198,12 @@ public class BTree
public static <V> Iterator<V> iterator(Object[] btree, Dir dir)
{
- return new BTreeSearchIterator<V, V>(btree, null, dir);
+ return new BTreeSearchIterator<>(btree, null, dir);
}
public static <V> Iterator<V> iterator(Object[] btree, int lb, int ub, Dir dir)
{
- return new BTreeSearchIterator<V, V>(btree, null, dir, lb, ub);
+ return new BTreeSearchIterator<>(btree, null, dir, lb, ub);
}
public static <V> Iterable<V> iterable(Object[] btree)
@@ -771,28 +766,29 @@ public class BTree
return 1 + lookupSizeMap(root, childIndex - 1);
}
- private static final ThreadLocal<Queue<TreeBuilder>> modifier = new ThreadLocal<Queue<TreeBuilder>>()
+ final static Recycler<Builder> builderRecycler = new Recycler<Builder>()
{
- @Override
- protected Queue<TreeBuilder> initialValue()
+ protected Builder newObject(Handle handle)
{
- return new ArrayDeque<>();
+ return new Builder(handle);
}
};
public static <V> Builder<V> builder(Comparator<? super V> comparator)
{
- return new Builder<>(comparator);
+ Builder<V> builder = builderRecycler.get();
+ builder.reuse(comparator);
+
+ return builder;
}
public static <V> Builder<V> builder(Comparator<? super V> comparator, int initialCapacity)
{
- return new Builder<>(comparator);
+ return builder(comparator);
}
public static class Builder<V>
{
-
// a user-defined bulk resolution, to be applied manually via resolve()
public static interface Resolver
{
@@ -817,16 +813,13 @@ public class BTree
boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added
boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates
QuickResolver<V> quickResolver;
+ final Recycler.Handle recycleHandle;
- protected Builder(Comparator<? super V> comparator)
- {
- this(comparator, 16);
- }
- protected Builder(Comparator<? super V> comparator, int initialCapacity)
+ private Builder(Recycler.Handle handle)
{
- this.comparator = comparator;
- this.values = new Object[initialCapacity];
+ this.recycleHandle = handle;
+ this.values = new Object[16];
}
public Builder<V> setQuickResolver(QuickResolver<V> quickResolver)
@@ -835,16 +828,19 @@ public class BTree
return this;
}
- public void reuse()
+ public void recycle()
{
- reuse(comparator);
+ if (recycleHandle != null)
+ builderRecycler.recycle(this, recycleHandle);
}
- public void reuse(Comparator<? super V> comparator)
+ private void reuse(Comparator<? super V> comparator)
{
this.comparator = comparator;
+ quickResolver = null;
count = 0;
detected = true;
+ auto = true;
}
public Builder<V> auto(boolean auto)
@@ -1071,9 +1067,16 @@ public class BTree
public Object[] build()
{
- if (auto)
- autoEnforce();
- return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+ try
+ {
+ if (auto)
+ autoEnforce();
+ return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp());
+ }
+ finally
+ {
+ this.recycle();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index 9517009..a59e481 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -21,14 +21,11 @@ package org.apache.cassandra.utils.btree;
import java.util.*;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.cassandra.utils.btree.BTree.Dir;
import static org.apache.cassandra.utils.btree.BTree.findIndex;
-import static org.apache.cassandra.utils.btree.BTree.lower;
-import static org.apache.cassandra.utils.btree.BTree.toArray;
public class BTreeSet<V> implements NavigableSet<V>, List<V>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
index 024902e..f42de0f 100644
--- a/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
+++ b/src/java/org/apache/cassandra/utils/btree/TreeBuilder.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.utils.btree;
import java.util.Comparator;
+import io.netty.util.Recycler;
+
import static org.apache.cassandra.utils.btree.BTree.EMPTY_LEAF;
import static org.apache.cassandra.utils.btree.BTree.FAN_SHIFT;
import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
@@ -28,12 +30,32 @@ import static org.apache.cassandra.utils.btree.BTree.POSITIVE_INFINITY;
* A class for constructing a new BTree, either from an existing one and some set of modifications
* or a new tree from a sorted collection of items.
* <p/>
- * This is a fairly heavy-weight object, so a ThreadLocal instance is created for making modifications to a tree
+ * This is a fairly heavy-weight object, so a Recycled instance is created for making modifications to a tree
*/
final class TreeBuilder
{
+
+ private final static Recycler<TreeBuilder> builderRecycler = new Recycler<TreeBuilder>()
+ {
+ protected TreeBuilder newObject(Handle handle)
+ {
+ return new TreeBuilder(handle);
+ }
+ };
+
+ public static TreeBuilder newInstance()
+ {
+ return builderRecycler.get();
+ }
+
+ private final Recycler.Handle recycleHandle;
private final NodeBuilder rootBuilder = new NodeBuilder();
+ private TreeBuilder(Recycler.Handle handle)
+ {
+ this.recycleHandle = handle;
+ }
+
/**
* At the highest level, we adhere to the classic b-tree insertion algorithm:
*
@@ -93,6 +115,9 @@ final class TreeBuilder
Object[] r = current.toNode();
current.clear();
+
+ builderRecycler.recycle(this, recycleHandle);
+
return r;
}
@@ -114,6 +139,9 @@ final class TreeBuilder
Object[] r = current.toNode();
current.clear();
+
+ builderRecycler.recycle(this, recycleHandle);
+
return r;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index 0eefae3..31894b1 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.utils.concurrent;
import java.util.Arrays;
-import org.apache.cassandra.utils.Throwables;
-
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -35,7 +33,7 @@ public abstract class WrappedSharedCloseable extends SharedCloseableImpl
public WrappedSharedCloseable(final AutoCloseable closeable)
{
- this(new AutoCloseable[] { closeable});
+ this(new AutoCloseable[] {closeable});
}
public WrappedSharedCloseable(final AutoCloseable[] closeable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index ad2404f..68f8911 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -21,23 +21,23 @@ package org.apache.cassandra.utils.memory;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.compress.BufferType;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.NoSpamLogger;
-
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.BufferPoolMetrics;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.Ref;
/**
@@ -68,7 +68,7 @@ public class BufferPool
private static final GlobalPool globalPool = new GlobalPool();
/** A thread local pool of chunks, where chunks come from the global pool */
- private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() {
+ private static final FastThreadLocal<LocalPool> localPool = new FastThreadLocal<LocalPool>() {
@Override
protected LocalPool initialValue()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index daf5006..3872424 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -50,6 +50,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import io.netty.util.concurrent.FastThreadLocal;
import net.nicoulaj.compilecommand.annotations.Inline;
/**
@@ -103,7 +104,7 @@ public class VIntCoding
return Integer.numberOfLeadingZeros(~firstByte) - 24;
}
- protected static final ThreadLocal<byte[]> encodingBuffer = new ThreadLocal<byte[]>()
+ protected static final FastThreadLocal<byte[]> encodingBuffer = new FastThreadLocal<byte[]>()
{
@Override
public byte[] initialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
new file mode 100644
index 0000000..7e53ba2
--- /dev/null
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.OutputHandler;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongStreamingTest
+{
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ SchemaLoader.cleanupAndLeaveDirs();
+ Keyspace.setInitialized();
+ StorageService.instance.initServer();
+
+ StorageService.instance.setCompactionThroughputMbPerSec(0);
+ StorageService.instance.setStreamThroughputMbPerSec(0);
+ StorageService.instance.setInterDCStreamThroughputMbPerSec(0);
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ Config.setClientMode(false);
+ }
+
+ @Test
+ public void testCompressedStream() throws InvalidRequestException, IOException, ExecutionException, InterruptedException
+ {
+ String KS = "cql_keyspace";
+ String TABLE = "table1";
+
+ File tempdir = Files.createTempDir();
+ File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+ assert dataDir.mkdirs();
+
+ String schema = "CREATE TABLE cql_keyspace.table1 ("
+ + " k int PRIMARY KEY,"
+ + " v1 text,"
+ + " v2 int"
+ + ");";// with compression = {};";
+ String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES (?, ?, ?)";
+ CQLSSTableWriter writer = CQLSSTableWriter.builder()
+ .sorted()
+ .inDirectory(dataDir)
+ .forTable(schema)
+ .using(insert).build();
+ long start = System.nanoTime();
+
+ for (int i = 0; i < 10_000_000; i++)
+ writer.addRow(i, "test1", 24);
+
+ writer.close();
+ System.err.println(String.format("Writer finished after %d seconds....", TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start)));
+
+ File[] dataFiles = dataDir.listFiles((dir, name) -> name.endsWith("-Data.db"));
+ long dataSize = 0l;
+ for (File file : dataFiles)
+ {
+ System.err.println("File : "+file.getAbsolutePath());
+ dataSize += file.length();
+ }
+
+ SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+ {
+ private String ks;
+ public void init(String keyspace)
+ {
+ for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+
+ this.ks = keyspace;
+ }
+
+ public CFMetaData getTableMetadata(String cfName)
+ {
+ return Schema.instance.getCFMetaData(ks, cfName);
+ }
+ }, new OutputHandler.SystemOutput(false, false));
+
+ start = System.nanoTime();
+ loader.stream().get();
+
+ long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec",
+ millis/1000d,
+ (dataSize / (1 << 20) / (millis / 1000d)) * 8));
+
+
+ //Stream again
+ loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
+ {
+ private String ks;
+ public void init(String keyspace)
+ {
+ for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
+ addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+
+ this.ks = keyspace;
+ }
+
+ public CFMetaData getTableMetadata(String cfName)
+ {
+ return Schema.instance.getCFMetaData(ks, cfName);
+ }
+ }, new OutputHandler.SystemOutput(false, false));
+
+ start = System.nanoTime();
+ loader.stream().get();
+
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ System.err.println(String.format("Finished Streaming in %.2f seconds: %.2f Mb/sec",
+ millis/1000d,
+ (dataSize / (1 << 20) / (millis / 1000d)) * 8));
+
+
+ //Compact them both
+ start = System.nanoTime();
+ Keyspace.open(KS).getColumnFamilyStore(TABLE).forceMajorCompaction();
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+ System.err.println(String.format("Finished Compacting in %.2f seconds: %.2f Mb/sec",
+ millis / 1000d,
+ (dataSize * 2 / (1 << 20) / (millis / 1000d)) * 8));
+
+ UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table1 limit 100;");
+ assertEquals(100, rs.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index ebacf34..244018e 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -28,6 +28,8 @@ import java.util.Iterator;
import java.util.List;
import com.google.common.primitives.Ints;
+import org.junit.Assert;
+import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.IMeasurableMemory;
@@ -37,12 +39,12 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.columniterator.AbstractSSTableIterator;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.BufferCell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
@@ -67,9 +69,6 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
-import org.junit.Assert;
-import org.junit.Test;
-
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@@ -191,7 +190,7 @@ public class RowIndexEntryTest extends CQLTester
rieNew = RowIndexEntry.create(startPosition, 0L,
deletionInfo, columnIndex.headerLength, columnIndex.columnIndexCount,
columnIndex.indexInfoSerializedSize(),
- columnIndex.indexSamples, columnIndex.offsets(),
+ columnIndex.indexSamples(), columnIndex.offsets(),
rieSerializer.indexInfoSerializer());
rieSerializer.serialize(rieNew, rieOutput, columnIndex.buffer());
rieNewSerialized = rieOutput.buffer().duplicate();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/BTreeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/BTreeTest.java b/test/unit/org/apache/cassandra/utils/BTreeTest.java
index ffd7315..a01ad2e 100644
--- a/test/unit/org/apache/cassandra/utils/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/BTreeTest.java
@@ -214,14 +214,16 @@ public class BTreeTest
builder.add(i);
// for sorted input, check non-resolve path works before checking resolution path
checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
- builder.reuse();
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.setQuickResolver(resolver);
for (int i = 0 ; i < 10 ; i++)
{
// now do a few runs of randomized inputs
for (Accumulator j : resolverInput(count, true))
builder.add(j);
checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
- builder.reuse();
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.setQuickResolver(resolver);
}
for (List<Accumulator> add : splitResolverInput(count))
{
@@ -231,7 +233,6 @@ public class BTreeTest
builder.addAll(new TreeSet<>(add));
}
checkResolverOutput(count, builder.build(), BTree.Dir.ASC);
- builder.reuse();
}
}
@@ -278,7 +279,14 @@ public class BTreeTest
builder.add(i);
// for sorted input, check non-resolve path works before checking resolution path
Assert.assertTrue(Iterables.elementsEqual(sorted, BTree.iterable(builder.build())));
+
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.auto(false);
+ for (Accumulator i : sorted)
+ builder.add(i);
+ // check resolution path
checkResolverOutput(count, builder.resolve(resolver).build(), BTree.Dir.ASC);
+
builder = BTree.builder(Comparator.naturalOrder());
builder.auto(false);
for (int i = 0 ; i < 10 ; i++)
@@ -287,11 +295,13 @@ public class BTreeTest
for (Accumulator j : resolverInput(count, true))
builder.add(j);
checkResolverOutput(count, builder.sort().resolve(resolver).build(), BTree.Dir.ASC);
- builder.reuse();
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.auto(false);
for (Accumulator j : resolverInput(count, true))
builder.add(j);
checkResolverOutput(count, builder.sort().reverse().resolve(resolver).build(), BTree.Dir.DESC);
- builder.reuse();
+ builder = BTree.builder(Comparator.naturalOrder());
+ builder.auto(false);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e92ce43/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
index b6b1882..94aac9e 100644
--- a/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/StreamingHistogramTest.java
@@ -22,6 +22,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import org.junit.Test;
+
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -50,11 +51,11 @@ public class StreamingHistogramTest
expected1.put(36.0, 1L);
Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
- for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+ for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
{
Map.Entry<Double, Long> entry = expectedItr.next();
- assertEquals(entry.getKey(), actual.getKey(), 0.01);
- assertEquals(entry.getValue(), actual.getValue());
+ assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+ assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
}
// merge test
@@ -72,11 +73,11 @@ public class StreamingHistogramTest
expected2.put(32.67, 3L);
expected2.put(45.0, 1L);
expectedItr = expected2.entrySet().iterator();
- for (Map.Entry<Double, Long> actual : hist.getAsMap().entrySet())
+ for (Map.Entry<Number, long[]> actual : hist.getAsMap().entrySet())
{
Map.Entry<Double, Long> entry = expectedItr.next();
- assertEquals(entry.getKey(), actual.getKey(), 0.01);
- assertEquals(entry.getValue(), actual.getValue());
+ assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+ assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
}
// sum test
@@ -112,11 +113,40 @@ public class StreamingHistogramTest
expected1.put(36.0, 1L);
Iterator<Map.Entry<Double, Long>> expectedItr = expected1.entrySet().iterator();
- for (Map.Entry<Double, Long> actual : deserialized.getAsMap().entrySet())
+ for (Map.Entry<Number, long[]> actual : deserialized.getAsMap().entrySet())
{
Map.Entry<Double, Long> entry = expectedItr.next();
- assertEquals(entry.getKey(), actual.getKey(), 0.01);
- assertEquals(entry.getValue(), actual.getValue());
+ assertEquals(entry.getKey(), actual.getKey().doubleValue(), 0.01);
+ assertEquals(entry.getValue().longValue(), actual.getValue()[0]);
}
}
+
+
+ @Test
+ public void testNumericTypes() throws Exception
+ {
+ StreamingHistogram hist = new StreamingHistogram(5);
+
+ hist.update(2);
+ hist.update(2.0);
+ hist.update(2L);
+
+ Map<Number, long[]> asMap = hist.getAsMap();
+
+ assertEquals(1, asMap.size());
+ assertEquals(3L, asMap.get(2)[0]);
+
+ //Make sure it's working with Serde
+ DataOutputBuffer out = new DataOutputBuffer();
+ StreamingHistogram.serializer.serialize(hist, out);
+ byte[] bytes = out.toByteArray();
+
+ StreamingHistogram deserialized = StreamingHistogram.serializer.deserialize(new DataInputBuffer(bytes));
+
+ deserialized.update(2L);
+
+ asMap = deserialized.getAsMap();
+ assertEquals(1, asMap.size());
+ assertEquals(4L, asMap.get(2)[0]);
+ }
}