You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:48 UTC
[24/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 1660b2e..0b00b47 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -18,22 +18,28 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
+import java.io.DataInput;
+import java.io.IOException;
import java.util.List;
+import java.util.Iterator;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.transport.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.ColumnSpecification;
import org.apache.cassandra.cql3.Lists;
import org.apache.cassandra.cql3.Maps;
import org.apache.cassandra.cql3.Sets;
-
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -47,6 +53,8 @@ public abstract class CollectionType<T> extends AbstractType<T>
public static final int MAX_ELEMENTS = 65535;
+ public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer();
+
public enum Kind
{
MAP
@@ -84,6 +92,8 @@ public abstract class CollectionType<T> extends AbstractType<T>
public abstract AbstractType<?> nameComparator();
public abstract AbstractType<?> valueComparator();
+ protected abstract List<ByteBuffer> serializedValues(Iterator<Cell> cells);
+
@Override
public abstract CollectionSerializer<T> getSerializer();
@@ -132,28 +142,33 @@ public abstract class CollectionType<T> extends AbstractType<T>
return kind == Kind.MAP;
}
- public List<Cell> enforceLimit(ColumnDefinition def, List<Cell> cells, int version)
+ // Overridden by maps
+ protected int collectionSize(List<ByteBuffer> values)
+ {
+ return values.size();
+ }
+
+ protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version)
{
assert isMultiCell();
- if (version >= Server.VERSION_3 || cells.size() <= MAX_ELEMENTS)
- return cells;
+ int size = collectionSize(values);
+ if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS)
+ return size;
logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" +
" elements will be returned to the client. Please see " +
"http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.",
- def.ksName, def.cfName, cells.size(), MAX_ELEMENTS, MAX_ELEMENTS);
- return cells.subList(0, MAX_ELEMENTS);
+ def.ksName, def.cfName, values.size(), MAX_ELEMENTS, MAX_ELEMENTS);
+ return MAX_ELEMENTS;
}
- public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
-
- public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, List<Cell> cells, int version)
+ public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version)
{
assert isMultiCell();
- cells = enforceLimit(def, cells, version);
List<ByteBuffer> values = serializedValues(cells);
- return CollectionSerializer.pack(values, cells.size(), version);
+ int size = enforceLimit(def, values, version);
+ return CollectionSerializer.pack(values, size, version);
}
@Override
@@ -217,4 +232,28 @@ public abstract class CollectionType<T> extends AbstractType<T>
{
return this.toString(false);
}
+
+ private static class CollectionPathSerializer implements CellPath.Serializer
+ {
+ public void serialize(CellPath path, DataOutputPlus out) throws IOException
+ {
+ ByteBufferUtil.writeWithLength(path.get(0), out);
+ }
+
+ public CellPath deserialize(DataInput in) throws IOException
+ {
+ return CellPath.create(ByteBufferUtil.readWithLength(in));
+ }
+
+ public long serializedSize(CellPath path, TypeSizes sizes)
+ {
+ return sizes.sizeofWithLength(path.get(0));
+ }
+
+ public void skip(DataInput in) throws IOException
+ {
+ int length = in.readInt();
+ FileUtils.skipBytesFully(in, length);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
index 1d2c88c..a81d3f8 100644
--- a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
@@ -31,6 +31,9 @@ import org.apache.cassandra.serializers.BytesSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
+/*
+ * This class is deprecated and only kept for backward compatibility.
+ */
public class ColumnToCollectionType extends AbstractType<ByteBuffer>
{
// interning instances
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index f3c041e..01eb58f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -177,6 +177,7 @@ public class CompositeType extends AbstractCompositeType
// most names will be complete.
ByteBuffer[] l = new ByteBuffer[types.size()];
ByteBuffer bb = name.duplicate();
+ readStatic(bb);
int i = 0;
while (bb.remaining() > 0)
{
@@ -186,6 +187,19 @@ public class CompositeType extends AbstractCompositeType
return i == l.length ? l : Arrays.copyOfRange(l, 0, i);
}
+ public static List<ByteBuffer> splitName(ByteBuffer name)
+ {
+ List<ByteBuffer> l = new ArrayList<>();
+ ByteBuffer bb = name.duplicate();
+ readStatic(bb);
+ while (bb.remaining() > 0)
+ {
+ l.add(ByteBufferUtil.readBytesWithShortLength(bb));
+ bb.get(); // skip end-of-component
+ }
+ return l;
+ }
+
// Extract component idx from bb. Return null if there is not enough component.
public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
{
@@ -318,11 +332,20 @@ public class CompositeType extends AbstractCompositeType
public static ByteBuffer build(ByteBuffer... buffers)
{
- int totalLength = 0;
+ return build(false, buffers);
+ }
+
+ public static ByteBuffer build(boolean isStatic, ByteBuffer... buffers)
+ {
+ int totalLength = isStatic ? 2 : 0;
for (ByteBuffer bb : buffers)
totalLength += 2 + bb.remaining() + 1;
ByteBuffer out = ByteBuffer.allocate(totalLength);
+
+ if (isStatic)
+ out.putShort((short)STATIC_MARKER);
+
for (ByteBuffer bb : buffers)
{
ByteBufferUtil.writeShortLength(out, bb.remaining());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
index 4b3ce82..687e525 100644
--- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.CounterSerializer;
+import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
public class CounterColumnType extends AbstractType<Long>
@@ -59,6 +60,12 @@ public class CounterColumnType extends AbstractType<Long>
return ByteBufferUtil.bytes(value);
}
+ @Override
+ public void validateCellValue(ByteBuffer cellValue) throws MarshalException
+ {
+ CounterContext.instance().validateContext(cellValue);
+ }
+
public int compare(ByteBuffer o1, ByteBuffer o2)
{
return ByteBufferUtil.compareUnsigned(o1, o2);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DateType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DateType.java b/src/java/org/apache/cassandra/db/marshal/DateType.java
index 359ce52..66da443 100644
--- a/src/java/org/apache/cassandra/db/marshal/DateType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DateType.java
@@ -124,4 +124,10 @@ public class DateType extends AbstractType<Date>
{
return TimestampSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 8;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DoubleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DoubleType.java b/src/java/org/apache/cassandra/db/marshal/DoubleType.java
index 661b3c9..bc160d5 100644
--- a/src/java/org/apache/cassandra/db/marshal/DoubleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DoubleType.java
@@ -97,4 +97,10 @@ public class DoubleType extends AbstractType<Double>
{
return DoubleSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 8;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/EmptyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
index f82d767..448376f 100644
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -69,4 +69,10 @@ public class EmptyType extends AbstractType<Void>
{
return EmptySerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/FloatType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/FloatType.java b/src/java/org/apache/cassandra/db/marshal/FloatType.java
index af02cad..ceedce4 100644
--- a/src/java/org/apache/cassandra/db/marshal/FloatType.java
+++ b/src/java/org/apache/cassandra/db/marshal/FloatType.java
@@ -96,4 +96,10 @@ public class FloatType extends AbstractType<Float>
{
return FloatSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 4;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/Int32Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/Int32Type.java b/src/java/org/apache/cassandra/db/marshal/Int32Type.java
index 67d8142..cb0c584 100644
--- a/src/java/org/apache/cassandra/db/marshal/Int32Type.java
+++ b/src/java/org/apache/cassandra/db/marshal/Int32Type.java
@@ -109,4 +109,9 @@ public class Int32Type extends AbstractType<Integer>
return Int32Serializer.instance;
}
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 4;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
index 3e00d71..174ce3a 100644
--- a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
@@ -83,4 +83,10 @@ public class LexicalUUIDType extends AbstractType<UUID>
{
return UUIDSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 16;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 03f39d7..73af808 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -23,14 +23,13 @@ import java.util.*;
import org.apache.cassandra.cql3.Json;
import org.apache.cassandra.cql3.Lists;
import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.serializers.ListSerializer;
-import org.apache.cassandra.transport.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,12 +168,12 @@ public class ListType<T> extends CollectionType<List<T>>
return sb.toString();
}
- public List<ByteBuffer> serializedValues(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
{
assert isMultiCell;
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- for (Cell c : cells)
- bbs.add(c.value());
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+ while (cells.hasNext())
+ bbs.add(cells.next().value());
return bbs;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
index 427598d..e02ba3c 100644
--- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
/** for sorting columns representing row keys in the row ordering as determined by a partitioner.
@@ -38,6 +39,11 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer>
this.partitioner = partitioner;
}
+ public static LocalByPartionerType getInstance(TypeParser parser)
+ {
+ return new LocalByPartionerType(StorageService.getPartitioner());
+ }
+
@Override
public ByteBuffer compose(ByteBuffer bytes)
{
@@ -74,8 +80,8 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer>
public int compare(ByteBuffer o1, ByteBuffer o2)
{
- // o1 and o2 can be empty so we need to use RowPosition, not DecoratedKey
- return RowPosition.ForKey.get(o1, partitioner).compareTo(RowPosition.ForKey.get(o2, partitioner));
+ // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+ return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LongType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LongType.java b/src/java/org/apache/cassandra/db/marshal/LongType.java
index d77d7d0..9d41f4f 100644
--- a/src/java/org/apache/cassandra/db/marshal/LongType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LongType.java
@@ -118,4 +118,9 @@ public class LongType extends AbstractType<Long>
return LongSerializer.instance;
}
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 8;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 983710b..28a4fd5 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -23,7 +23,7 @@ import java.util.*;
import org.apache.cassandra.cql3.Json;
import org.apache.cassandra.cql3.Maps;
import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.CollectionSerializer;
@@ -173,6 +173,11 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
}
@Override
+ protected int collectionSize(List<ByteBuffer> values)
+ {
+ return values.size() / 2;
+ }
+
public String toString(boolean ignoreFreezing)
{
boolean includeFrozenType = !ignoreFreezing && !isMultiCell();
@@ -186,13 +191,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
return sb.toString();
}
- public List<ByteBuffer> serializedValues(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
{
assert isMultiCell;
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
- for (Cell c : cells)
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+ while (cells.hasNext())
{
- bbs.add(c.name().collectionElement());
+ Cell c = cells.next();
+ bbs.add(c.path().get(0));
bbs.add(c.value());
}
return bbs;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ReversedType.java b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 2181f74..cf357a8 100644
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@ -80,6 +80,12 @@ public class ReversedType<T> extends AbstractType<T>
return baseType.compare(o2, o1);
}
+ @Override
+ public int compareForCQL(ByteBuffer v1, ByteBuffer v2)
+ {
+ return baseType.compare(v1, v2);
+ }
+
public String getString(ByteBuffer bytes)
{
return baseType.getString(bytes);
@@ -129,6 +135,12 @@ public class ReversedType<T> extends AbstractType<T>
}
@Override
+ protected int valueLengthIfFixed()
+ {
+ return baseType.valueLengthIfFixed();
+ }
+
+ @Override
public String toString()
{
return getClass().getName() + "(" + baseType + ")";
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 78aac25..126c6aa 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -23,12 +23,11 @@ import java.util.*;
import org.apache.cassandra.cql3.Json;
import org.apache.cassandra.cql3.Sets;
import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.serializers.SetSerializer;
-import org.apache.cassandra.transport.Server;
public class SetType<T> extends CollectionType<Set<T>>
{
@@ -144,11 +143,11 @@ public class SetType<T> extends CollectionType<Set<T>>
return sb.toString();
}
- public List<ByteBuffer> serializedValues(List<Cell> cells)
+ public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
{
- List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
- for (Cell c : cells)
- bbs.add(c.name().collectionElement());
+ List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+ while (cells.hasNext())
+ bbs.add(cells.next().path().get(0));
return bbs;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
index a1d8d82..64fa750 100644
--- a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
@@ -127,4 +127,10 @@ public class TimeUUIDType extends AbstractType<UUID>
{
return TimeUUIDSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 16;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimestampType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TimestampType.java b/src/java/org/apache/cassandra/db/marshal/TimestampType.java
index b01651d..288f8fd 100644
--- a/src/java/org/apache/cassandra/db/marshal/TimestampType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TimestampType.java
@@ -125,4 +125,10 @@ public class TimestampType extends AbstractType<Date>
{
return TimestampSerializer.instance;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 8;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/UUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UUIDType.java b/src/java/org/apache/cassandra/db/marshal/UUIDType.java
index 0250eb20..14c9f48 100644
--- a/src/java/org/apache/cassandra/db/marshal/UUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UUIDType.java
@@ -168,4 +168,10 @@ public class UUIDType extends AbstractType<UUID>
{
return (uuid.get(6) & 0xf0) >> 4;
}
+
+ @Override
+ protected int valueLengthIfFixed()
+ {
+ return 16;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
new file mode 100644
index 0000000..2fcd7b3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract common class for all non-thread safe Partition implementations.
+ */
+public abstract class AbstractPartitionData implements Partition, Iterable<Row>
+{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class);
+
+ protected final CFMetaData metadata;
+ protected final DecoratedKey key;
+
+ protected final DeletionInfo deletionInfo;
+ protected final PartitionColumns columns;
+
+ protected Row staticRow;
+
+ protected int rows;
+
+ // The values for the clustering columns of the rows contained in this partition object. If
+ // clusteringSize is the size of the clustering comparator for this table, clusterings has size
+ // clusteringSize * rows where rows is the number of rows stored, and row i has its clustering
+ // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)).
+ protected ByteBuffer[] clusterings;
+
+ // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i).
+ protected final LivenessInfoArray livenessInfos;
+ // The row deletion for the rows of this partition (row i has its row deletion at index i).
+ protected final DeletionTimeArray deletions;
+
+ // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition.
+ protected final RowDataBlock data;
+
+ // Stats over the rows stored in this partition.
+ private final RowStats.Collector statsCollector = new RowStats.Collector();
+
+ // The maximum timestamp for any data contained in this partition.
+ protected long maxTimestamp = Long.MIN_VALUE;
+
+ private AbstractPartitionData(CFMetaData metadata,
+ DecoratedKey key,
+ DeletionInfo deletionInfo,
+ ByteBuffer[] clusterings,
+ LivenessInfoArray livenessInfos,
+ DeletionTimeArray deletions,
+ PartitionColumns columns,
+ RowDataBlock data)
+ {
+ this.metadata = metadata;
+ this.key = key;
+ this.deletionInfo = deletionInfo;
+ this.clusterings = clusterings;
+ this.livenessInfos = livenessInfos;
+ this.deletions = deletions;
+ this.columns = columns;
+ this.data = data;
+
+ collectStats(deletionInfo.getPartitionDeletion());
+ Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+ while (iter.hasNext())
+ collectStats(iter.next().deletionTime());
+ }
+
+ protected AbstractPartitionData(CFMetaData metadata,
+ DecoratedKey key,
+ DeletionInfo deletionInfo,
+ PartitionColumns columns,
+ RowDataBlock data,
+ int initialRowCapacity)
+ {
+ this(metadata,
+ key,
+ deletionInfo,
+ new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()],
+ new LivenessInfoArray(initialRowCapacity),
+ new DeletionTimeArray(initialRowCapacity),
+ columns,
+ data);
+ }
+
+ protected AbstractPartitionData(CFMetaData metadata,
+ DecoratedKey key,
+ DeletionTime partitionDeletion,
+ PartitionColumns columns,
+ int initialRowCapacity,
+ boolean sortable)
+ {
+ this(metadata,
+ key,
+ new DeletionInfo(partitionDeletion.takeAlias()),
+ columns,
+ new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()),
+ initialRowCapacity);
+ }
+
+ private void collectStats(DeletionTime dt)
+ {
+ statsCollector.updateDeletionTime(dt);
+ maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt());
+ }
+
+ private void collectStats(LivenessInfo info)
+ {
+ statsCollector.updateTimestamp(info.timestamp());
+ statsCollector.updateTTL(info.ttl());
+ statsCollector.updateLocalDeletionTime(info.localDeletionTime());
+ maxTimestamp = Math.max(maxTimestamp, info.timestamp());
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return key;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return deletionInfo.getPartitionDeletion();
+ }
+
+ public PartitionColumns columns()
+ {
+ return columns;
+ }
+
+ public Row staticRow()
+ {
+ return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+ }
+
+ public RowStats stats()
+ {
+ return statsCollector.get();
+ }
+
+ /**
+ * The deletion info for the partition update.
+ *
+ * <b>warning:</b> the returned object should be used in a read-only fashion. In particular,
+ * it should not be used to add new range tombstones to this deletion. For that,
+ * {@link #addRangeTombstone} should be used instead. The reason being that adding directly to
+ * the returned object would bypass some stats collection that {@code addRangeTombstone} does.
+ *
+ * @return the deletion info for the partition update for use as read-only.
+ */
+ public DeletionInfo deletionInfo()
+ {
+ // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We
+ // could add the option of providing a read-only view of a DeletionInfo instead.
+ return deletionInfo;
+ }
+
+ public void addPartitionDeletion(DeletionTime deletionTime)
+ {
+ collectStats(deletionTime);
+ deletionInfo.add(deletionTime);
+ }
+
+ public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion)
+ {
+ addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias()));
+ }
+
+ public void addRangeTombstone(RangeTombstone range)
+ {
+ collectStats(range.deletionTime());
+ deletionInfo.add(range, metadata.comparator);
+ }
+
+ /**
+ * Swap row i and j.
+ *
+ * This is only used when we need to reorder rows because those were not added in clustering order,
+ * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. This method
+ * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but
+ * it should really only be used by subclasses (and with care) in practice.
+ */
+ public void swap(int i, int j)
+ {
+ int cs = metadata.clusteringColumns().size();
+ for (int k = 0; k < cs; k++)
+ {
+ ByteBuffer tmp = clusterings[j * cs + k];
+ clusterings[j * cs + k] = clusterings[i * cs + k];
+ clusterings[i * cs + k] = tmp;
+ }
+
+ livenessInfos.swap(i, j);
+ deletions.swap(i, j);
+ data.swap(i, j);
+ }
+
+ public int rowCount()
+ {
+ return rows;
+ }
+
+ public boolean isEmpty()
+ {
+ return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty();
+ }
+
+ protected void clear()
+ {
+ rows = 0;
+ Arrays.fill(clusterings, null);
+ livenessInfos.clear();
+ deletions.clear();
+ data.clear();
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ CFMetaData metadata = metadata();
+ sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s",
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(partitionKey().getKey()),
+ columns(),
+ deletionInfo));
+
+ if (staticRow() != Rows.EMPTY_STATIC_ROW)
+ sb.append("\n ").append(staticRow().toString(metadata, true));
+
+ // We use createRowIterator() directly instead of iterator() because that avoids
+ // sorting for PartitionUpdate (which inherits this method) and that is useful because
+ // 1) it can help with debugging and 2) we can't write after sorting but we want to
+ // be able to print an update while we build it (again for debugging)
+ Iterator<Row> iterator = createRowIterator(null, false);
+ while (iterator.hasNext())
+ sb.append("\n ").append(iterator.next().toString(metadata, true));
+
+ return sb.toString();
+ }
+
+ /**
+ * Reverses the order of the rows of this partition in place, by swapping rows pairwise
+ * from both ends towards the middle.
+ */
+ protected void reverse()
+ {
+ for (int front = 0, back = rows - 1; front < back; front++, back--)
+ swap(front, back);
+ }
+
+ public Row getRow(Clustering clustering)
+ {
+ Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
+ // Note that for statics, this will never return null, this will return an empty row. However,
+ // it's more consistent for this method to return null if we don't really have a static row.
+ return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+ }
+
+ /**
+ * Returns an iterator that iterates over the rows of this update in clustering order.
+ *
+ * The iterator is a forward row iterator created without any column filter.
+ *
+ * @return an iterator over the rows of this update.
+ */
+ public Iterator<Row> iterator()
+ {
+ return createRowIterator(null, false);
+ }
+
+ public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed)
+ {
+ final RowIterator iter = createRowIterator(columns, reversed);
+ return new SearchIterator<Clustering, Row>()
+ {
+ public boolean hasNext()
+ {
+ return iter.hasNext();
+ }
+
+ public Row next(Clustering key)
+ {
+ if (key == Clustering.STATIC_CLUSTERING)
+ {
+ if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ return FilteringRow.columnsFilteringRow(columns).setTo(staticRow());
+ }
+
+ return iter.seekTo(key) ? iter.next() : null;
+ }
+ };
+ }
+
+ /**
+ * Returns an iterator over all the content of this partition: every column of the partition,
+ * all slices ({@code Slices.ALL}) and in forward order.
+ *
+ * @return an unfiltered iterator over the whole partition.
+ */
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ ColumnFilter allColumns = ColumnFilter.selection(columns());
+ return unfilteredIterator(allColumns, Slices.ALL, false);
+ }
+
+ /**
+ * Returns an iterator over the content of this partition selected by the provided slices.
+ *
+ * @param columns the columns to include.
+ * @param slices the slices to restrict the iteration to.
+ * @param reversed whether to iterate in reverse order.
+ * @return the iterator built by {@code slices} over the sliceable iterator of this partition.
+ */
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed)
+ {
+ SliceableUnfilteredRowIterator sliceable = sliceableUnfilteredIterator(columns, reversed);
+ return slices.makeSliceIterator(sliceable);
+ }
+
+ /**
+ * Returns a forward sliceable iterator over this partition that selects every column of the partition.
+ *
+ * @return a sliceable unfiltered iterator over the whole partition.
+ */
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+ {
+ ColumnFilter allColumns = ColumnFilter.selection(columns());
+ return sliceableUnfilteredIterator(allColumns, false);
+ }
+
+ /**
+ * Creates a sliceable iterator merging the rows and the range tombstones of this partition.
+ *
+ * @param selection the columns to select; its fetched columns are the columns passed to the
+ * returned iterator, and rows are created through a row iterator using this selection.
+ * @param reversed whether to iterate in reverse order.
+ * @return a sliceable unfiltered iterator over this partition.
+ */
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed)
+ {
+ return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed)
+ {
+ private final RowIterator rowIterator = createRowIterator(selection, reversed);
+ private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed);
+
+ protected Unfiltered computeNext()
+ {
+ // If the merge iterator has not been set yet (no slice was requested), lazily set it
+ // over all the rows and all the range tombstones of the partition.
+ if (!mergeIterator.isSet())
+ mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed));
+
+ return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
+ }
+
+ public Iterator<Unfiltered> slice(Slice slice)
+ {
+ // Re-target the (shared) merge iterator to the rows and range tombstones of the slice;
+ // computeNext() will then read from that slice too since the merge iterator is now set.
+ return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed));
+ }
+ };
+ }
+
+ /**
+ * Creates a row iterator over this partition.
+ *
+ * @param columns the columns to include in the returned rows, or {@code null} for no filtering.
+ * @param reversed whether a reverse or a forward row iterator should be created.
+ * @return the newly created row iterator.
+ */
+ private RowIterator createRowIterator(ColumnFilter columns, boolean reversed)
+ {
+ if (reversed)
+ return new ReverseRowIterator(columns);
+
+ return new ForwardRowIterator(columns);
+ }
+
+ /**
+ * An iterator over the rows of this partition that reuse the same row object.
+ *
+ * Since a single reusable row (and clustering) is shared by all the rows returned, a returned
+ * row is only meaningful until the iterator is moved to another row.
+ */
+ private abstract class RowIterator extends UnmodifiableIterator<Row>
+ {
+ protected final InternalReusableClustering clustering = new InternalReusableClustering();
+ protected final InternalReusableRow reusableRow;
+ // Null when no column filtering was requested.
+ protected final FilteringRow filter;
+
+ // Index of the row returned by the next call to next().
+ protected int next;
+
+ protected RowIterator(final ColumnFilter columns)
+ {
+ this.reusableRow = new InternalReusableRow(clustering);
+ this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns);
+ }
+
+ /**
+ * Move the iterator so that row {@code name} is returned next by {@code next} if that
+ * row exists. Otherwise the first row sorting after {@code name} will be returned.
+ * Returns whether {@code name} was found or not.
+ */
+ public abstract boolean seekTo(Clustering name);
+
+ /**
+ * Returns an iterator over the rows of this partition that are selected by {@code slice}.
+ */
+ public abstract Iterator<Row> slice(Slice slice);
+
+ /**
+ * Points the reusable row to row {@code row} and returns it, wrapped by the column filter if there is one.
+ */
+ protected Row setRowTo(int row)
+ {
+ reusableRow.setTo(row);
+ return filter == null ? reusableRow : filter.setTo(reusableRow);
+ }
+
+ /**
+ * Simple binary search of {@code name} among the rows in {@code [fromIndex, toIndex)}.
+ *
+ * Note that this moves the shared reusable clustering of this iterator.
+ *
+ * @return the index of the row whose clustering compares equal to {@code name} if there is one,
+ * {@code -(insertionPoint) - 1} otherwise, where the insertion point is the index of the first
+ * row of the range sorting after {@code name} ({@code toIndex} if there is none).
+ */
+ protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex)
+ {
+ int low = fromIndex;
+ int mid = toIndex;
+ int high = mid - 1;
+ int result = -1;
+ while (low <= high)
+ {
+ // Equivalent to (low + high) >> 1, but cannot overflow for large indexes.
+ mid = low + ((high - low) >> 1);
+ if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0)
+ low = mid + 1;
+ else if (result == 0)
+ return mid;
+ else
+ high = mid - 1;
+ }
+ // The last comparison tells on which side of 'mid' the insertion point is.
+ return -mid - (result < 0 ? 1 : 2);
+ }
+ }
+
+ /**
+ * A row iterator going through the rows in index order, from row {@code 0} to row {@code rows - 1}.
+ */
+ private class ForwardRowIterator extends RowIterator
+ {
+ private ForwardRowIterator(ColumnFilter columns)
+ {
+ super(columns);
+ this.next = 0;
+ }
+
+ public boolean hasNext()
+ {
+ return next < rows;
+ }
+
+ public Row next()
+ {
+ return setRowTo(next++);
+ }
+
+ public boolean seekTo(Clustering name)
+ {
+ if (next >= rows)
+ return false;
+
+ int idx = binarySearch(name, next, rows);
+ next = idx >= 0 ? idx : -idx - 1;
+ return idx >= 0;
+ }
+
+ /**
+ * Returns the rows selected by {@code slice}, searching only from the current position onwards.
+ *
+ * The position reached is remembered so that further slices (which are expected to come after
+ * this one) are searched faster. The remembered position is always a valid row index.
+ */
+ public Iterator<Row> slice(Slice slice)
+ {
+ int sidx = binarySearch(slice.start(), next, rows);
+ final int start = sidx >= 0 ? sidx : -sidx - 1;
+ if (start >= rows)
+ return Collections.emptyIterator();
+
+ int eidx = binarySearch(slice.end(), start, rows);
+ // The insertion point is the first element greater than slice.end(), so we want the previous index
+ final int end = eidx >= 0 ? eidx : -eidx - 2;
+
+ if (start > end)
+ {
+ // No row is selected, in which case end == start - 1, which is -1 when the slice ends before
+ // the first row. Remembering 'end' would then make later searches (and next()) index row -1,
+ // so remember 'start' instead: every row before it sorts before the slice anyway.
+ next = start;
+ return Collections.emptyIterator();
+ }
+
+ // Remember the end to speed up potential further slice search
+ next = end;
+
+ return new AbstractIterator<Row>()
+ {
+ private int i = start;
+
+ protected Row computeNext()
+ {
+ if (i >= rows || i > end)
+ return endOfData();
+
+ return setRowTo(i++);
+ }
+ };
+ }
+ }
+
+ /**
+ * A row iterator going through the rows in reverse index order, from row {@code rows - 1} to row {@code 0}.
+ */
+ private class ReverseRowIterator extends RowIterator
+ {
+ private ReverseRowIterator(ColumnFilter columns)
+ {
+ super(columns);
+ this.next = rows - 1;
+ }
+
+ public boolean hasNext()
+ {
+ return next >= 0;
+ }
+
+ public Row next()
+ {
+ return setRowTo(next--);
+ }
+
+ public boolean seekTo(Clustering name)
+ {
+ // We only use that method with forward iterators.
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the rows selected by {@code slice} in reverse order, searching only from the current
+ * position downwards.
+ *
+ * The position reached is remembered so that further slices (which are expected to come before
+ * this one) are searched faster. The remembered position never goes past the last row.
+ */
+ public Iterator<Row> slice(Slice slice)
+ {
+ int sidx = binarySearch(slice.end(), 0, next + 1);
+ // The insertion point is the first element greater than slice.end(), so we want the previous index
+ final int start = sidx >= 0 ? sidx : -sidx - 2;
+ if (start < 0)
+ return Collections.emptyIterator();
+
+ int eidx = binarySearch(slice.start(), 0, start + 1);
+ final int end = eidx >= 0 ? eidx : -eidx - 1;
+
+ if (start < end)
+ {
+ // No row is selected, in which case end == start + 1, which can be 'rows' when the slice
+ // starts after the last row. Remembering 'end' would then make later searches (and next())
+ // index past the last row, so remember 'start' instead: every row after it sorts after
+ // the slice anyway.
+ next = start;
+ return Collections.emptyIterator();
+ }
+
+ // Remember the end to speed up potential further slice search
+ next = end;
+
+ return new AbstractIterator<Row>()
+ {
+ private int i = start;
+
+ protected Row computeNext()
+ {
+ if (i < 0 || i < end)
+ return endOfData();
+
+ return setRowTo(i--);
+ }
+ };
+ }
+ }
+
+ /**
+ * A reusable view over the clustering of this partition.
+ *
+ * The view does not hold any value itself: it reads the values of the row it currently points
+ * to from the flat {@code clusterings} array of the partition.
+ */
+ protected class InternalReusableClustering extends Clustering
+ {
+ // Number of clustering values per row.
+ final int size = metadata.clusteringColumns().size();
+ // Offset in the clusterings array of the first value of the row currently pointed to.
+ private int base;
+
+ public int size()
+ {
+ return this.size;
+ }
+
+ /**
+ * Points this view to row {@code row} and returns the view itself.
+ */
+ public Clustering setTo(int row)
+ {
+ this.base = row * this.size;
+ return this;
+ }
+
+ public ByteBuffer get(int i)
+ {
+ int position = base + i;
+ return clusterings[position];
+ }
+
+ /**
+ * Returns a freshly allocated array holding the values of the row currently pointed to.
+ */
+ public ByteBuffer[] getRawValues()
+ {
+ ByteBuffer[] copy = new ByteBuffer[size];
+ for (int idx = 0; idx < copy.length; idx++)
+ copy[idx] = get(idx);
+ return copy;
+ }
+ }
+
+ /**
+ * A reusable view over the rows of this partition.
+ *
+ * The view is pointed to a given row through {@link #setTo}; its clustering, primary key liveness
+ * info and deletion are themselves reusable views/cursors that are moved to that same row.
+ */
+ protected class InternalReusableRow extends AbstractReusableRow
+ {
+ private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor();
+ private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor();
+ private final InternalReusableClustering clustering;
+
+ // Index of the row this view currently points to.
+ private int row;
+
+ /**
+ * Creates a reusable row that uses its own, newly created reusable clustering.
+ */
+ public InternalReusableRow()
+ {
+ this(new InternalReusableClustering());
+ }
+
+ /**
+ * Creates a reusable row that uses (and moves) the provided reusable clustering.
+ */
+ public InternalReusableRow(InternalReusableClustering clustering)
+ {
+ this.clustering = clustering;
+ }
+
+ // The row data block of the enclosing partition.
+ protected RowDataBlock data()
+ {
+ return data;
+ }
+
+ /**
+ * Points this view (clustering, liveness and deletion cursors included) to row {@code row}
+ * and returns the view itself.
+ */
+ public Row setTo(int row)
+ {
+ this.clustering.setTo(row);
+ this.liveness.setTo(livenessInfos, row);
+ this.deletion.setTo(deletions, row);
+ this.row = row;
+ return this;
+ }
+
+ protected int row()
+ {
+ return row;
+ }
+
+ public Clustering clustering()
+ {
+ return clustering;
+ }
+
+ public LivenessInfo primaryKeyLivenessInfo()
+ {
+ return liveness;
+ }
+
+ public DeletionTime deletion()
+ {
+ return deletion;
+ }
+ };
+
+ /**
+ * Base class for the sliceable iterators over a partition: it only initializes the underlying
+ * unfiltered row iterator from the provided partition data (metadata, key, partition level deletion,
+ * static row and stats), the provided columns and iteration order.
+ */
+ private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator
+ {
+ private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder)
+ {
+ super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats());
+ }
+ }
+
+ /**
+ * A row writer to add rows to this partition.
+ *
+ * On top of what {@link RowDataBlock.Writer} does, this writer stores the clustering values, the
+ * primary key liveness info and the row deletion of each row in the arrays of the enclosing
+ * partition (growing them as needed), passes every liveness info and deletion time written to
+ * {@code collectStats}, and reports the number of columns set in each row to the stats collector.
+ */
+ protected class Writer extends RowDataBlock.Writer
+ {
+ // Index in the clusterings array at which the next clustering value is written.
+ private int clusteringBase;
+
+ // Number of writeCell() calls made on simple columns in the current row, and the complex
+ // columns having at least one cell written in the current row.
+ private int simpleColumnsSetInRow;
+ private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
+
+ public Writer(boolean inOrderCells)
+ {
+ super(data, inOrderCells);
+ }
+
+ public void writeClusteringValue(ByteBuffer value)
+ {
+ ensureCapacity(row);
+ clusterings[clusteringBase++] = value;
+ }
+
+ public void writePartitionKeyLivenessInfo(LivenessInfo info)
+ {
+ ensureCapacity(row);
+ livenessInfos.set(row, info);
+ collectStats(info);
+ }
+
+ public void writeRowDeletion(DeletionTime deletion)
+ {
+ ensureCapacity(row);
+ // Only non-live deletions are stored, but stats are collected in every case.
+ if (!deletion.isLive())
+ deletions.set(row, deletion);
+
+ collectStats(deletion);
+ }
+
+ @Override
+ public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+ {
+ ensureCapacity(row);
+ collectStats(info);
+
+ // A complex column counts once per row however many of its cells are written.
+ if (column.isComplex())
+ complexColumnsSetInRow.add(column);
+ else
+ ++simpleColumnsSetInRow;
+
+ super.writeCell(column, isCounter, value, info, path);
+ }
+
+ @Override
+ public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
+ {
+ ensureCapacity(row);
+ collectStats(complexDeletion);
+
+ super.writeComplexDeletion(c, complexDeletion);
+ }
+
+ @Override
+ public void endOfRow()
+ {
+ super.endOfRow();
+ ++rows;
+
+ statsCollector.updateColumnSetPerRow(simpleColumnsSetInRow + complexColumnsSetInRow.size());
+
+ // Reset the per-row column tracking for the next row.
+ simpleColumnsSetInRow = 0;
+ complexColumnsSetInRow.clear();
+ }
+
+ /**
+ * The index of the row currently being written.
+ */
+ public int currentRow()
+ {
+ return row;
+ }
+
+ /**
+ * Makes sure the clusterings, liveness infos and row deletions can hold row {@code rowToSet},
+ * growing all three if needed. The current capacity is taken from the size of the liveness infos.
+ */
+ private void ensureCapacity(int rowToSet)
+ {
+ int originalCapacity = livenessInfos.size();
+ if (rowToSet < originalCapacity)
+ return;
+
+ int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
+
+ int clusteringSize = metadata.clusteringColumns().size();
+
+ // The clusterings array is flat: clusteringSize entries per row.
+ clusterings = Arrays.copyOf(clusterings, newCapacity * clusteringSize);
+
+ livenessInfos.resize(newCapacity);
+ deletions.resize(newCapacity);
+ }
+
+ /**
+ * Resets this writer: on top of the reset of the parent writer, clustering values are written
+ * from the start of the clusterings array again and the per-row column tracking is cleared.
+ */
+ @Override
+ public Writer reset()
+ {
+ super.reset();
+ clusteringBase = 0;
+ simpleColumnsSetInRow = 0;
+ complexColumnsSetInRow.clear();
+ return this;
+ }
+ }
+
+ /**
+ * A range tombstone marker writer to add range tombstone markers to this partition.
+ *
+ * Markers are received one at a time: an opening marker is remembered (bound and deletion time)
+ * until its closing marker is written, at which point the corresponding range tombstone is added
+ * to the enclosing partition. A boundary marker closes the currently open range and opens a new one.
+ */
+ protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer
+ {
+ private final boolean reversed;
+
+ // Clustering values and kind of the marker currently being written.
+ private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()];
+ private int size;
+ private RangeTombstone.Bound.Kind nextKind;
+
+ // Bound and deletion time of the range currently open, if any.
+ private Slice.Bound openBound;
+ private DeletionTime openDeletion;
+
+ public RangeTombstoneCollector(boolean reversed)
+ {
+ this.reversed = reversed;
+ }
+
+ public void writeClusteringValue(ByteBuffer value)
+ {
+ nextValues[size] = value;
+ size++;
+ }
+
+ public void writeBoundKind(RangeTombstone.Bound.Kind kind)
+ {
+ this.nextKind = kind;
+ }
+
+ // A copy of the values written so far for the current marker.
+ private ByteBuffer[] getValues()
+ {
+ return Arrays.copyOf(nextValues, size);
+ }
+
+ // Remembers the bound and (an alias of) the deletion time of the range being opened.
+ private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
+ {
+ this.openBound = Slice.Bound.create(kind, getValues());
+ this.openDeletion = deletion.takeAlias();
+ }
+
+ // Closes the currently open range and adds the resulting range tombstone to the partition.
+ private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
+ {
+ assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was " + deletion;
+ Slice.Bound closeBound = Slice.Bound.create(kind, getValues());
+
+ // In reversed order the close bound is the start of the slice and the open bound its end.
+ Slice covered;
+ if (reversed)
+ covered = Slice.make(closeBound, openBound);
+ else
+ covered = Slice.make(openBound, closeBound);
+
+ addRangeTombstone(covered, openDeletion);
+ }
+
+ public void writeBoundDeletion(DeletionTime deletion)
+ {
+ assert !nextKind.isBoundary();
+ if (!nextKind.isOpen(reversed))
+ {
+ close(nextKind, deletion);
+ return;
+ }
+
+ open(nextKind, deletion);
+ }
+
+ public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
+ {
+ assert nextKind.isBoundary();
+
+ // Which of the two deletion times closes the open range depends on the iteration order.
+ DeletionTime closing;
+ DeletionTime opening;
+ if (reversed)
+ {
+ closing = startDeletion;
+ opening = endDeletion;
+ }
+ else
+ {
+ closing = endDeletion;
+ opening = startDeletion;
+ }
+
+ close(nextKind.closeBoundOfBoundary(reversed), closing);
+ open(nextKind.openBoundOfBoundary(reversed), opening);
+ }
+
+ public void endOfMarker()
+ {
+ clear();
+ }
+
+ private void addRangeTombstone(Slice deletionSlice, DeletionTime dt)
+ {
+ AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt);
+ }
+
+ // Forgets the marker being written (but not the currently open range).
+ private void clear()
+ {
+ size = 0;
+ Arrays.fill(nextValues, null);
+ nextKind = null;
+ }
+
+ /**
+ * Resets this collector entirely: both the currently open range and the marker being written are forgotten.
+ */
+ public void reset()
+ {
+ openBound = null;
+ openDeletion = null;
+ clear();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..d615ea9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+/**
+ * Convenience base class for {@link UnfilteredPartitionIterator} implementations: removal is
+ * unsupported and closing does nothing unless a subclass overrides it.
+ */
+public abstract class AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
+{
+ /**
+ * Removal is not supported.
+ *
+ * @throws UnsupportedOperationException always.
+ */
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Does nothing by default.
+ */
+ public void close()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
new file mode 100644
index 0000000..68b3970
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * An {@link ArrayBackedPartition} used as a {@link CachedPartition}: on top of the partition data,
+ * it records the time at which it was created and a few row and cell counters that are computed
+ * while the partition is filled.
+ */
+public class ArrayBackedCachedPartition extends ArrayBackedPartition implements CachedPartition
+{
+ // The time (in seconds) at which this partition was created; this is what gets serialized
+ // and what the counters below are computed against.
+ private final int createdAtInSec;
+
+ // Note that those fields are really immutable, but we can't easily pass their values to
+ // the ctor so they are not final.
+ private int cachedLiveRows;
+ private int rowsWithNonExpiringCells;
+
+ private int nonTombstoneCellCount;
+ private int nonExpiringLiveCells;
+
+ private ArrayBackedCachedPartition(CFMetaData metadata,
+ DecoratedKey partitionKey,
+ DeletionTime deletionTime,
+ PartitionColumns columns,
+ int initialRowCapacity,
+ boolean sortable,
+ int createdAtInSec)
+ {
+ super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+ this.createdAtInSec = createdAtInSec;
+ }
+
+ /**
+ * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator.
+ *
+ * This uses a default initial row capacity of 4.
+ *
+ * Warning: Note that this method does not close the provided iterator and it is
+ * up to the caller to do so.
+ *
+ * @param iterator the iterator to gather in memory.
+ * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies.
+ * @return the created partition.
+ */
+ public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec)
+ {
+ return create(iterator, 4, nowInSec);
+ }
+
+ /**
+ * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator.
+ *
+ * Warning: Note that this method does not close the provided iterator and it is
+ * up to the caller to do so.
+ *
+ * @param iterator the iterator to gather in memory.
+ * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
+ * correspond or be a good estimation of the number of rows in {@code iterator}.
+ * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies.
+ * @return the created partition.
+ */
+ public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec)
+ {
+ ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(iterator.metadata(),
+ iterator.partitionKey(),
+ iterator.partitionLevelDeletion(),
+ iterator.columns(),
+ initialRowCapacity,
+ iterator.isReverseOrder(),
+ nowInSec);
+
+ partition.staticRow = iterator.staticRow().takeAlias();
+
+ Writer writer = partition.new Writer(nowInSec);
+ RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder());
+
+ copyAll(iterator, writer, markerCollector, partition);
+
+ return partition;
+ }
+
+ /**
+ * The last row of this partition.
+ *
+ * @return a newly created reusable row pointed to the row of index {@code rows - 1}, or {@code null}
+ * if this partition has no row.
+ */
+ public Row lastRow()
+ {
+ if (rows == 0)
+ return null;
+
+ return new InternalReusableRow().setTo(rows - 1);
+ }
+
+ /**
+ * The number of rows that were live at the time the partition was cached.
+ *
+ * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this.
+ *
+ * @return the number of rows in this partition that were live at the time the
+ * partition was cached (this can be different from the number of live rows now
+ * due to expiring cells).
+ */
+ public int cachedLiveRows()
+ {
+ return cachedLiveRows;
+ }
+
+ /**
+ * The number of rows in this cached partition that have at least one non-expiring
+ * non-deleted cell.
+ *
+ * Note that this is generally not a very meaningful number, but this is used by
+ * {@link DataLimits#hasEnoughLiveData} as an optimization.
+ *
+ * @return the number of rows that have at least one non-expiring non-deleted cell.
+ */
+ public int rowsWithNonExpiringCells()
+ {
+ return rowsWithNonExpiringCells;
+ }
+
+ /**
+ * The number of cells written to this partition whose liveness info either has no local
+ * deletion time or has a TTL.
+ *
+ * @return the number of such cells.
+ */
+ public int nonTombstoneCellCount()
+ {
+ return nonTombstoneCellCount;
+ }
+
+ /**
+ * The number of cells written to this partition that were live at creation time and have no TTL.
+ *
+ * @return the number of such cells.
+ */
+ public int nonExpiringLiveCells()
+ {
+ return nonExpiringLiveCells;
+ }
+
+ // Writers that collect the values for 'cachedLiveRows', 'rowsWithNonExpiringCells', 'nonTombstoneCellCount'
+ // and 'nonExpiringLiveCells'.
+ protected class Writer extends AbstractPartitionData.Writer
+ {
+ // The time against which liveness is evaluated.
+ private final int nowInSec;
+
+ // Per-row flags, reset at the end of each row.
+ private boolean hasLiveData;
+ private boolean hasNonExpiringCell;
+
+ protected Writer(int nowInSec)
+ {
+ super(true);
+ this.nowInSec = nowInSec;
+ }
+
+ @Override
+ public void writePartitionKeyLivenessInfo(LivenessInfo info)
+ {
+ super.writePartitionKeyLivenessInfo(info);
+ // A live primary key liveness info is enough for the row to count as live.
+ if (info.isLive(nowInSec))
+ hasLiveData = true;
+ }
+
+ @Override
+ public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+ {
+ super.writeCell(column, isCounter, value, info, path);
+
+ if (info.isLive(nowInSec))
+ {
+ hasLiveData = true;
+ if (!info.hasTTL())
+ {
+ hasNonExpiringCell = true;
+ ++ArrayBackedCachedPartition.this.nonExpiringLiveCells;
+ }
+ }
+
+ // Counted independently of whether the cell is live at nowInSec.
+ if (!info.hasLocalDeletionTime() || info.hasTTL())
+ ++ArrayBackedCachedPartition.this.nonTombstoneCellCount;
+ }
+
+ @Override
+ public void endOfRow()
+ {
+ super.endOfRow();
+ if (hasLiveData)
+ ++ArrayBackedCachedPartition.this.cachedLiveRows;
+ if (hasNonExpiringCell)
+ ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells;
+
+ // Reset the per-row flags for the next row.
+ hasLiveData = false;
+ hasNonExpiringCell = false;
+ }
+ }
+
+ /**
+ * Serializer for cached partitions. Only {@code ArrayBackedCachedPartition} instances are supported
+ * (this is asserted). The serialized form is the creation time followed by the serialization of
+ * the partition content as an unfiltered row iterator.
+ */
+ static class Serializer implements ISerializer<CachedPartition>
+ {
+ public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
+ {
+ assert partition instanceof ArrayBackedCachedPartition;
+ ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
+
+ out.writeInt(p.createdAtInSec);
+ try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
+ {
+ // The number of rows is passed along and is what deserialize() reads back as the row estimate.
+ UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rows);
+ }
+ }
+
+ public CachedPartition deserialize(DataInput in) throws IOException
+ {
+ // Note that it would be slightly simpler to just do
+ // ArrayBackedCachedPartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...));
+ // However deserializing the header separately is not a lot harder and allows us to:
+ // 1) get the capacity of the partition so we can size it properly directly
+ // 2) saves the creation of a temporary iterator: rows are directly written to the partition, which
+ // is slightly faster.
+
+ int createdAtInSec = in.readInt();
+
+ UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+ assert !h.isReversed && h.rowEstimate >= 0;
+
+ ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.rowEstimate, false, createdAtInSec);
+ partition.staticRow = h.staticRow;
+
+ // The counters of the partition are recomputed by the writer against the original creation time.
+ Writer writer = partition.new Writer(createdAtInSec);
+ RangeTombstoneMarker.Writer markerWriter = partition.new RangeTombstoneCollector(false);
+
+ UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(MessagingService.current_version, SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter);
+ return partition;
+ }
+
+ public long serializedSize(CachedPartition partition, TypeSizes sizes)
+ {
+ assert partition instanceof ArrayBackedCachedPartition;
+ ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
+
+ try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
+ {
+ // Mirrors serialize(): the creation time followed by the partition content.
+ return sizes.sizeof(p.createdAtInSec)
+ + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rows, sizes);
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
new file mode 100644
index 0000000..d7f3a88
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * A partition held in memory, filled with the content of an {@link UnfilteredRowIterator}.
+ */
+public class ArrayBackedPartition extends AbstractPartitionData
+{
+ protected ArrayBackedPartition(CFMetaData metadata,
+ DecoratedKey partitionKey,
+ DeletionTime deletionTime,
+ PartitionColumns columns,
+ int initialRowCapacity,
+ boolean sortable)
+ {
+ super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+ }
+
+ /**
+ * Gathers all the data of the provided iterator in a new {@code ArrayBackedPartition},
+ * using a default initial row capacity of 4.
+ *
+ * Warning: the provided iterator is not closed by this method, doing so is
+ * up to the caller.
+ *
+ * @param iterator the iterator to gather in memory.
+ * @return the created partition.
+ */
+ public static ArrayBackedPartition create(UnfilteredRowIterator iterator)
+ {
+ return create(iterator, 4);
+ }
+
+ /**
+ * Gathers all the data of the provided iterator in a new {@code ArrayBackedPartition}.
+ *
+ * Warning: the provided iterator is not closed by this method, doing so is
+ * up to the caller.
+ *
+ * @param iterator the iterator to gather in memory.
+ * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
+ * correspond or be a good estimation of the number of rows in {@code iterator}.
+ * @return the created partition.
+ */
+ public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity)
+ {
+ ArrayBackedPartition result = new ArrayBackedPartition(iterator.metadata(),
+ iterator.partitionKey(),
+ iterator.partitionLevelDeletion(),
+ iterator.columns(),
+ initialRowCapacity,
+ iterator.isReverseOrder());
+
+ result.staticRow = iterator.staticRow().takeAlias();
+
+ Writer rowWriter = result.new Writer(true);
+ RangeTombstoneCollector tombstones = result.new RangeTombstoneCollector(iterator.isReverseOrder());
+
+ copyAll(iterator, rowWriter, tombstones, result);
+
+ return result;
+ }
+
+ /**
+ * Copies every row and range tombstone marker of {@code iterator} to the provided writers, then
+ * puts the rows of {@code partition} back in clustering order if the iterator was in reverse order.
+ *
+ * @param iterator the iterator to copy (it is fully consumed but not closed).
+ * @param writer the writer receiving the rows.
+ * @param markerCollector the writer receiving the range tombstone markers.
+ * @param partition the partition being filled.
+ */
+ protected static void copyAll(UnfilteredRowIterator iterator, Writer writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition)
+ {
+ while (iterator.hasNext())
+ {
+ Unfiltered current = iterator.next();
+ if (current.kind() != Unfiltered.Kind.ROW)
+ {
+ ((RangeTombstoneMarker) current).copyTo(markerCollector);
+ continue;
+ }
+
+ ((Row) current).copyTo(writer);
+ }
+
+ // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering
+ // order. So rows that were just added in reverse clustering order must be reversed.
+ if (iterator.isReverseOrder())
+ partition.reverse();
+ }
+}