You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2016/01/27 11:50:07 UTC
[2/2] cassandra git commit: 9472: Reintroduce Off-Heap Memtables
9472: Reintroduce Off-Heap Memtables
patch by benedict and stefania
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f412431
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f412431
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f412431
Branch: refs/heads/trunk
Commit: 2f41243191c381193a3bf6ec3730ff6555325d06
Parents: a0901b8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Nov 4 17:03:38 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 27 10:49:26 2016 +0000
----------------------------------------------------------------------
conf/cassandra.yaml | 1 +
.../cassandra/config/DatabaseDescriptor.java | 3 +-
.../db/AbstractBufferClusteringPrefix.java | 72 ++++++
.../cassandra/db/AbstractClusteringPrefix.java | 44 ----
.../apache/cassandra/db/BufferClustering.java | 44 ++++
src/java/org/apache/cassandra/db/CBuilder.java | 6 +-
.../org/apache/cassandra/db/Clustering.java | 150 ++++++------
.../cassandra/db/ClusteringComparator.java | 3 -
.../apache/cassandra/db/ClusteringPrefix.java | 4 +-
.../org/apache/cassandra/db/LegacyLayout.java | 8 +-
src/java/org/apache/cassandra/db/Memtable.java | 5 +-
.../org/apache/cassandra/db/MultiCBuilder.java | 2 +-
.../apache/cassandra/db/NativeClustering.java | 125 ++++++++++
.../apache/cassandra/db/NativeDecoratedKey.java | 6 +-
.../apache/cassandra/db/RowUpdateBuilder.java | 2 -
.../org/apache/cassandra/db/Serializers.java | 6 +-
.../db/SinglePartitionReadCommand.java | 18 +-
src/java/org/apache/cassandra/db/Slice.java | 2 +-
.../apache/cassandra/db/filter/RowFilter.java | 4 +-
.../db/partitions/AbstractBTreePartition.java | 25 +-
.../db/partitions/AtomicBTreePartition.java | 66 +++++-
.../db/partitions/FilteredPartition.java | 2 +-
.../apache/cassandra/db/rows/AbstractCell.java | 79 +++++++
.../org/apache/cassandra/db/rows/BTreeRow.java | 33 ++-
.../apache/cassandra/db/rows/BufferCell.java | 232 -------------------
src/java/org/apache/cassandra/db/rows/Cell.java | 161 ++++++++++++-
.../apache/cassandra/db/rows/NativeCell.java | 151 ++++++++++++
.../apache/cassandra/db/rows/RowIterator.java | 5 -
.../db/rows/UnfilteredRowIterators.java | 27 ---
.../apache/cassandra/db/transform/BaseRows.java | 4 +
.../cassandra/db/transform/Transformation.java | 6 +
.../apache/cassandra/db/view/TemporalRow.java | 11 +-
.../cassandra/index/internal/IndexEntry.java | 1 +
.../index/internal/keys/KeysSearcher.java | 4 +-
.../io/sstable/metadata/MetadataCollector.java | 4 -
.../cassandra/io/util/MemoryInputStream.java | 2 +-
.../apache/cassandra/service/CacheService.java | 2 -
.../cassandra/thrift/CassandraServer.java | 16 +-
.../cassandra/thrift/ThriftResultsMerger.java | 2 +-
.../utils/memory/AbstractAllocator.java | 1 -
.../cassandra/utils/memory/EnsureOnHeap.java | 150 ++++++++++++
.../cassandra/utils/memory/HeapAllocator.java | 5 +
.../apache/cassandra/utils/memory/HeapPool.java | 5 -
.../cassandra/utils/memory/MemoryUtil.java | 19 +-
.../utils/memory/MemtableAllocator.java | 5 +-
.../cassandra/utils/memory/MemtablePool.java | 1 -
.../cassandra/utils/memory/NativeAllocator.java | 46 +++-
.../cassandra/utils/memory/NativePool.java | 6 -
.../cassandra/utils/memory/SlabAllocator.java | 15 +-
.../apache/cassandra/utils/memory/SlabPool.java | 5 -
test/conf/cassandra.yaml | 3 +-
.../org/apache/cassandra/db/KeyspaceTest.java | 4 +-
.../org/apache/cassandra/db/NativeCellTest.java | 171 ++++++++++++++
.../cassandra/db/RangeTombstoneListTest.java | 2 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 20 +-
.../apache/cassandra/db/ReadMessageTest.java | 2 +-
.../org/apache/cassandra/db/RowCacheTest.java | 2 +-
.../rows/DigestBackwardCompatibilityTest.java | 2 -
.../rows/UnfilteredRowIteratorsMergeTest.java | 2 +-
.../io/sstable/CQLSSTableWriterClientTest.java | 4 -
.../cassandra/io/sstable/SSTableLoaderTest.java | 2 +-
.../service/pager/PagingStateTest.java | 2 +-
62 files changed, 1265 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e29a6d3..a9749f2 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -398,6 +398,7 @@ concurrent_materialized_view_writes: 32
# Options are:
# heap_buffers: on heap nio buffers
# offheap_buffers: off heap (direct) nio buffers
+# offheap_objects: off heap objects
memtable_allocation_type: heap_buffers
# Total space to use for commit logs on disk.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2a2719a..b09605f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1847,8 +1847,7 @@ public class DatabaseDescriptor
}
return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
case offheap_objects:
- throw new ConfigurationException("offheap_objects are not available in 3.0. They should be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
- // return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+ return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
default:
throw new AssertionError();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
new file mode 100644
index 0000000..95bc777
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+public abstract class AbstractBufferClusteringPrefix extends AbstractClusteringPrefix
+{
+ public static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
+ private static final long EMPTY_SIZE = ObjectSizes.measure(Clustering.make(EMPTY_VALUES_ARRAY));
+
+ protected final Kind kind;
+ protected final ByteBuffer[] values;
+
+ protected AbstractBufferClusteringPrefix(Kind kind, ByteBuffer[] values)
+ {
+ this.kind = kind;
+ this.values = values;
+ }
+
+ public Kind kind()
+ {
+ return kind;
+ }
+
+ public ClusteringPrefix clustering()
+ {
+ return this;
+ }
+
+ public int size()
+ {
+ return values.length;
+ }
+
+ public ByteBuffer get(int i)
+ {
+ return values[i];
+ }
+
+ public ByteBuffer[] getRawValues()
+ {
+ return values;
+ }
+
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
index 2631b46..0b1daf7 100644
--- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
@@ -22,48 +22,14 @@ import java.security.MessageDigest;
import java.util.Objects;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
public abstract class AbstractClusteringPrefix implements ClusteringPrefix
{
- protected static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
-
- private static final long EMPTY_SIZE = ObjectSizes.measure(new Clustering(EMPTY_VALUES_ARRAY));
-
- protected final Kind kind;
- protected final ByteBuffer[] values;
-
- protected AbstractClusteringPrefix(Kind kind, ByteBuffer[] values)
- {
- this.kind = kind;
- this.values = values;
- }
-
- public Kind kind()
- {
- return kind;
- }
-
public ClusteringPrefix clustering()
{
return this;
}
- public int size()
- {
- return values.length;
- }
-
- public ByteBuffer get(int i)
- {
- return values[i];
- }
-
- public ByteBuffer[] getRawValues()
- {
- return values;
- }
-
public int dataSize()
{
int size = 0;
@@ -86,16 +52,6 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix
FBUtilities.updateWithByte(digest, kind().ordinal());
}
- public long unsharedHeapSize()
- {
- return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
- }
-
- public long unsharedHeapSizeExcludingData()
- {
- return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
- }
-
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/BufferClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferClustering.java b/src/java/org/apache/cassandra/db/BufferClustering.java
new file mode 100644
index 0000000..7c6bb20
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferClustering.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * The clustering column values for a row.
+ * <p>
+ * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
+ * as many values as there are clustering columns in the table it is part of. It is the clustering
+ * prefix used by rows.
+ * <p>
+ * Note however that while its size must be equal to the table clustering size, a clustering can have
+ * {@code null} values, and this is mostly for thrift backward compatibility (in practice, if a value is null,
+ * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
+ * code so we could start generally allowing nulls for clustering columns if we wanted to).
+ */
+public class BufferClustering extends AbstractBufferClusteringPrefix implements Clustering
+{
+ BufferClustering(ByteBuffer... values)
+ {
+ super(Kind.CLUSTERING, values);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/CBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java
index 94feb93..73b575f 100644
--- a/src/java/org/apache/cassandra/db/CBuilder.java
+++ b/src/java/org/apache/cassandra/db/CBuilder.java
@@ -162,7 +162,7 @@ public abstract class CBuilder
built = true;
// Currently, only dense table can leave some clustering column out (see #7990)
- return size == 0 ? Clustering.EMPTY : new Clustering(values);
+ return size == 0 ? Clustering.EMPTY : Clustering.make(values);
}
public Slice.Bound buildBound(boolean isStart, boolean isInclusive)
@@ -196,7 +196,7 @@ public abstract class CBuilder
ByteBuffer[] newValues = Arrays.copyOf(values, type.size());
newValues[size] = value;
- return new Clustering(newValues);
+ return Clustering.make(newValues);
}
public Clustering buildWith(List<ByteBuffer> newValues)
@@ -207,7 +207,7 @@ public abstract class CBuilder
for (ByteBuffer value : newValues)
buffers[newSize++] = value;
- return new Clustering(buffers);
+ return Clustering.make(buffers);
}
public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index a40cc1f..f5ffae4 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -1,53 +1,91 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.memory.AbstractAllocator;
-/**
- * The clustering column values for a row.
- * <p>
- * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
- * as many values as there is clustering columns in the table it is part of. It is the clustering
- * prefix used by rows.
- * <p>
- * Note however that while it's size must be equal to the table clustering size, a clustering can have
- * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null,
- * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
- * code so we could start generally allowing nulls for clustering columns if we wanted to).
- */
-public class Clustering extends AbstractClusteringPrefix
+import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY;
+
+public interface Clustering extends ClusteringPrefix
{
public static final Serializer serializer = new Serializer();
+ public long unsharedHeapSizeExcludingData();
+
+ public default Clustering copy(AbstractAllocator allocator)
+ {
+ // Important for STATIC_CLUSTERING (but must copy empty native clustering types).
+ if (size() == 0)
+ return kind() == Kind.STATIC_CLUSTERING ? this : new BufferClustering(EMPTY_VALUES_ARRAY);
+
+ ByteBuffer[] newValues = new ByteBuffer[size()];
+ for (int i = 0; i < size(); i++)
+ {
+ ByteBuffer val = get(i);
+ newValues[i] = val == null ? null : allocator.clone(val);
+ }
+ return new BufferClustering(newValues);
+ }
+
+ public default String toString(CFMetaData metadata)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < size(); i++)
+ {
+ ColumnDefinition c = metadata.clusteringColumns().get(i);
+ sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
+ }
+ return sb.toString();
+ }
+
+ public default String toCQLString(CFMetaData metadata)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < size(); i++)
+ {
+ ColumnDefinition c = metadata.clusteringColumns().get(i);
+ sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
+ }
+ return sb.toString();
+ }
+
+ public static Clustering make(ByteBuffer... values)
+ {
+ return new BufferClustering(values);
+ }
+
/**
* The special cased clustering used by all static rows. It is a special case in the
* sense that it's always empty, no matter how many clustering columns the table has.
*/
- public static final Clustering STATIC_CLUSTERING = new Clustering(EMPTY_VALUES_ARRAY)
+ public static final Clustering STATIC_CLUSTERING = new BufferClustering(EMPTY_VALUES_ARRAY)
{
@Override
public Kind kind()
@@ -69,7 +107,7 @@ public class Clustering extends AbstractClusteringPrefix
};
/** Empty clustering for tables having no clustering columns. */
- public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY)
+ public static final Clustering EMPTY = new BufferClustering(EMPTY_VALUES_ARRAY)
{
@Override
public String toString(CFMetaData metadata)
@@ -78,50 +116,6 @@ public class Clustering extends AbstractClusteringPrefix
}
};
- public Clustering(ByteBuffer... values)
- {
- super(Kind.CLUSTERING, values);
- }
-
- public Kind kind()
- {
- return Kind.CLUSTERING;
- }
-
- public Clustering copy(AbstractAllocator allocator)
- {
- // Important for STATIC_CLUSTERING (but no point in being wasteful in general).
- if (size() == 0)
- return this;
-
- ByteBuffer[] newValues = new ByteBuffer[size()];
- for (int i = 0; i < size(); i++)
- newValues[i] = values[i] == null ? null : allocator.clone(values[i]);
- return new Clustering(newValues);
- }
-
- public String toString(CFMetaData metadata)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < size(); i++)
- {
- ColumnDefinition c = metadata.clusteringColumns().get(i);
- sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
- }
- return sb.toString();
- }
-
- public String toCQLString(CFMetaData metadata)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < size(); i++)
- {
- ColumnDefinition c = metadata.clusteringColumns().get(i);
- sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
- }
- return sb.toString();
- }
-
/**
* Serializer for Clustering object.
* <p>
@@ -161,7 +155,7 @@ public class Clustering extends AbstractClusteringPrefix
return EMPTY;
ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types);
- return new Clustering(values);
+ return new BufferClustering(values);
}
public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index f3411cf..f5f6ae8 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -29,8 +28,6 @@ import com.google.common.collect.ImmutableList;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FastByteOperations;
import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 9477651..cacaeb5 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -74,7 +74,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
*/
public final int comparedToClustering;
- private Kind(int comparison, int comparedToClustering)
+ Kind(int comparison, int comparedToClustering)
{
this.comparison = comparison;
this.comparedToClustering = comparedToClustering;
@@ -500,7 +500,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
{
assert nextIsRow;
deserializeAll();
- Clustering clustering = new Clustering(nextValues);
+ Clustering clustering = Clustering.make(nextValues);
nextValues = null;
return clustering;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 07778b3..4e1eab5 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -103,7 +103,7 @@ public abstract class LegacyLayout
if (metadata.isSuper())
{
assert superColumnName != null;
- return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname);
+ return decodeForSuperColumn(metadata, Clustering.make(superColumnName), cellname);
}
assert superColumnName == null;
@@ -161,7 +161,7 @@ public abstract class LegacyLayout
{
// If it's a compact table, it means the column is in fact a "dynamic" one
if (metadata.isCompactTable())
- return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null);
+ return new LegacyCellName(Clustering.make(column), metadata.compactValueColumn(), null);
if (def == null)
throw new UnknownColumnException(metadata, column);
@@ -298,7 +298,7 @@ public abstract class LegacyLayout
? CompositeType.splitName(value)
: Collections.singletonList(value);
- return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
+ return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
}
public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
@@ -1308,7 +1308,7 @@ public abstract class LegacyLayout
ByteBuffer[] values = new ByteBuffer[bound.size()];
for (int i = 0; i < bound.size(); i++)
values[i] = bound.get(i);
- return new Clustering(values);
+ return Clustering.make(values);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 952c045..244c7b6 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.index.transactions.UpdateTransaction;
@@ -53,6 +54,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.apache.cassandra.utils.memory.MemtablePool;
@@ -60,7 +62,7 @@ public class Memtable implements Comparable<Memtable>
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
+ public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
private final MemtableAllocator allocator;
@@ -526,6 +528,7 @@ public class Memtable implements Comparable<Memtable>
assert entry.getKey() instanceof DecoratedKey;
DecoratedKey key = (DecoratedKey)entry.getKey();
ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
+
return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
index 8353703..7a4eef0 100644
--- a/src/java/org/apache/cassandra/db/MultiCBuilder.java
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -239,7 +239,7 @@ public abstract class MultiCBuilder
if (hasMissingElements)
return BTreeSet.empty(comparator);
- return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : new Clustering(elements));
+ return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : Clustering.make(elements));
}
public NavigableSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java
new file mode 100644
index 0000000..1943b71
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -0,0 +1,125 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeClustering extends AbstractClusteringPrefix implements Clustering
+{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering());
+
+ private final long peer;
+
+ private NativeClustering() { peer = 0; }
+
+ public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering clustering)
+ {
+ int count = clustering.size();
+ int metadataSize = (count * 2) + 4;
+ int dataSize = clustering.dataSize();
+ int bitmapSize = ((count + 7) >>> 3);
+
+ assert count < 64 << 10;
+ assert dataSize < 64 << 10;
+
+ peer = allocator.allocate(metadataSize + dataSize + bitmapSize, writeOp);
+ long bitmapStart = peer + metadataSize;
+ MemoryUtil.setShort(peer, (short) count);
+ MemoryUtil.setShort(peer + (metadataSize - 2), (short) dataSize); // goes at the end of the other offsets
+
+ MemoryUtil.setByte(bitmapStart, bitmapSize, (byte) 0);
+ long dataStart = peer + metadataSize + bitmapSize;
+ int dataOffset = 0;
+ for (int i = 0 ; i < count ; i++)
+ {
+ MemoryUtil.setShort(peer + 2 + i * 2, (short) dataOffset);
+
+ ByteBuffer value = clustering.get(i);
+ if (value == null)
+ {
+ long boffset = bitmapStart + (i >>> 3);
+ int b = MemoryUtil.getByte(boffset);
+ b |= 1 << (i & 7);
+ MemoryUtil.setByte(boffset, (byte) b);
+ continue;
+ }
+
+ assert value.order() == ByteOrder.BIG_ENDIAN;
+
+ int size = value.remaining();
+ MemoryUtil.setBytes(dataStart + dataOffset, value);
+ dataOffset += size;
+ }
+ }
+
+ public Kind kind()
+ {
+ return Kind.CLUSTERING;
+ }
+
+ public int size()
+ {
+ return MemoryUtil.getShort(peer);
+ }
+
+ public ByteBuffer get(int i)
+ {
+ // offset at which we store the dataOffset
+ int size = size();
+ if (i >= size)
+ throw new IndexOutOfBoundsException();
+
+ int metadataSize = (size * 2) + 4;
+ int bitmapSize = ((size + 7) >>> 3);
+ long bitmapStart = peer + metadataSize;
+ int b = MemoryUtil.getByte(bitmapStart + (i >>> 3));
+ if ((b & (1 << (i & 7))) != 0)
+ return null;
+
+ int startOffset = MemoryUtil.getShort(peer + 2 + i * 2);
+ int endOffset = MemoryUtil.getShort(peer + 4 + i * 2);
+ return MemoryUtil.getByteBuffer(bitmapStart + bitmapSize + startOffset,
+ endOffset - startOffset,
+ ByteOrder.BIG_ENDIAN);
+ }
+
+ public ByteBuffer[] getRawValues()
+ {
+ ByteBuffer[] values = new ByteBuffer[size()];
+ for (int i = 0 ; i < values.length ; i++)
+ values[i] = get(i);
+ return values;
+ }
+
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE;
+ }
+
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
index ca874c3..019209e 100644
--- a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
@@ -18,9 +18,11 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.memory.NativeAllocator;
@@ -32,6 +34,8 @@ public class NativeDecoratedKey extends DecoratedKey
{
super(token);
assert key != null;
+ assert key.order() == ByteOrder.BIG_ENDIAN;
+
int size = key.remaining();
this.peer = allocator.allocate(4 + size, writeOp);
MemoryUtil.setInt(peer, size);
@@ -40,6 +44,6 @@ public class NativeDecoratedKey extends DecoratedKey
public ByteBuffer getKey()
{
- return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer));
+ return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer), ByteOrder.BIG_ENDIAN);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 8ace988..0ceec90 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,7 +26,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.partitions.*;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 9b29d89..17f1de0 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
/**
* Holds references on serializers that depend on the table definition.
*/
@@ -70,7 +68,7 @@ public class Serializers
return Clustering.EMPTY;
if (!metadata.isCompound())
- return new Clustering(bb);
+ return Clustering.make(bb);
List<ByteBuffer> components = CompositeType.splitName(bb);
byte eoc = CompositeType.lastEOC(bb);
@@ -81,7 +79,7 @@ public class Serializers
if (components.size() > clusteringSize)
components = components.subList(0, clusteringSize);
- return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+ return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 9ad9ba3..680b4b5 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -460,8 +460,7 @@ public class SinglePartitionReadCommand extends ReadCommand
{
Tracing.trace("Executing single-partition query on {}", cfs.name);
- boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
- return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+ return queryMemtableAndDiskInternal(cfs);
}
@Override
@@ -470,7 +469,7 @@ public class SinglePartitionReadCommand extends ReadCommand
return oldestUnrepairedTombstone;
}
- private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
{
/*
* We have 2 main strategies:
@@ -484,7 +483,7 @@ public class SinglePartitionReadCommand extends ReadCommand
* of shards so have the same problem).
*/
if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
- return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+ return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -502,10 +501,8 @@ public class SinglePartitionReadCommand extends ReadCommand
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
- @SuppressWarnings("resource") // same as above
- UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
- iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+ iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
}
/*
* We can't eliminate full sstables based on the timestamp of what we've already read like
@@ -649,7 +646,7 @@ public class SinglePartitionReadCommand extends ReadCommand
* no collection or counters are included).
* This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
*/
- private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter)
{
Tracing.trace("Acquiring sstable references");
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -668,10 +665,7 @@ public class SinglePartitionReadCommand extends ReadCommand
if (iter.isEmpty())
continue;
- UnfilteredRowIterator clonedFilter = copyOnHeap
- ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
- : iter;
- result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+ result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, false);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 7fde45e..8611470 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -343,7 +343,7 @@ public class Slice
* <p>
* This can be either a start or an end bound, and this can be either inclusive or exclusive.
*/
- public static class Bound extends AbstractClusteringPrefix
+ public static class Bound extends AbstractBufferClusteringPrefix
{
public static final Serializer serializer = new Serializer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index c234fc9..79bdbd7 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -194,11 +194,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
if (metadata.isCompound())
{
List<ByteBuffer> values = CompositeType.splitName(name);
- return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+ return Clustering.make(values.toArray(new ByteBuffer[metadata.comparator.size()]));
}
else
{
- return new Clustering(name);
+ return Clustering.make(name);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index e44124f..c276c57 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -183,7 +183,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
if (slices.size() == 0)
{
DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
- return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
+ return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey(), staticRow, partitionDeletion, reversed);
}
return slices.size() == 1
@@ -202,9 +202,9 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
}
private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
- ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
+ ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
{
- return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
+ return new RowAndDeletionMergeIterator(metadata, partitionKey(), current.deletionInfo.getPartitionDeletion(),
selection, staticRow, reversed, current.stats,
rowIter, deleteIter,
canHaveShadowedData());
@@ -215,22 +215,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
final Holder current;
final ColumnFilter selection;
- private AbstractIterator(ColumnFilter selection, boolean isReversed)
- {
- this(AbstractBTreePartition.this.holder(), selection, isReversed);
- }
-
- private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
- {
- this(current,
- AbstractBTreePartition.this.staticRow(current, selection, false),
- selection, isReversed);
- }
-
private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
{
super(AbstractBTreePartition.this.metadata,
- AbstractBTreePartition.this.partitionKey,
+ AbstractBTreePartition.this.partitionKey(),
current.deletionInfo.getPartitionDeletion(),
selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
// it would also be more precise to return the intersection of the selection and current.columns,
@@ -318,10 +306,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
builder.auto(false);
while (rows.hasNext())
- {
- Row row = rows.next();
- builder.add(row);
- }
+ builder.add(rows.next());
if (reversed)
builder.reverse();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 2be882e..c7113d4 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -26,12 +27,12 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
import org.apache.cassandra.utils.concurrent.Locks;
@@ -181,7 +182,66 @@ public class AtomicBTreePartition extends AbstractBTreePartition
if (monitorOwned)
Locks.monitorExitUnsafe(this);
}
+ }
+
+ @Override
+ public DeletionInfo deletionInfo()
+ {
+ return allocator.ensureOnHeap().applyToDeletionInfo(super.deletionInfo());
+ }
+ @Override
+ public Row staticRow()
+ {
+ return allocator.ensureOnHeap().applyToStatic(super.staticRow());
+ }
+
+ @Override
+ public DecoratedKey partitionKey()
+ {
+ return allocator.ensureOnHeap().applyToPartitionKey(super.partitionKey());
+ }
+
+ @Override
+ public Row getRow(Clustering clustering)
+ {
+ return allocator.ensureOnHeap().applyToRow(super.getRow(clustering));
+ }
+
+ @Override
+ public Row lastRow()
+ {
+ return allocator.ensureOnHeap().applyToRow(super.lastRow());
+ }
+
+ @Override
+ public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed)
+ {
+ return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed));
+ }
+
+ @Override
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
+ }
+
+ @Override
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator());
+ }
+
+ @Override
+ public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(current, selection, slices, reversed));
+ }
+
+ @Override
+ public Iterator<Row> iterator()
+ {
+ return allocator.ensureOnHeap().applyToPartition(super.iterator());
}
public boolean usePessimisticLocking()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 26a947b..70a4678 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -65,7 +65,7 @@ public class FilteredPartition extends ImmutableBTreePartition
public DecoratedKey partitionKey()
{
- return partitionKey;
+ return FilteredPartition.this.partitionKey();
}
public Row staticRow()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 882c0e0..1ea9713 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -17,15 +17,19 @@
*/
package org.apache.cassandra.db.rows;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Objects;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
* Base abstract class for {@code Cell} implementations.
@@ -40,6 +44,81 @@ public abstract class AbstractCell extends Cell
super(column);
}
+ public boolean isCounterCell()
+ {
+ return !isTombstone() && column.cellValueType().isCounter();
+ }
+
+ public boolean isLive(int nowInSec)
+ {
+ return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime());
+ }
+
+ public boolean isTombstone()
+ {
+ return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL;
+ }
+
+ public boolean isExpiring()
+ {
+ return ttl() != NO_TTL;
+ }
+
+ public Cell markCounterLocalToBeCleared()
+ {
+ if (!isCounterCell())
+ return this;
+
+ ByteBuffer value = value();
+ ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value);
+ return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path());
+ }
+
+ public Cell purge(DeletionPurger purger, int nowInSec)
+ {
+ if (!isLive(nowInSec))
+ {
+ if (purger.shouldPurge(timestamp(), localDeletionTime()))
+ return null;
+
+ // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
+ // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
+ // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
+ // so hopefully not too surprising and 2) we want to do this and purging at the same places, so it's simpler/more efficient
+ // to do both here.
+ if (isExpiring())
+ {
+ // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+ // we'll fulfil our responsibility to repair. See discussion at
+ // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+ return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl());
+ }
+ }
+ return this;
+ }
+
+ public Cell copy(AbstractAllocator allocator)
+ {
+ CellPath path = path();
+ return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator));
+ }
+
+ // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
+ public Cell updateAllTimestamp(long newTimestamp)
+ {
+ return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path());
+ }
+
+ public int dataSize()
+ {
+ CellPath path = path();
+ return TypeSizes.sizeof(timestamp())
+ + TypeSizes.sizeof(ttl())
+ + TypeSizes.sizeof(localDeletionTime())
+ + value().remaining()
+ + (path == null ? 0 : path.dataSize());
+ }
+
public void digest(MessageDigest digest)
{
digest.update(value().duplicate());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index a0912ae..47cfd58 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -62,7 +62,11 @@ public class BTreeRow extends AbstractRow
// no expiring cells, this will be Integer.MAX_VALUE;
private final int minLocalDeletionTime;
- private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime)
+ private BTreeRow(Clustering clustering,
+ LivenessInfo primaryKeyLivenessInfo,
+ Deletion deletion,
+ Object[] btree,
+ int minLocalDeletionTime)
{
assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
this.clustering = clustering;
@@ -78,7 +82,10 @@ public class BTreeRow extends AbstractRow
}
// Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
- public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree)
+ public static BTreeRow create(Clustering clustering,
+ LivenessInfo primaryKeyLivenessInfo,
+ Deletion deletion,
+ Object[] btree)
{
int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time()));
if (minDeletionTime != Integer.MIN_VALUE)
@@ -87,6 +94,15 @@ public class BTreeRow extends AbstractRow
minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
}
+ return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+ }
+
+ public static BTreeRow create(Clustering clustering,
+ LivenessInfo primaryKeyLivenessInfo,
+ Deletion deletion,
+ Object[] btree,
+ int minDeletionTime)
+ {
return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
}
@@ -113,7 +129,11 @@ public class BTreeRow extends AbstractRow
public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
{
assert !primaryKeyLivenessInfo.isEmpty();
- return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+ return new BTreeRow(clustering,
+ primaryKeyLivenessInfo,
+ Deletion.LIVE,
+ BTree.empty(),
+ minDeletionTime(primaryKeyLivenessInfo));
}
private static int minDeletionTime(Cell cell)
@@ -368,7 +388,7 @@ public class BTreeRow extends AbstractRow
return null;
int minDeletionTime = minDeletionTime(transformed, info, deletion.time());
- return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime);
+ return BTreeRow.create(clustering, info, deletion, transformed, minDeletionTime);
}
public int dataSize()
@@ -594,7 +614,7 @@ public class BTreeRow extends AbstractRow
return new ComplexColumnData(column, btree, deletion);
}
- };
+ }
protected Clustering clustering;
protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
protected Deletion deletion = Deletion.LIVE;
@@ -680,10 +700,9 @@ public class BTreeRow extends AbstractRow
deletion = Deletion.LIVE;
int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time());
- Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+ Row row = BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
reset();
return row;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 4176ba6..cac63ac 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -17,16 +17,11 @@
*/
package org.apache.cassandra.db.rows;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.FBUtilities;
@@ -88,26 +83,6 @@ public class BufferCell extends AbstractCell
return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
}
- public boolean isCounterCell()
- {
- return !isTombstone() && column.cellValueType().isCounter();
- }
-
- public boolean isLive(int nowInSec)
- {
- return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime);
- }
-
- public boolean isTombstone()
- {
- return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
- }
-
- public boolean isExpiring()
- {
- return ttl != NO_TTL;
- }
-
public long timestamp()
{
return timestamp;
@@ -146,216 +121,9 @@ public class BufferCell extends AbstractCell
return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
}
- public Cell markCounterLocalToBeCleared()
- {
- if (!isCounterCell())
- return this;
-
- ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value());
- return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path);
- }
-
- public Cell purge(DeletionPurger purger, int nowInSec)
- {
- if (!isLive(nowInSec))
- {
- if (purger.shouldPurge(timestamp, localDeletionTime))
- return null;
-
- // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
- // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
- // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
- // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient
- // to do both here.
- if (isExpiring())
- {
- // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
- // we'll fulfil our responsibility to repair. See discussion at
- // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
- return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl);
- }
- }
- return this;
- }
-
- public Cell updateAllTimestamp(long newTimestamp)
- {
- return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path);
- }
-
- public int dataSize()
- {
- return TypeSizes.sizeof(timestamp)
- + TypeSizes.sizeof(ttl)
- + TypeSizes.sizeof(localDeletionTime)
- + value.remaining()
- + (path == null ? 0 : path.dataSize());
- }
-
public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
}
- /**
- * The serialization format for cell is:
- * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ]
- * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ]
- *
- * where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
- * meaning:
- * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
- * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
- * - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
- * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
- * - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
- * USE_ROW_TTL_MASK.
- * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
- * for columns of this type have a fixed length.
- * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
- * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
- * - [ path ]: the cell path if the column this is a cell of is complex.
- */
- static class Serializer implements Cell.Serializer
- {
- private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
- private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
- private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
- private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
- private final static int USE_ROW_TTL_MASK = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
-
- public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
- {
- assert cell != null;
- boolean hasValue = cell.value().hasRemaining();
- boolean isDeleted = cell.isTombstone();
- boolean isExpiring = cell.isExpiring();
- boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
- boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
- int flags = 0;
- if (!hasValue)
- flags |= HAS_EMPTY_VALUE_MASK;
-
- if (isDeleted)
- flags |= IS_DELETED_MASK;
- else if (isExpiring)
- flags |= IS_EXPIRING_MASK;
-
- if (useRowTimestamp)
- flags |= USE_ROW_TIMESTAMP_MASK;
- if (useRowTTL)
- flags |= USE_ROW_TTL_MASK;
-
- out.writeByte((byte)flags);
-
- if (!useRowTimestamp)
- header.writeTimestamp(cell.timestamp(), out);
-
- if ((isDeleted || isExpiring) && !useRowTTL)
- header.writeLocalDeletionTime(cell.localDeletionTime(), out);
- if (isExpiring && !useRowTTL)
- header.writeTTL(cell.ttl(), out);
-
- if (cell.column().isComplex())
- cell.column().cellPathSerializer().serialize(cell.path(), out);
-
- if (hasValue)
- header.getType(cell.column()).writeValue(cell.value(), out);
- }
-
- public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
- {
- int flags = in.readUnsignedByte();
- boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
- boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
- boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
- boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
- boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
- long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
-
- int localDeletionTime = useRowTTL
- ? rowLiveness.localExpirationTime()
- : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
-
- int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
-
- CellPath path = column.isComplex()
- ? column.cellPathSerializer().deserialize(in)
- : null;
-
- boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
-
- ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
- if (hasValue)
- {
- if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
- {
- header.getType(column).skipValue(in);
- }
- else
- {
- value = header.getType(column).readValue(in);
- if (isCounter)
- value = helper.maybeClearCounterValue(value);
- }
- }
-
- return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
- }
-
- public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
- {
- long size = 1; // flags
- boolean hasValue = cell.value().hasRemaining();
- boolean isDeleted = cell.isTombstone();
- boolean isExpiring = cell.isExpiring();
- boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
- boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-
- if (!useRowTimestamp)
- size += header.timestampSerializedSize(cell.timestamp());
-
- if ((isDeleted || isExpiring) && !useRowTTL)
- size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
- if (isExpiring && !useRowTTL)
- size += header.ttlSerializedSize(cell.ttl());
-
- if (cell.column().isComplex())
- size += cell.column().cellPathSerializer().serializedSize(cell.path());
-
- if (hasValue)
- size += header.getType(cell.column()).writtenLength(cell.value());
-
- return size;
- }
-
- // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
- public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
- {
- int flags = in.readUnsignedByte();
- boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
- boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
- boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
- boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
- boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
- if (!useRowTimestamp)
- header.skipTimestamp(in);
-
- if (!useRowTTL && (isDeleted || isExpiring))
- header.skipLocalDeletionTime(in);
-
- if (!useRowTTL && isExpiring)
- header.skipTTL(in);
-
- if (column.isComplex())
- column.cellPathSerializer().skip(in);
-
- if (hasValue)
- header.getType(column).skipValue(in);
-
- return true;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 73d9e44..ad1c39a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.memory.AbstractAllocator;
/**
@@ -141,15 +142,165 @@ public abstract class Cell extends ColumnData
// Overrides super type to provide a more precise return type.
public abstract Cell purge(DeletionPurger purger, int nowInSec);
- public interface Serializer
+ /**
+ * The serialization format for cell is:
+ * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ]
+ * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ]
+ *
+ * where not all fields are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
+ * meaning:
+ * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
+ * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK.
+ * - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
+ * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
+ * - [ ttl ]: the ttl for the cell. Present if the cell is expiring (IS_EXPIRING_MASK) but doesn't have the
+ * USE_ROW_TTL_MASK.
+ * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
+ * for columns of this type have a fixed length.
+ * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
+ * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
+ * - [ path ]: the cell path if the column this is a cell of is complex.
+ */
+ static class Serializer
{
- public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException;
+ private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
+ private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
+ private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+ private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Whether the cell has the same timestamp as the row this is a cell of.
+ private final static int USE_ROW_TTL_MASK = 0x10; // Whether the cell has the same ttl as the row this is a cell of.
+
+ public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
+ {
+ assert cell != null;
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+ int flags = 0;
+ if (!hasValue)
+ flags |= HAS_EMPTY_VALUE_MASK;
+
+ if (isDeleted)
+ flags |= IS_DELETED_MASK;
+ else if (isExpiring)
+ flags |= IS_EXPIRING_MASK;
+
+ if (useRowTimestamp)
+ flags |= USE_ROW_TIMESTAMP_MASK;
+ if (useRowTTL)
+ flags |= USE_ROW_TTL_MASK;
+
+ out.writeByte((byte)flags);
+
+ if (!useRowTimestamp)
+ header.writeTimestamp(cell.timestamp(), out);
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ header.writeLocalDeletionTime(cell.localDeletionTime(), out);
+ if (isExpiring && !useRowTTL)
+ header.writeTTL(cell.ttl(), out);
+
+ if (cell.column().isComplex())
+ cell.column().cellPathSerializer().serialize(cell.path(), out);
+
+ if (hasValue)
+ header.getType(cell.column()).writeValue(cell.value(), out);
+ }
+
+ public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+ boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+ long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
+
+ int localDeletionTime = useRowTTL
+ ? rowLiveness.localExpirationTime()
+ : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
+
+ int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
- public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException;
+ CellPath path = column.isComplex()
+ ? column.cellPathSerializer().deserialize(in)
+ : null;
- public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header);
+ boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
+
+ ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ if (hasValue)
+ {
+ if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
+ {
+ header.getType(column).skipValue(in);
+ }
+ else
+ {
+ value = header.getType(column).readValue(in);
+ if (isCounter)
+ value = helper.maybeClearCounterValue(value);
+ }
+ }
+
+ return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
+ }
+
+ public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
+ {
+ long size = 1; // flags
+ boolean hasValue = cell.value().hasRemaining();
+ boolean isDeleted = cell.isTombstone();
+ boolean isExpiring = cell.isExpiring();
+ boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+ boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+
+ if (!useRowTimestamp)
+ size += header.timestampSerializedSize(cell.timestamp());
+
+ if ((isDeleted || isExpiring) && !useRowTTL)
+ size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
+ if (isExpiring && !useRowTTL)
+ size += header.ttlSerializedSize(cell.ttl());
+
+ if (cell.column().isComplex())
+ size += cell.column().cellPathSerializer().serializedSize(cell.path());
+
+ if (hasValue)
+ size += header.getType(cell.column()).writtenLength(cell.value());
+
+ return size;
+ }
// Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
- public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException;
+ public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
+ {
+ int flags = in.readUnsignedByte();
+ boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+ boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+ boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+ boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+ boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+ if (!useRowTimestamp)
+ header.skipTimestamp(in);
+
+ if (!useRowTTL && (isDeleted || isExpiring))
+ header.skipLocalDeletionTime(in);
+
+ if (!useRowTTL && isExpiring)
+ header.skipTTL(in);
+
+ if (column.isComplex())
+ column.cellPathSerializer().skip(in);
+
+ if (hasValue)
+ header.getType(column).skipValue(in);
+
+ return true;
+ }
}
}