You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:05:09 UTC
[7/7] git commit: Push more of memtable data off-heap
Push more of memtable data off-heap
patch by benedict & xedin; reviewed by benedict, iamaleksey & xedin for CASSANDRA-6694
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8541cca7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8541cca7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8541cca7
Branch: refs/heads/cassandra-2.1
Commit: 8541cca718fc324c2545831fc945247a4aeb3437
Parents: d402cf6
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon Apr 14 14:16:08 2014 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Apr 29 13:58:20 2014 +0200
----------------------------------------------------------------------
conf/cassandra.yaml | 1 +
.../org/apache/cassandra/cache/RowCacheKey.java | 2 +-
.../org/apache/cassandra/config/CFMetaData.java | 5 +-
.../org/apache/cassandra/config/Config.java | 3 +-
.../cassandra/config/DatabaseDescriptor.java | 7 +-
.../org/apache/cassandra/config/Schema.java | 2 +-
.../apache/cassandra/cql/QueryProcessor.java | 12 +-
.../apache/cassandra/cql3/ColumnIdentifier.java | 2 +-
.../apache/cassandra/cql3/UpdateParameters.java | 4 +-
.../cql3/statements/ModificationStatement.java | 7 +-
.../cql3/statements/SelectStatement.java | 8 +-
.../org/apache/cassandra/db/AbstractCell.java | 265 ++++++++
.../apache/cassandra/db/AbstractNativeCell.java | 647 +++++++++++++++++++
.../cassandra/db/ArrayBackedSortedColumns.java | 6 +-
.../apache/cassandra/db/AtomicBTreeColumns.java | 204 +++---
.../org/apache/cassandra/db/BufferCell.java | 103 +++
.../apache/cassandra/db/BufferCounterCell.java | 181 ++++++
.../cassandra/db/BufferCounterUpdateCell.java | 93 +++
.../apache/cassandra/db/BufferDecoratedKey.java | 39 ++
.../apache/cassandra/db/BufferDeletedCell.java | 123 ++++
.../apache/cassandra/db/BufferExpiringCell.java | 170 +++++
.../org/apache/cassandra/db/CFRowAdder.java | 6 +-
src/java/org/apache/cassandra/db/Cell.java | 245 +------
.../cassandra/db/CollationController.java | 6 +-
.../org/apache/cassandra/db/ColumnFamily.java | 16 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 29 +-
.../apache/cassandra/db/ColumnSerializer.java | 10 +-
.../org/apache/cassandra/db/CounterCell.java | 217 +------
.../apache/cassandra/db/CounterMutation.java | 8 +-
.../apache/cassandra/db/CounterUpdateCell.java | 65 +-
src/java/org/apache/cassandra/db/DataRange.java | 2 +-
.../org/apache/cassandra/db/DecoratedKey.java | 35 +-
.../org/apache/cassandra/db/DefsTables.java | 4 +-
.../org/apache/cassandra/db/DeletedCell.java | 100 +--
.../org/apache/cassandra/db/ExpiringCell.java | 159 +----
.../cassandra/db/HintedHandOffManager.java | 4 +-
src/java/org/apache/cassandra/db/Keyspace.java | 6 +-
src/java/org/apache/cassandra/db/Memtable.java | 47 +-
src/java/org/apache/cassandra/db/Mutation.java | 2 +-
.../org/apache/cassandra/db/NativeCell.java | 88 +++
.../apache/cassandra/db/NativeCounterCell.java | 190 ++++++
.../apache/cassandra/db/NativeDecoratedKey.java | 45 ++
.../apache/cassandra/db/NativeDeletedCell.java | 125 ++++
.../apache/cassandra/db/NativeExpiringCell.java | 173 +++++
src/java/org/apache/cassandra/db/Row.java | 4 +-
.../apache/cassandra/db/RowIteratorFactory.java | 4 +-
.../org/apache/cassandra/db/RowPosition.java | 24 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 +-
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../db/compaction/CompactionController.java | 2 +-
.../db/compaction/CompactionManager.java | 12 +-
.../db/compaction/LazilyCompactedRow.java | 4 +-
.../db/compaction/LeveledManifest.java | 12 +-
.../cassandra/db/compaction/Scrubber.java | 6 +-
.../db/composites/AbstractCellNameType.java | 17 +-
.../db/composites/AbstractComposite.java | 1 -
.../db/composites/BoundedComposite.java | 12 +-
.../cassandra/db/composites/CellName.java | 6 +-
.../cassandra/db/composites/CellNameType.java | 3 +-
.../cassandra/db/composites/CellNames.java | 16 +
.../cassandra/db/composites/Composite.java | 5 +-
.../cassandra/db/composites/Composites.java | 10 +-
.../db/composites/CompoundComposite.java | 12 +-
.../db/composites/CompoundDenseCellName.java | 5 +-
.../composites/CompoundDenseCellNameType.java | 3 +-
.../db/composites/CompoundSparseCellName.java | 15 +-
.../composites/CompoundSparseCellNameType.java | 15 +-
.../db/composites/SimpleComposite.java | 10 +-
.../db/composites/SimpleDenseCellName.java | 5 +-
.../db/composites/SimpleDenseCellNameType.java | 3 +-
.../db/composites/SimpleSparseCellName.java | 11 +-
.../db/composites/SimpleSparseCellNameType.java | 5 +-
.../SimpleSparseInternedCellName.java | 11 +-
.../apache/cassandra/db/filter/ColumnSlice.java | 14 +-
.../cassandra/db/filter/ExtendedFilter.java | 6 +-
.../AbstractSimplePerColumnSecondaryIndex.java | 23 +-
.../cassandra/db/index/SecondaryIndex.java | 10 +-
.../db/index/SecondaryIndexManager.java | 16 +-
.../CompositesIndexOnClusteringKey.java | 2 +-
.../CompositesIndexOnCollectionKey.java | 2 +-
.../CompositesIndexOnCollectionValue.java | 2 +-
.../composites/CompositesIndexOnRegular.java | 2 +-
.../db/index/composites/CompositesSearcher.java | 8 +-
.../cassandra/db/index/keys/KeysSearcher.java | 12 +-
.../db/marshal/LocalByPartionerType.java | 2 +-
.../apache/cassandra/dht/AbstractBounds.java | 3 +-
.../dht/AbstractByteOrderedPartitioner.java | 3 +-
.../apache/cassandra/dht/LocalPartitioner.java | 3 +-
.../cassandra/dht/Murmur3Partitioner.java | 3 +-
.../dht/OrderPreservingPartitioner.java | 3 +-
.../apache/cassandra/dht/RandomPartitioner.java | 3 +-
src/java/org/apache/cassandra/dht/Token.java | 7 +-
.../hadoop/ColumnFamilyRecordReader.java | 5 +-
.../apache/cassandra/hadoop/pig/CqlStorage.java | 3 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 10 +-
.../cassandra/io/sstable/CQLSSTableWriter.java | 2 +-
.../io/sstable/IndexSummaryBuilder.java | 9 +-
.../apache/cassandra/io/sstable/SSTable.java | 5 +-
.../cassandra/io/sstable/SSTableReader.java | 18 +-
.../cassandra/io/sstable/SSTableScanner.java | 4 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 4 +-
.../cassandra/io/sstable/SSTableWriter.java | 12 +-
.../repair/RepairMessageVerbHandler.java | 2 +-
.../org/apache/cassandra/repair/Validator.java | 10 +-
.../cassandra/service/ActiveRepairService.java | 2 +-
.../cassandra/service/RowDataResolver.java | 2 +-
.../cassandra/service/StorageService.java | 2 +-
.../service/pager/RangeNamesQueryPager.java | 2 +-
.../service/pager/RangeSliceQueryPager.java | 2 +-
.../cassandra/thrift/CassandraServer.java | 12 +-
.../cassandra/thrift/ThriftValidation.java | 6 +-
.../apache/cassandra/tools/SSTableExport.java | 6 +-
.../apache/cassandra/tools/SSTableImport.java | 4 +-
.../org/apache/cassandra/tracing/Tracing.java | 6 +-
.../org/apache/cassandra/utils/FBUtilities.java | 35 +-
.../apache/cassandra/utils/PureJavaCrc32.java | 33 +-
.../utils/memory/ContextAllocator.java | 25 +-
.../apache/cassandra/utils/memory/HeapPool.java | 58 +-
.../cassandra/utils/memory/MemoryUtil.java | 305 +++++++++
.../utils/memory/MemtableAllocator.java | 245 +++++++
.../utils/memory/MemtableBufferAllocator.java | 70 ++
.../utils/memory/MemtableCleanerThread.java | 73 +++
.../cassandra/utils/memory/MemtablePool.java | 199 ++++++
.../cassandra/utils/memory/NativeAllocator.java | 258 ++++++++
.../cassandra/utils/memory/NativePool.java | 39 ++
.../org/apache/cassandra/utils/memory/Pool.java | 199 ------
.../cassandra/utils/memory/PoolAllocator.java | 225 -------
.../utils/memory/PoolCleanerThread.java | 73 ---
.../cassandra/utils/memory/SlabAllocator.java | 13 +-
.../apache/cassandra/utils/memory/SlabPool.java | 7 +-
test/conf/cassandra.yaml | 2 +-
.../cassandra/db/LongFlushMemtableTest.java | 2 +-
.../apache/cassandra/db/LongKeyspaceTest.java | 2 +-
.../db/compaction/LongCompactionsTest.java | 2 +-
.../LongLeveledCompactionStrategyTest.java | 2 +-
test/unit/org/apache/cassandra/Util.java | 8 +-
.../org/apache/cassandra/config/DefsTest.java | 17 +-
.../db/ArrayBackedSortedColumnsTest.java | 20 +-
.../cassandra/db/CollationControllerTest.java | 12 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 86 +--
.../apache/cassandra/db/CounterCacheTest.java | 4 +-
.../apache/cassandra/db/CounterCellTest.java | 80 +--
.../org/apache/cassandra/db/KeyCacheTest.java | 4 +-
.../apache/cassandra/db/KeyCollisionTest.java | 10 +-
.../org/apache/cassandra/db/KeyspaceTest.java | 38 +-
.../org/apache/cassandra/db/MultitableTest.java | 4 +-
.../org/apache/cassandra/db/NativeCellTest.java | 171 +++++
.../apache/cassandra/db/RangeTombstoneTest.java | 9 +-
.../apache/cassandra/db/ReadMessageTest.java | 10 +-
.../cassandra/db/RecoveryManager3Test.java | 4 +-
.../cassandra/db/RecoveryManagerTest.java | 10 +-
.../org/apache/cassandra/db/RemoveCellTest.java | 8 +-
.../cassandra/db/RemoveColumnFamilyTest.java | 4 +-
.../db/RemoveColumnFamilyWithFlush1Test.java | 4 +-
.../db/RemoveColumnFamilyWithFlush2Test.java | 4 +-
.../apache/cassandra/db/RemoveSubCellTest.java | 10 +-
.../org/apache/cassandra/db/RowCacheTest.java | 2 +-
.../apache/cassandra/db/RowIterationTest.java | 8 +-
test/unit/org/apache/cassandra/db/RowTest.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 4 +-
.../db/SecondaryIndexCellSizeTest.java | 15 +-
.../apache/cassandra/db/SerializationsTest.java | 31 +-
.../org/apache/cassandra/db/TimeSortTest.java | 8 +-
.../db/compaction/AntiCompactionTest.java | 6 +-
.../compaction/BlacklistingCompactionsTest.java | 2 +-
.../db/compaction/CompactionsPurgeTest.java | 34 +-
.../db/compaction/CompactionsTest.java | 18 +-
.../LeveledCompactionStrategyTest.java | 8 +-
.../db/compaction/OneCompactionTest.java | 2 +-
.../SizeTieredCompactionStrategyTest.java | 4 +-
.../cassandra/db/compaction/TTLExpiryTest.java | 16 +-
.../db/context/CounterContextTest.java | 13 -
.../db/index/PerRowSecondaryIndexTest.java | 8 +-
.../io/sstable/IndexSummaryManagerTest.java | 6 +-
.../cassandra/io/sstable/IndexSummaryTest.java | 10 +-
.../cassandra/io/sstable/SSTableLoaderTest.java | 2 +-
.../io/sstable/SSTableMetadataTest.java | 30 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 13 +-
.../io/sstable/SSTableScannerTest.java | 8 +-
.../cassandra/io/sstable/SSTableUtils.java | 2 +-
.../apache/cassandra/repair/ValidatorTest.java | 3 +-
.../cassandra/service/QueryPagerTest.java | 4 +-
.../service/pager/AbstractQueryPagerTest.java | 2 +-
.../streaming/StreamingTransferTest.java | 20 +-
.../cassandra/tools/SSTableExportTest.java | 6 +-
.../cassandra/tools/SSTableImportTest.java | 6 +-
.../cassandra/triggers/TriggerExecutorTest.java | 14 +-
.../apache/cassandra/triggers/TriggersTest.java | 8 +-
188 files changed, 4529 insertions(+), 2129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2176bf9..6e47918 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -283,6 +283,7 @@ memtable_cleanup_threshold: 0.4
# Options are:
# heap_buffers: on heap nio buffers
# offheap_buffers: off heap (direct) nio buffers
+# offheap_objects: native memory, eliminating nio buffer heap overhead
memtable_allocation_type: heap_buffers
# Total space to use for commitlogs. Since commitlog segments are
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index bbd8591..aebb129 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -37,7 +37,7 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
public RowCacheKey(UUID cfId, DecoratedKey key)
{
- this(cfId, key.key);
+ this(cfId, key.getKey());
}
public RowCacheKey(UUID cfId, ByteBuffer key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b8c1fae..b4b3fbe 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.AbstractCell;
import org.apache.cassandra.db.AtomDeserializer;
import org.apache.cassandra.db.CFRowAdder;
import org.apache.cassandra.db.Cell;
@@ -1328,7 +1329,7 @@ public final class CFMetaData
*/
public ColumnDefinition getColumnDefinition(CellName cellName)
{
- ColumnIdentifier id = cellName.cql3ColumnName();
+ ColumnIdentifier id = cellName.cql3ColumnName(this);
ColumnDefinition def = id == null
? getColumnDefinition(cellName.toByteBuffer()) // Means a dense layout, try the full column name
: getColumnDefinition(id);
@@ -1409,7 +1410,7 @@ public final class CFMetaData
public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
{
- return Cell.onDiskIterator(in, flag, expireBefore, version, comparator);
+ return AbstractCell.onDiskIterator(in, flag, expireBefore, version, comparator);
}
public AtomDeserializer getOnDiskDeserializer(DataInput in, Descriptor.Version version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7ab7a8c..97ceb7f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -290,7 +290,8 @@ public class Config
{
unslabbed_heap_buffers,
heap_buffers,
- offheap_buffers
+ offheap_buffers,
+ offheap_objects
}
public static enum DiskFailurePolicy
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b0043c..cdcceb1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -66,7 +66,8 @@ import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.HeapPool;
-import org.apache.cassandra.utils.memory.Pool;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.MemtablePool;
import org.apache.cassandra.utils.memory.SlabPool;
public class DatabaseDescriptor
@@ -1476,7 +1477,7 @@ public class DatabaseDescriptor
return conf.inter_dc_tcp_nodelay;
}
- public static Pool getMemtableAllocatorPool()
+ public static MemtablePool getMemtableAllocatorPool()
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
@@ -1493,6 +1494,8 @@ public class DatabaseDescriptor
System.exit(-1);
}
return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+ case offheap_objects:
+ return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
default:
throw new AssertionError();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index b1e0f2f..106ad9b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -416,7 +416,7 @@ public class Schema
{
try
{
- return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
+ return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.getKey()));
}
catch (CharacterCodingException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index baf89b2..3b35555 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -171,7 +171,7 @@ public class QueryProcessor
? select.getKeyFinish().getByteBuffer(keyType,variables)
: null;
- RowPosition startKey = RowPosition.forKey(startKeyBytes, p), finishKey = RowPosition.forKey(finishKeyBytes, p);
+ RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p), finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
{
if (p instanceof RandomPartitioner)
@@ -213,7 +213,7 @@ public class QueryProcessor
// if start key was set and relation was "greater than"
if (select.getKeyStart() != null && !select.includeStartKey() && !rows.isEmpty())
{
- if (rows.get(0).key.key.equals(startKeyBytes))
+ if (rows.get(0).key.getKey().equals(startKeyBytes))
rows.remove(0);
}
@@ -221,7 +221,7 @@ public class QueryProcessor
if (select.getKeyFinish() != null && !select.includeFinishKey() && !rows.isEmpty())
{
int lastIndex = rows.size() - 1;
- if (rows.get(lastIndex).key.key.equals(finishKeyBytes))
+ if (rows.get(lastIndex).key.getKey().equals(finishKeyBytes))
rows.remove(lastIndex);
}
@@ -459,7 +459,7 @@ public class QueryProcessor
{
// prepend key
ByteBuffer keyName = ByteBufferUtil.bytes(metadata.getCQL2KeyName());
- thriftColumns.add(new Column(keyName).setValue(row.key.key).setTimestamp(-1));
+ thriftColumns.add(new Column(keyName).setValue(row.key.getKey()).setTimestamp(-1));
result.schema.name_types.put(keyName, TypeParser.getShortName(AsciiType.instance));
result.schema.value_types.put(keyName, TypeParser.getShortName(metadata.getKeyValidator()));
}
@@ -491,7 +491,7 @@ public class QueryProcessor
{
// preserve case of key as it was requested
ByteBuffer requestedKey = ByteBufferUtil.bytes(term.getText());
- thriftColumns.add(new Column(requestedKey).setValue(row.key.key).setTimestamp(-1));
+ thriftColumns.add(new Column(requestedKey).setValue(row.key.getKey()).setTimestamp(-1));
result.schema.name_types.put(requestedKey, TypeParser.getShortName(AsciiType.instance));
result.schema.value_types.put(requestedKey, TypeParser.getShortName(metadata.getKeyValidator()));
continue;
@@ -524,7 +524,7 @@ public class QueryProcessor
// Create a new row, add the columns to it, and then add it to the list of rows
CqlRow cqlRow = new CqlRow();
- cqlRow.key = row.key.key;
+ cqlRow.key = row.key.getKey();
cqlRow.columns = thriftColumns;
if (select.isColumnsReversed())
Collections.reverse(cqlRow.columns);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 3bf3acc..1b232cf 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -50,7 +50,7 @@ public class ColumnIdentifier implements Selectable, IMeasurableMemory
this.text = type.getString(bytes);
}
- private ColumnIdentifier(ByteBuffer bytes, String text)
+ public ColumnIdentifier(ByteBuffer bytes, String text)
{
this.bytes = bytes;
this.text = text;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 87ce22e..fad8fae 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -55,13 +55,13 @@ public class UpdateParameters
public Cell makeColumn(CellName name, ByteBuffer value) throws InvalidRequestException
{
QueryProcessor.validateCellName(name, metadata.comparator);
- return Cell.create(name, value, timestamp, ttl, metadata);
+ return AbstractCell.create(name, value, timestamp, ttl, metadata);
}
public Cell makeTombstone(CellName name) throws InvalidRequestException
{
QueryProcessor.validateCellName(name, metadata.comparator);
- return new DeletedCell(name, localDeletionTime, timestamp);
+ return new BufferDeletedCell(name, localDeletionTime, timestamp);
}
public RangeTombstone makeRangeTombstone(ColumnSlice slice) throws InvalidRequestException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index acc4802..4741b9a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -32,12 +32,9 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.CBuilder;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
@@ -449,10 +446,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
if (row.cf == null || row.cf.isEmpty())
continue;
- Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(row.cf.getSortedColumns().iterator());
+ Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
if (iter.hasNext())
{
- map.put(row.key.key, iter.next());
+ map.put(row.key.getKey(), iter.next());
// We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
assert !iter.hasNext();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f28d570..d79bd5b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -400,8 +400,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
- RowPosition startKey = RowPosition.forKey(startKeyBytes, p);
- RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p);
+ RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
+ RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
return null;
@@ -1007,7 +1007,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (row.cf == null)
continue;
- processColumnFamily(row.key.key, row.cf, variables, now, result);
+ processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
}
ResultSet cqlRows = result.build();
@@ -1042,7 +1042,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (sliceRestriction != null)
cells = applySliceRestriction(cells, variables);
- CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(now).group(cells);
+ CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
// If there is static columns but there is no non-static row, then provided the select was a full
// partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractCell.java b/src/java/org/apache/cassandra/db/AbstractCell.java
new file mode 100644
index 0000000..1075278
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractCell.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractCell implements Cell
+{
+ public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
+ final ColumnSerializer.Flag flag,
+ final int expireBefore,
+ final Descriptor.Version version,
+ final CellNameType type)
+ {
+ return new AbstractIterator<OnDiskAtom>()
+ {
+ protected OnDiskAtom computeNext()
+ {
+ OnDiskAtom atom;
+ try
+ {
+ atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ if (atom == null)
+ return endOfData();
+
+ return atom;
+ }
+ };
+ }
+
+ @Override
+ public boolean isMarkedForDelete(long now)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isLive(long now)
+ {
+ return !isMarkedForDelete(now);
+ }
+
+ // Don't call unless the column is actually marked for delete.
+ @Override
+ public long getMarkedForDeleteAt()
+ {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public int cellDataSize()
+ {
+ return name().dataSize() + value().remaining() + TypeSizes.NATIVE.sizeof(timestamp());
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ /*
+ * Size of a column is =
+ * size of a name (short + length of the string)
+ * + 1 byte to indicate if the column has been deleted
+ * + 8 bytes for timestamp
+ * + 4 bytes which basically indicates the size of the byte array
+ * + entire byte array.
+ */
+ int valueSize = value().remaining();
+ return ((int)type.cellSerializer().serializedSize(name(), typeSizes)) + 1 + typeSizes.sizeof(timestamp()) + typeSizes.sizeof(valueSize) + valueSize;
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return 0;
+ }
+
+ @Override
+ public Cell diff(Cell cell)
+ {
+ if (timestamp() < cell.timestamp())
+ return cell;
+ return null;
+ }
+
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name().toByteBuffer().duplicate());
+ digest.update(value().duplicate());
+
+ FBUtilities.updateWithLong(digest, timestamp());
+ FBUtilities.updateWithByte(digest, serializationFlags());
+ }
+
+ @Override
+ public int getLocalDeletionTime()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public Cell reconcile(Cell cell)
+ {
+ // tombstones take precedence. (if both are tombstones, then it doesn't matter which one we use.)
+ if (isMarkedForDelete(System.currentTimeMillis()))
+ return timestamp() < cell.timestamp() ? cell : this;
+ if (cell.isMarkedForDelete(System.currentTimeMillis()))
+ return timestamp() > cell.timestamp() ? this : cell;
+ // break ties by comparing values.
+ if (timestamp() == cell.timestamp())
+ return value().compareTo(cell.value()) < 0 ? cell : this;
+ // neither is tombstoned and timestamps are different
+ return timestamp() < cell.timestamp() ? cell : this;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || (o instanceof Cell && equals((Cell) o));
+ }
+
+ public boolean equals(Cell cell)
+ {
+ return timestamp() == cell.timestamp() && name().equals(cell.name()) && value().equals(cell.value());
+ }
+
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s:%b:%d@%d",
+ comparator.getString(name()),
+ isMarkedForDelete(System.currentTimeMillis()),
+ value().remaining(),
+ timestamp());
+ }
+
+ @Override
+ public void validateName(CFMetaData metadata) throws MarshalException
+ {
+ metadata.comparator.validate(name());
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+
+ AbstractType<?> valueValidator = metadata.getValueValidator(name());
+ if (valueValidator != null)
+ valueValidator.validate(value());
+ }
+
+ public static Cell create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
+ {
+ if (ttl <= 0)
+ ttl = metadata.getDefaultTimeToLive();
+
+ return ttl > 0
+ ? new BufferExpiringCell(name, value, timestamp, ttl)
+ : new BufferCell(name, value, timestamp);
+ }
+
+ public static Cell diff(CounterCell a, Cell b)
+ {
+ if (a.timestamp() < b.timestamp())
+ return b;
+
+ // Note that at this point, b can't be a tombstone. Indeed,
+ // b is the result of merging us with other nodes' results, and
+ // merging a CounterCell with a tombstone never returns a tombstone
+ // unless that tombstone's timestamp is greater than that of the
+ // CounterCell.
+ assert b instanceof CounterCell : "Wrong class type: " + b.getClass();
+
+ if (a.timestampOfLastDelete() < ((CounterCell) b).timestampOfLastDelete())
+ return b;
+
+ CounterContext.Relationship rel = CounterCell.contextManager.diff(b.value(), a.value());
+ return (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT) ? b : null;
+ }
+
+ /** This is temporary until we start creating Cells of different types (buffer vs. native) */
+ public static Cell reconcile(CounterCell a, Cell b)
+ {
+ assert (b instanceof CounterCell) || (b instanceof DeletedCell) : "Wrong class type: " + b.getClass();
+
+ // live + tombstone: track last tombstone
+ if (b.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
+ {
+ // live < tombstone
+ if (a.timestamp() < b.timestamp())
+ return b;
+
+ // live last delete >= tombstone
+ if (a.timestampOfLastDelete() >= b.timestamp())
+ return a;
+
+ // live last delete < tombstone
+ return new BufferCounterCell(a.name(), a.value(), a.timestamp(), b.timestamp());
+ }
+
+ assert b instanceof CounterCell : "Wrong class type: " + b.getClass();
+
+ // live < live last delete
+ if (a.timestamp() < ((CounterCell) b).timestampOfLastDelete())
+ return b;
+
+ // live last delete > live
+ if (a.timestampOfLastDelete() > b.timestamp())
+ return a;
+
+ // live + live. return one of the cells if its context is a superset of the other's, or merge them otherwise
+ ByteBuffer context = CounterCell.contextManager.merge(a.value(), b.value());
+ if (context == a.value() && a.timestamp() >= b.timestamp() && a.timestampOfLastDelete() >= ((CounterCell) b).timestampOfLastDelete())
+ return a;
+ else if (context == b.value() && b.timestamp() >= a.timestamp() && ((CounterCell) b).timestampOfLastDelete() >= a.timestampOfLastDelete())
+ return b;
+ else // merge clocks and timestamps.
+ return new BufferCounterCell(a.name(),
+ context,
+ Math.max(a.timestamp(), b.timestamp()),
+ Math.max(a.timestampOfLastDelete(), ((CounterCell) b).timestampOfLastDelete()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AbstractNativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractNativeCell.java b/src/java/org/apache/cassandra/db/AbstractNativeCell.java
new file mode 100644
index 0000000..d21171f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractNativeCell.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.*;
+
+
+/**
+ * Packs a CellName AND a Cell into one off-heap representation.
+ * Layout is:
+ *
+ * Note we store the ColumnIdentifier in full as bytes. This seems an okay tradeoff for now, as we just
+ * look it back up again when we need to, and in the near future we hope to switch to ints, longs or
+ * UUIDs representing column identifiers on disk, at which point we can switch that here as well.
+ *
+ * [size][timestamp][value offset][name size][name extra][name offset deltas][cell names][value][Descendants]
+ * [ 4b ][ 8b ][ 4b ][ 2b ][ 1b ][ each 2b ][ arb < 64k][ arb ][ arbitrary ]
+ *
+ * descendants: any overriding classes will put their state here
+ * name offsets are deltas from their base offset, and don't include the first offset, or the end position of the final entry,
+ * i.e. there will be size - 1 entries, and each is a delta that is added to the offset of the position of the first name
+ * (which is always CELL_NAME_OFFSETS_OFFSET + (2 * (size - 1))). The final name has no end delta: it occupies all of the
+ * remaining space up to the value offset
+ * name extra: lowest 2 bits indicate the clustering size delta (i.e. how many name items are NOT part of the clustering key)
+ * the next 3 bits hold the NameType ordinal (extracted with CELL_NAME_TYPE_SHIFT and CELL_NAME_TYPE_MASK)
+ * whether the column is static is encoded by the NameType itself (COMPOUND_SPARSE_STATIC), not by a separate bit
+ */
+public abstract class AbstractNativeCell extends AbstractCell implements CellName
+{
+ static final int TIMESTAMP_OFFSET = 4;
+ private static final int VALUE_OFFSET_OFFSET = 12;
+ private static final int CELL_NAME_SIZE_OFFSET = 16;
+ private static final int CELL_NAME_EXTRA_OFFSET = 18;
+ private static final int CELL_NAME_OFFSETS_OFFSET = 19;
+ private static final int CELL_NAME_SIZE_DELTA_MASK = 3;
+ private static final int CELL_NAME_TYPE_SHIFT = 2;
+ private static final int CELL_NAME_TYPE_MASK = 7;
+
+ private static enum NameType
+ {
+ COMPOUND_DENSE(0 << 2), COMPOUND_SPARSE(1 << 2), COMPOUND_SPARSE_STATIC(2 << 2), SIMPLE_DENSE(3 << 2), SIMPLE_SPARSE(4 << 2);
+ static final NameType[] TYPES = NameType.values();
+ final int bits;
+
+ NameType(int bits)
+ {
+ this.bits = bits;
+ }
+
+ static NameType typeOf(CellName name)
+ {
+ if (name instanceof CompoundDenseCellName)
+ {
+ assert !name.isStatic();
+ return COMPOUND_DENSE;
+ }
+
+ if (name instanceof CompoundSparseCellName)
+ return name.isStatic() ? COMPOUND_SPARSE_STATIC : COMPOUND_SPARSE;
+
+ if (name instanceof SimpleDenseCellName)
+ {
+ assert !name.isStatic();
+ return SIMPLE_DENSE;
+ }
+
+ if (name instanceof SimpleSparseCellName)
+ {
+ assert !name.isStatic();
+ return SIMPLE_SPARSE;
+ }
+
+ if (name instanceof NativeCell)
+ return ((NativeCell) name).nametype();
+
+ throw new AssertionError();
+ }
+ }
+
+ private final long peer; // base address of this cell's allocated memory, assigned once in the constructor (-1 when nothing was allocated)
+
+ AbstractNativeCell()
+ {
+ peer = -1;
+ }
+
+ public AbstractNativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
+ {
+ int size = sizeOf(copyOf);
+ peer = allocator.allocate(size, writeOp);
+
+ MemoryUtil.setInt(peer, size);
+ construct(copyOf);
+ }
+
+ protected int sizeOf(Cell cell)
+ {
+ int size = CELL_NAME_OFFSETS_OFFSET + Math.max(0, cell.name().size() - 1) * 2 + cell.value().remaining();
+ CellName name = cell.name();
+ for (int i = 0; i < name.size(); i++)
+ size += name.get(i).remaining();
+ return size;
+ }
+
+ protected void construct(Cell from)
+ {
+ setLong(TIMESTAMP_OFFSET, from.timestamp());
+ CellName name = from.name();
+ int nameSize = name.size();
+ int offset = CELL_NAME_SIZE_OFFSET;
+ setShort(offset, (short) nameSize);
+ assert nameSize - name.clusteringSize() <= 2;
+ byte cellNameExtraBits = (byte) ((nameSize - name.clusteringSize()) | NameType.typeOf(name).bits);
+ setByte(offset += 2, cellNameExtraBits);
+ offset += 1;
+ short cellNameDelta = 0;
+ for (int i = 1; i < nameSize; i++)
+ {
+ cellNameDelta += name.get(i - 1).remaining();
+ setShort(offset, cellNameDelta);
+ offset += 2;
+ }
+ for (int i = 0; i < nameSize; i++)
+ {
+ ByteBuffer bb = name.get(i);
+ setBytes(offset, bb);
+ offset += bb.remaining();
+ }
+ setInt(VALUE_OFFSET_OFFSET, offset);
+ setBytes(offset, from.value());
+ }
+
+ // the offset of the short holding the start delta of name component i (i >= 1; component 0 has no entry); nameDeltaOffset(size()) is where the name bytes begin
+ private int nameDeltaOffset(int i)
+ {
+ return CELL_NAME_OFFSETS_OFFSET + ((i - 1) * 2);
+ }
+
+ int valueStartOffset()
+ {
+ return getInt(VALUE_OFFSET_OFFSET);
+ }
+
+ private int valueEndOffset()
+ {
+ return (int) (internalSize() - postfixSize());
+ }
+
+ protected int postfixSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public ByteBuffer value()
+ {
+ long offset = valueStartOffset();
+ return getByteBuffer(offset, (int) (internalSize() - (postfixSize() + offset)));
+ }
+
+ private int clusteringSizeDelta()
+ {
+ return getByte(CELL_NAME_EXTRA_OFFSET) & CELL_NAME_SIZE_DELTA_MASK;
+ }
+
+ public boolean isStatic()
+ {
+ return nametype() == NameType.COMPOUND_SPARSE_STATIC;
+ }
+
+ NameType nametype()
+ {
+ return NameType.TYPES[(((int) this.getByte(CELL_NAME_EXTRA_OFFSET)) >> CELL_NAME_TYPE_SHIFT) & CELL_NAME_TYPE_MASK];
+ }
+
+ public long minTimestamp()
+ {
+ return timestamp();
+ }
+
+ public long maxTimestamp()
+ {
+ return timestamp();
+ }
+
+ public int clusteringSize()
+ {
+ return size() - clusteringSizeDelta();
+ }
+
+ @Override
+ public ColumnIdentifier cql3ColumnName(CFMetaData metadata)
+ {
+ switch (nametype())
+ {
+ case SIMPLE_SPARSE:
+ return getIdentifier(metadata, get(clusteringSize()));
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ ByteBuffer buffer = get(clusteringSize());
+ if (buffer.remaining() == 0)
+ return CompoundSparseCellNameType.rowMarkerId;
+
+ return getIdentifier(metadata, buffer);
+ case SIMPLE_DENSE:
+ case COMPOUND_DENSE:
+ return null;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ public ByteBuffer collectionElement()
+ {
+ return isCollectionCell() ? get(size() - 1) : null;
+ }
+
+ // we always have a collection element if our clustering size is 2 less than our total size,
+ // and we never have one otherwise
+ public boolean isCollectionCell()
+ {
+ return clusteringSizeDelta() == 2;
+ }
+
+ public boolean isSameCQL3RowAs(CellNameType type, CellName other)
+ {
+ switch (nametype())
+ {
+ case SIMPLE_DENSE:
+ case COMPOUND_DENSE:
+ return type.compare(this, other) == 0;
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ int clusteringSize = clusteringSize();
+ if (clusteringSize != other.clusteringSize() || other.isStatic() != isStatic())
+ return false;
+ for (int i = 0; i < clusteringSize; i++)
+ if (type.subtype(i).compare(get(i), other.get(i)) != 0)
+ return false;
+ return true;
+ case SIMPLE_SPARSE:
+ return true;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ public int size()
+ {
+ return getShort(CELL_NAME_SIZE_OFFSET);
+ }
+
+ public boolean isEmpty()
+ {
+ return size() == 0;
+ }
+
+ public ByteBuffer get(int i)
+ {
+ // remember to take dense/sparse into account, and only return EOC when not dense
+ int size = size();
+ assert i >= 0 && i < size();
+ int cellNamesOffset = nameDeltaOffset(size);
+ int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
+ int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : valueStartOffset() - cellNamesOffset;
+ return getByteBuffer(cellNamesOffset + startDelta, endDelta - startDelta);
+ }
+
+ private static final ThreadLocal<byte[]> BUFFER = new ThreadLocal<byte[]>()
+ {
+ protected byte[] initialValue()
+ {
+ return new byte[256];
+ }
+ };
+
+ protected void writeComponentTo(MessageDigest digest, int i, boolean includeSize)
+ {
+ // remember to take dense/sparse into account, and only return EOC when not dense
+ int size = size();
+ assert i >= 0 && i < size();
+ int cellNamesOffset = nameDeltaOffset(size);
+ int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
+ int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : valueStartOffset() - cellNamesOffset;
+
+ int componentStart = cellNamesOffset + startDelta;
+ int count = endDelta - startDelta;
+
+ if (includeSize)
+ FBUtilities.updateWithShort(digest, count);
+
+ writeMemoryTo(digest, componentStart, count);
+ }
+
+ protected void writeMemoryTo(MessageDigest digest, int from, int count)
+ {
+ // only batch if we have more than 16 bytes remaining to transfer, otherwise fall-back to single-byte updates
+ int i = 0, batchEnd = count - 16;
+ if (i < batchEnd)
+ {
+ byte[] buffer = BUFFER.get();
+ while (i < batchEnd)
+ {
+ int transfer = Math.min(count - i, 256);
+ getBytes(from + i, buffer, 0, transfer);
+ digest.update(buffer, 0, transfer);
+ i += transfer;
+ }
+ }
+ while (i < count)
+ digest.update(getByte(from + i++));
+ }
+
+ public EOC eoc()
+ {
+ return EOC.NONE;
+ }
+
+ public Composite withEOC(EOC eoc)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Composite start()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Composite end()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ColumnSlice slice()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isPrefixOf(CType type, Composite c)
+ {
+ if (size() > c.size() || isStatic() != c.isStatic())
+ return false;
+
+ for (int i = 0; i < size(); i++)
+ {
+ if (type.subtype(i).compare(get(i), c.get(i)) != 0)
+ return false;
+ }
+ return true;
+ }
+
+ public ByteBuffer toByteBuffer()
+ {
+ // simple names consist of a single component, which is returned as-is
+ switch (nametype())
+ {
+ case SIMPLE_DENSE:
+ case SIMPLE_SPARSE:
+ return get(0);
+ case COMPOUND_DENSE:
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ // This is the legacy format of composites (see org.apache.cassandra.db.marshal.CompositeType for details).
+ // Only the name is written here, so size the buffer with dataSize(): component bytes + 3 per component + 2 if static.
+ ByteBuffer result = ByteBuffer.allocate(dataSize());
+ if (isStatic())
+ ByteBufferUtil.writeShortLength(result, CompositeType.STATIC_MARKER);
+
+ for (int i = 0; i < size(); i++)
+ {
+ ByteBuffer bb = get(i);
+ ByteBufferUtil.writeShortLength(result, bb.remaining());
+ result.put(bb);
+ result.put((byte) 0);
+ }
+ result.flip();
+ return result;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ protected void updateWithName(MessageDigest digest)
+ {
+ // simple names have a single component, which is digested without a length prefix
+ switch (nametype())
+ {
+ case SIMPLE_DENSE:
+ case SIMPLE_SPARSE:
+ writeComponentTo(digest, 0, false);
+ break;
+
+ case COMPOUND_DENSE:
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ // This is the legacy format of composites.
+ // See org.apache.cassandra.db.marshal.CompositeType for details.
+ if (isStatic())
+ FBUtilities.updateWithShort(digest, CompositeType.STATIC_MARKER);
+
+ for (int i = 0; i < size(); i++)
+ {
+ writeComponentTo(digest, i, true);
+ digest.update((byte) 0);
+ }
+ break;
+
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ protected void updateWithValue(MessageDigest digest)
+ {
+ int offset = valueStartOffset();
+ int length = valueEndOffset() - offset;
+ writeMemoryTo(digest, offset, length);
+ }
+
+ @Override // this is the NAME dataSize, only!
+ public int dataSize()
+ {
+ switch (nametype())
+ {
+ case SIMPLE_DENSE:
+ case SIMPLE_SPARSE:
+ return valueStartOffset() - nameDeltaOffset(size());
+ case COMPOUND_DENSE:
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ int size = size();
+ return valueStartOffset() - nameDeltaOffset(size) + 3 * size + (isStatic() ? 2 : 0);
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (obj == this)
+ return true;
+ if (obj instanceof CellName)
+ return equals((CellName) obj);
+ if (obj instanceof Cell)
+ return equals((Cell) obj);
+ return false;
+ }
+
+ public boolean equals(CellName that)
+ {
+ int size = this.size();
+ if (size != that.size())
+ return false;
+
+ for (int i = 0 ; i < size ; i++)
+ if (!get(i).equals(that.get(i)))
+ return false;
+ return true;
+ }
+
+ private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
+
+ @Override
+ public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
+ {
+ ByteBuffer[] r;
+ switch (nametype())
+ {
+ case SIMPLE_DENSE:
+ return CellNames.simpleDense(allocator.clone(get(0)));
+
+ case COMPOUND_DENSE:
+ r = new ByteBuffer[size()];
+ for (int i = 0; i < r.length; i++)
+ r[i] = allocator.clone(get(i));
+ return CellNames.compositeDense(r);
+
+ case COMPOUND_SPARSE_STATIC:
+ case COMPOUND_SPARSE:
+ int clusteringSize = clusteringSize();
+ r = clusteringSize == 0 ? EMPTY : new ByteBuffer[clusteringSize()];
+ for (int i = 0; i < clusteringSize; i++)
+ r[i] = allocator.clone(get(i));
+
+ ByteBuffer nameBuffer = get(r.length);
+ ColumnIdentifier name;
+
+ if (nameBuffer.remaining() == 0)
+ {
+ name = CompoundSparseCellNameType.rowMarkerId;
+ }
+ else
+ {
+ name = getIdentifier(cfm, nameBuffer);
+ }
+
+ if (clusteringSizeDelta() == 2)
+ {
+ ByteBuffer element = allocator.clone(get(size() - 1));
+ return CellNames.compositeSparseWithCollection(r, element, name, isStatic());
+ }
+ return CellNames.compositeSparse(r, name, isStatic());
+
+ case SIMPLE_SPARSE:
+ return CellNames.simpleSparse(getIdentifier(cfm, get(0)));
+ }
+ throw new IllegalStateException();
+ }
+
+ private static ColumnIdentifier getIdentifier(CFMetaData cfMetaData, ByteBuffer name)
+ {
+ ColumnDefinition def = cfMetaData.getColumnDefinition(name);
+ if (def != null)
+ {
+ return def.name;
+ }
+ else
+ {
+ // it's safe to simply grab based on clusteringPrefixSize() as we are only called if not a dense type
+ AbstractType<?> type = cfMetaData.comparator.subtype(cfMetaData.comparator.clusteringPrefixSize());
+ return new ColumnIdentifier(HeapAllocator.instance.clone(name), type);
+ }
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ protected long internalSize()
+ {
+ return MemoryUtil.getInt(peer);
+ }
+
+ private void checkPosition(long offset, long size)
+ {
+ assert size >= 0;
+ assert peer > 0 : "Memory was freed";
+ assert offset >= 0 && offset + size <= internalSize() : String.format("Illegal range: [%d..%d), size: %s", offset, offset + size, internalSize());
+ }
+
+ protected final void setByte(long offset, byte b)
+ {
+ checkPosition(offset, 1);
+ MemoryUtil.setByte(peer + offset, b);
+ }
+
+ protected final void setShort(long offset, short s)
+ {
+ checkPosition(offset, 2); // a short is two bytes wide, so both must lie inside the allocation (matches getShort)
+ MemoryUtil.setShort(peer + offset, s);
+ }
+
+ protected final void setInt(long offset, int l)
+ {
+ checkPosition(offset, 4);
+ MemoryUtil.setInt(peer + offset, l);
+ }
+
+ protected final void setLong(long offset, long l)
+ {
+ checkPosition(offset, 8);
+ MemoryUtil.setLong(peer + offset, l);
+ }
+
+ protected final void setBytes(long offset, ByteBuffer buffer)
+ {
+ int start = buffer.position();
+ int count = buffer.limit() - start;
+ if (count == 0)
+ return;
+
+ checkPosition(offset, count);
+ MemoryUtil.setBytes(peer + offset, buffer);
+ }
+
+ protected final byte getByte(long offset)
+ {
+ checkPosition(offset, 1);
+ return MemoryUtil.getByte(peer + offset);
+ }
+
+ protected final void getBytes(long offset, byte[] trg, int trgOffset, int count)
+ {
+ checkPosition(offset, count);
+ MemoryUtil.getBytes(peer + offset, trg, trgOffset, count);
+ }
+
+ protected final int getShort(long offset)
+ {
+ checkPosition(offset, 2);
+ return MemoryUtil.getShort(peer + offset);
+ }
+
+ protected final int getInt(long offset)
+ {
+ checkPosition(offset, 4);
+ return MemoryUtil.getInt(peer + offset);
+ }
+
+ protected final long getLong(long offset)
+ {
+ checkPosition(offset, 8);
+ return MemoryUtil.getLong(peer + offset);
+ }
+
+ protected final ByteBuffer getByteBuffer(long offset, int length)
+ {
+ checkPosition(offset, length);
+ return MemoryUtil.getByteBuffer(peer + offset, length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index e04867a..c9cff77 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -90,7 +90,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
{
ArrayBackedSortedColumns copy = new ArrayBackedSortedColumns(original.metadata, false, new Cell[original.getColumnCount()], 0, 0);
for (Cell cell : original)
- copy.internalAdd(cell.localCopy(allocator));
+ copy.internalAdd(cell.localCopy(original.metadata, allocator));
copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize.
copy.delete(original);
return copy;
@@ -138,7 +138,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
Arrays.sort(cells, sortedSize, size, comparator);
// Determine the merge start position for that segment
- int pos = binarySearch(0, sortedSize, cells[sortedSize].name, internalComparator());
+ int pos = binarySearch(0, sortedSize, cells[sortedSize].name(), internalComparator());
if (pos < 0)
pos = -pos - 1;
@@ -420,7 +420,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
{
public CellName apply(Cell cell)
{
- return cell.name;
+ return cell.name();
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 72038b6..20fe64c 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
import java.util.AbstractCollection;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
@@ -28,17 +27,19 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import com.google.common.base.Function;
import com.google.common.base.Functions;
-import com.google.common.collect.*;
+import com.google.common.collect.Iterators;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
@@ -53,14 +54,14 @@ import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
*/
public class AtomicBTreeColumns extends ColumnFamily
{
- static final long HEAP_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+ static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+ ObjectSizes.measure(new Holder(null, null));
private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
{
public CellName apply(Cell column)
{
- return column.name;
+ return column.name();
}
};
@@ -155,110 +156,40 @@ public class AtomicBTreeColumns extends ColumnFamily
}
}
- // the function we provide to the btree utilities to perform any column replacements
- private static final class ColumnUpdater implements UpdateFunction<Cell>
- {
- final AtomicBTreeColumns updating;
- final Holder ref;
- final Function<Cell, Cell> transform;
- final Updater indexer;
- final Delta delta;
-
- private ColumnUpdater(AtomicBTreeColumns updating, Holder ref, Function<Cell, Cell> transform, Updater indexer, Delta delta)
- {
- this.updating = updating;
- this.ref = ref;
- this.transform = transform;
- this.indexer = indexer;
- this.delta = delta;
- }
-
- public Cell apply(Cell inserted)
- {
- indexer.insert(inserted);
- delta.insert(inserted);
- return transform.apply(inserted);
- }
-
- public Cell apply(Cell existing, Cell update)
- {
- Cell reconciled = update.reconcile(existing);
- indexer.update(existing, reconciled);
- if (existing != reconciled)
- delta.swap(existing, reconciled);
- else
- delta.abort(update);
- return transform.apply(reconciled);
- }
-
- public boolean abortEarly()
- {
- return updating.ref != ref;
- }
-
- public void allocated(long heapSize)
- {
- delta.addHeapSize(heapSize);
- }
- }
-
- private static Collection<Cell> transform(Comparator<Cell> cmp, ColumnFamily cf, Function<Cell, Cell> transformation, boolean sort)
- {
- Cell[] tmp = new Cell[cf.getColumnCount()];
-
- int i = 0;
- for (Cell c : cf)
- tmp[i++] = transformation.apply(c);
-
- if (sort)
- Arrays.sort(tmp, cmp);
-
- return Arrays.asList(tmp);
- }
-
/**
* This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
*
* @return the difference in size seen after merging the given columns
*/
- public Delta addAllWithSizeDelta(final ColumnFamily cm, Function<Cell, Cell> transformation, Updater indexer, Delta delta)
+ public long addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
{
- boolean transformed = false;
- Collection<Cell> insert = cm.getSortedColumns();
-
+ ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
while (true)
{
Holder current = ref;
+ updater.ref = current;
+ updater.reset();
- delta.reset();
DeletionInfo deletionInfo;
if (cm.deletionInfo().mayModify(current.deletionInfo))
{
deletionInfo = current.deletionInfo.copy().add(cm.deletionInfo());
- delta.addHeapSize(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
}
else
{
deletionInfo = current.deletionInfo;
}
- ColumnUpdater updater = new ColumnUpdater(this, current, transformation, indexer, delta);
- Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(), insert, true, updater);
+ Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(), cm, cm.getColumnCount(), true, updater);
if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
{
indexer.updateRowLevelIndexes();
- return updater.delta;
- }
-
- if (!transformed)
- {
- // After failing once, transform Columns into a new collection to avoid repeatedly allocating Slab space
- insert = transform(metadata.comparator.columnComparator(), cm, transformation, false);
- transformed = true;
+ updater.finish();
+ return updater.dataSize;
}
}
-
}
// no particular reason not to implement these next methods, we just haven't needed them yet
@@ -290,7 +221,7 @@ public class AtomicBTreeColumns extends ColumnFamily
{
public int compare(Object o1, Object o2)
{
- return cmp.compare((CellName) o1, ((Cell) o2).name);
+ return cmp.compare((CellName) o1, ((Cell) o2).name());
}
};
}
@@ -352,7 +283,7 @@ public class AtomicBTreeColumns extends ColumnFamily
return false;
}
- private static class Holder
+ private static final class Holder
{
final DeletionInfo deletionInfo;
// the btree of columns
@@ -375,69 +306,96 @@ public class AtomicBTreeColumns extends ColumnFamily
}
}
- // TODO: create a stack-allocation-friendly list to help optimise garbage for updates to rows with few columns
-
- /**
- * tracks the size changes made while merging a new group of cells in
- */
- public static final class Delta
+ // the function we provide to the btree utilities to perform any column replacements
+ private static final class ColumnUpdater implements UpdateFunction<Cell>
{
- private long dataSize;
- private long heapSize;
+ final AtomicBTreeColumns updating;
+ final CFMetaData metadata;
+ final MemtableAllocator allocator;
+ final OpOrder.Group writeOp;
+ final Updater indexer;
+ Holder ref;
+ long dataSize;
+ long heapSize;
+ final MemtableAllocator.DataReclaimer reclaimer;
+ List<Cell> inserted; // TODO: replace with walk of aborted BTree
- // we track the discarded cells (cells that were in the btree, but replaced by new ones)
- // separately from aborted ones (were part of an update but older than existing cells)
- // since we need to reset the former when we race on the btree update, but not the latter
- private List<Cell> discarded = new ArrayList<>();
- private List<Cell> aborted;
+ private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ this.updating = updating;
+ this.allocator = allocator;
+ this.writeOp = writeOp;
+ this.indexer = indexer;
+ this.metadata = metadata;
+ this.reclaimer = allocator.reclaimer();
+ }
- protected void reset()
+ public Cell apply(Cell insert)
{
- this.dataSize = 0;
- this.heapSize = 0;
- discarded.clear();
+ indexer.insert(insert);
+ insert = insert.localCopy(metadata, allocator, writeOp);
+ this.dataSize += insert.cellDataSize();
+ this.heapSize += insert.excessHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(insert);
+ return insert;
}
- protected void addHeapSize(long heapSize)
+ public Cell apply(Cell existing, Cell update)
{
- this.heapSize += heapSize;
+ Cell reconciled = existing.reconcile(update);
+ indexer.update(existing, reconciled);
+ if (existing != reconciled)
+ {
+ reconciled = reconciled.localCopy(metadata, allocator, writeOp);
+ dataSize += reconciled.cellDataSize() - existing.cellDataSize();
+ heapSize += reconciled.excessHeapSizeExcludingData() - existing.excessHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(reconciled);
+ discard(existing);
+ }
+ return reconciled;
}
- protected void swap(Cell old, Cell updated)
+ protected void reset()
{
- dataSize += updated.dataSize() - old.dataSize();
- heapSize += updated.excessHeapSizeExcludingData() - old.excessHeapSizeExcludingData();
- discarded.add(old);
+ this.dataSize = 0;
+ this.heapSize = 0;
+ if (inserted != null)
+ {
+ for (Cell cell : inserted)
+ abort(cell);
+ inserted.clear();
+ }
+ reclaimer.cancel();
}
- protected void insert(Cell insert)
+ protected void abort(Cell abort)
{
- this.dataSize += insert.dataSize();
- this.heapSize += insert.excessHeapSizeExcludingData();
+ reclaimer.reclaimImmediately(abort);
}
- private void abort(Cell neverUsed)
+ protected void discard(Cell discard)
{
- if (aborted == null)
- aborted = new ArrayList<>();
- aborted.add(neverUsed);
+ reclaimer.reclaim(discard);
}
- public long dataSize()
+ public boolean abortEarly()
{
- return dataSize;
+ return updating.ref != ref;
}
- public long excessHeapSize()
+ public void allocated(long heapSize)
{
- return heapSize;
+ this.heapSize += heapSize;
}
- public Iterable<Cell> reclaimed()
+ protected void finish()
{
- if (aborted == null)
- return discarded;
- return Iterables.concat(discarded, aborted);
+ allocator.onHeap().allocate(heapSize, writeOp);
+ reclaimer.commit();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCell.java b/src/java/org/apache/cassandra/db/BufferCell.java
new file mode 100644
index 0000000..93251c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCell.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferCell extends AbstractCell
+{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(CellNames.simpleDense(ByteBuffer.allocate(1))));
+
+ protected final CellName name;
+ protected final ByteBuffer value;
+ protected final long timestamp;
+
+ BufferCell(CellName name)
+ {
+ this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ }
+
+ public BufferCell(CellName name, ByteBuffer value)
+ {
+ this(name, value, 0);
+ }
+
+ public BufferCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ assert name != null;
+ assert value != null;
+
+ this.name = name;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new BufferCell(newName, value, timestamp);
+ }
+
+ @Override
+ public Cell withUpdatedTimestamp(long newTimestamp)
+ {
+ return new BufferCell(name, value, newTimestamp);
+ }
+
+ @Override
+ public CellName name() {
+ return name;
+ }
+
+ @Override
+ public ByteBuffer value() {
+ return value;
+ }
+
+ @Override
+ public long timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public long excessHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + name.excessHeapSizeExcludingData() + ObjectSizes.sizeOnHeapExcludingData(value);
+ }
+
+ @Override
+ public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferCell(name.copy(metadata, allocator), allocator.clone(value), timestamp);
+ }
+
+ @Override
+ public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterCell.java b/src/java/org/apache/cassandra/db/BufferCounterCell.java
new file mode 100644
index 0000000..a70e274
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCounterCell.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferCounterCell extends BufferCell implements CounterCell
+{
+ private final long timestampOfLastDelete;
+
+ public BufferCounterCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ this(name, value, timestamp, Long.MIN_VALUE);
+ }
+
+ public BufferCounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
+ {
+ super(name, value, timestamp);
+ this.timestampOfLastDelete = timestampOfLastDelete;
+ }
+
+ public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
+ {
+ if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
+ value = contextManager.clearAllLocal(value);
+ return new BufferCounterCell(name, value, timestamp, timestampOfLastDelete);
+ }
+
+ // For use by tests of compatibility with pre-2.1 counter only.
+ public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
+ {
+ return new BufferCounterCell(name, contextManager.createLocal(value), timestamp, timestampOfLastDelete);
+ }
+
+ @Override
+ public Cell withUpdatedName(CellName newName)
+ {
+ return new BufferCounterCell(newName, value, timestamp, timestampOfLastDelete);
+ }
+
+ @Override
+ public long timestampOfLastDelete()
+ {
+ return timestampOfLastDelete;
+ }
+
+ @Override
+ public long total()
+ {
+ return contextManager.total(value);
+ }
+
+ @Override
+ public int cellDataSize()
+ {
+ // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
+ return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
+ }
+
+ @Override
+ public int serializedSize(CellNameType type, TypeSizes typeSizes)
+ {
+ return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+ }
+
+ @Override
+ public Cell diff(Cell cell)
+ {
+ return diff(this, cell);
+ }
+
+ /*
+ * We have to special case digest creation for counter column because
+ * we don't want to include the information about which shard of the
+ * context is a delta or not, since this information differs from node to
+ * node.
+ */
+ @Override
+ public void updateDigest(MessageDigest digest)
+ {
+ digest.update(name().toByteBuffer().duplicate());
+ // We don't take the deltas into account in a digest
+ contextManager.updateDigest(digest, value());
+
+ FBUtilities.updateWithLong(digest, timestamp);
+ FBUtilities.updateWithByte(digest, serializationFlags());
+ FBUtilities.updateWithLong(digest, timestampOfLastDelete);
+ }
+
+ @Override
+ public Cell reconcile(Cell cell)
+ {
+ return reconcile(this, cell);
+ }
+
+ @Override
+ public boolean hasLegacyShards()
+ {
+ return contextManager.hasLegacyShards(value);
+ }
+
+ @Override
+ public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ return new BufferCounterCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+ }
+
+ @Override
+ public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ return allocator.clone(this, metadata, opGroup);
+ }
+
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s:false:%s@%d!%d",
+ comparator.getString(name()),
+ contextManager.toString(value()),
+ timestamp(),
+ timestampOfLastDelete);
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.COUNTER_MASK;
+ }
+
+ @Override
+ public void validateFields(CFMetaData metadata) throws MarshalException
+ {
+ validateName(metadata);
+ // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
+ // which is not the internal representation of counters
+ contextManager.validateContext(value());
+ }
+
+ @Override
+ public Cell markLocalToBeCleared()
+ {
+ ByteBuffer marked = contextManager.markLocalToBeCleared(value());
+ return marked == value() ? this : new BufferCounterCell(name(), marked, timestamp(), timestampOfLastDelete);
+ }
+
+ @Override
+ public boolean equals(Cell cell)
+ {
+ return cell instanceof CounterCell && equals((CounterCell) cell);
+ }
+
+ public boolean equals(CounterCell cell)
+ {
+ return super.equals(cell) && timestampOfLastDelete == cell.timestampOfLastDelete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
new file mode 100644
index 0000000..44ab83e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+/**
+ * Cell carrying a counter increment: the value buffer holds the delta as a long,
+ * which {@link #delta()} reads back.
+ * diff() and both localCopy() overloads are unsupported on this type.
+ */
+public class BufferCounterUpdateCell extends BufferCell implements CounterUpdateCell
+{
+ // Shared message so every unsupported operation on this type reports the same text.
+ private static final String UNSUPPORTED_MESSAGE = "This operation is unsupported on CounterUpdateCell.";
+
+ /**
+ * Creates an update cell from a numeric delta, encoded with ByteBufferUtil.bytes(long).
+ */
+ public BufferCounterUpdateCell(CellName name, long value, long timestamp)
+ {
+ this(name, ByteBufferUtil.bytes(value), timestamp);
+ }
+
+ /**
+ * Creates an update cell from an already encoded delta.
+ */
+ public BufferCounterUpdateCell(CellName name, ByteBuffer value, long timestamp)
+ {
+ super(name, value, timestamp);
+ }
+
+ /**
+ * Returns the long stored at the value buffer's current position, without moving that position.
+ */
+ @Override
+ public long delta()
+ {
+ ByteBuffer encoded = value();
+ return encoded.getLong(encoded.position());
+ }
+
+ /**
+ * Always throws.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public Cell diff(Cell cell)
+ {
+ // Diff is used during reads, but we should never read those columns
+ throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+ }
+
+ /**
+ * Merges this update with another cell of the same name.
+ * If the other cell is marked for delete, whichever has the strictly greater timestamp wins
+ * (the other cell on a tie). Otherwise the two deltas are summed and the greater timestamp is kept.
+ */
+ @Override
+ public Cell reconcile(Cell cell)
+ {
+ // The only time this could happen is if a batchAdd ships two
+ // increments for the same cell. Hence we simply sum the deltas.
+
+ // tombstones take precedence
+ if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
+ return timestamp > cell.timestamp() ? this : cell;
+
+ // neither is tombstoned
+ assert cell instanceof CounterUpdateCell : "Wrong class type.";
+ CounterUpdateCell c = (CounterUpdateCell) cell;
+ return new BufferCounterUpdateCell(name, delta() + c.delta(), Math.max(timestamp, c.timestamp()));
+ }
+
+ @Override
+ public int serializationFlags()
+ {
+ return ColumnSerializer.COUNTER_UPDATE_MASK;
+ }
+
+ /**
+ * Always throws.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+ {
+ throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+ }
+
+ /**
+ * Always throws.
+ *
+ * @throws UnsupportedOperationException on every call
+ */
+ @Override
+ public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+ {
+ throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+ }
+
+ /**
+ * Renders the cell as {@code name:delta@timestamp}.
+ */
+ @Override
+ public String getString(CellNameType comparator)
+ {
+ return String.format("%s:%s@%d", comparator.getString(name()), ByteBufferUtil.toLong(value), timestamp());
+ }
+}