Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/28 10:04:02 UTC
[1/3] cassandra git commit: Serialize sstable row columns using subset encoding
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 16b9f8bba -> 0d74c3ebf
refs/heads/trunk f744b6c05 -> b1ad7f0c7
Serialize sstable row columns using subset encoding
Instead of making an sstable-wide sparse/dense coding decision, this
patch encodes every row's columns using the Columns subset encoding,
which results in a small bitmap for tables with fewer than 64 columns
and a delta encoding for larger ones.
patch by benedict; reviewed by sylvain for CASSANDRA-10045
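For readers skimming the diff below, here is a minimal, self-contained sketch of the small-superset case (fewer than 64 columns). It is illustrative only (the class and method names are invented; this is not the Columns.Serializer code in the patch), but it shows the core idea: bit i of the bitmap is set when superset column i is missing from the row, so a value of 0 means every column is present.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the <64-column subset bitmap; not Cassandra code.
final class SubsetBitmapSketch
{
    // Bit i set => superset column i is *missing* from the subset; 0 => all present.
    static long encode(List<String> superset, Set<String> subset)
    {
        assert superset.size() < 64;
        long bitmap = 0L;
        for (int i = 0; i < superset.size(); i++)
            if (!subset.contains(superset.get(i)))
                bitmap |= 1L << i;
        return bitmap;
    }

    static Set<String> decode(List<String> superset, long bitmap)
    {
        Set<String> present = new HashSet<>();
        for (int i = 0; i < superset.size(); i++)
            if ((bitmap & (1L << i)) == 0)
                present.add(superset.get(i));
        return present;
    }

    public static void main(String[] args)
    {
        List<String> superset = Arrays.asList("a", "b", "c", "d");
        Set<String> subset = new HashSet<>(Arrays.asList("a", "c"));
        long bitmap = encode(superset, subset);
        System.out.println(Long.toBinaryString(bitmap)); // 1010: bits set for the missing "b" and "d"
        System.out.println(decode(superset, bitmap));    // [a, c]
    }
}

For supersets of 64 or more columns a single long no longer suffices, which is where the serializeLargeSubset/deserializeLargeSubset paths in the patch take over.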
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d74c3eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d74c3eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d74c3eb
Branch: refs/heads/cassandra-3.0
Commit: 0d74c3ebf76f4f7875f2a2e5dd25a7a1c1edfc0a
Parents: 16b9f8b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Aug 17 18:41:37 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:00:52 2015 +0100
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Columns.java | 46 ++--
.../org/apache/cassandra/db/LegacyLayout.java | 2 +-
.../apache/cassandra/db/PartitionColumns.java | 4 +-
.../cassandra/db/SerializationHeader.java | 18 --
.../columniterator/AbstractSSTableIterator.java | 2 +-
.../columniterator/SSTableReversedIterator.java | 2 +-
.../apache/cassandra/db/filter/DataLimits.java | 4 +-
.../db/partitions/PartitionUpdate.java | 4 +-
.../apache/cassandra/db/rows/AbstractRow.java | 3 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 12 +
.../apache/cassandra/db/rows/BufferCell.java | 31 +--
.../apache/cassandra/db/rows/EncodingStats.java | 37 +---
src/java/org/apache/cassandra/db/rows/Row.java | 2 +-
.../db/rows/UnfilteredRowIterators.java | 4 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
.../io/sstable/SSTableIdentityIterator.java | 2 +-
.../org/apache/cassandra/db/ColumnsTest.java | 8 +-
.../org/apache/cassandra/db/PartitionTest.java | 2 +-
test/unit/org/apache/cassandra/db/RowTest.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../cassandra/io/sstable/SSTableUtils.java | 11 +-
21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index ddb9930..46e8401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
* Note that in practice, it will either store only static columns, or only regular ones. When
* we need both type of columns, we use a {@link PartitionColumns} object.
*/
-public class Columns implements Iterable<ColumnDefinition>
+public class Columns extends AbstractCollection<ColumnDefinition> implements Collection<ColumnDefinition>
{
public static final Serializer serializer = new Serializer();
public static final Columns NONE = new Columns(BTree.empty(), 0);
@@ -136,7 +136,7 @@ public class Columns implements Iterable<ColumnDefinition>
*
* @return the total number of columns in this object.
*/
- public int columnCount()
+ public int size()
{
return BTree.size(columns);
}
@@ -261,14 +261,16 @@ public class Columns implements Iterable<ColumnDefinition>
*
* @return whether all the columns of {@code other} are contained by this object.
*/
- public boolean contains(Columns other)
+ public boolean containsAll(Collection<?> other)
{
- if (other.columnCount() > columnCount())
+ if (other == this)
+ return true;
+ if (other.size() > this.size())
return false;
BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
- for (ColumnDefinition def : BTree.<ColumnDefinition>iterable(other.columns))
- if (iter.next(def) == null)
+ for (Object def : other)
+ if (iter.next((ColumnDefinition) def) == null)
return false;
return true;
}
@@ -379,28 +381,28 @@ public class Columns implements Iterable<ColumnDefinition>
@Override
public String toString()
{
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder("[");
boolean first = true;
for (ColumnDefinition def : this)
{
if (first) first = false; else sb.append(" ");
sb.append(def.name);
}
- return sb.toString();
+ return sb.append("]").toString();
}
public static class Serializer
{
public void serialize(Columns columns, DataOutputPlus out) throws IOException
{
- out.writeVInt(columns.columnCount());
+ out.writeVInt(columns.size());
for (ColumnDefinition column : columns)
ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
}
public long serializedSize(Columns columns)
{
- long size = TypeSizes.sizeofVInt(columns.columnCount());
+ long size = TypeSizes.sizeofVInt(columns.size());
for (ColumnDefinition column : columns)
size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
return size;
@@ -433,7 +435,7 @@ public class Columns implements Iterable<ColumnDefinition>
* If both ends have a pre-shared superset of the columns we are serializing, we can send them much
* more efficiently. Both ends must provide the identically same set of columns.
*/
- public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+ public void serializeSubset(Collection<ColumnDefinition> columns, Columns superset, DataOutputPlus out) throws IOException
{
/**
* We weight this towards small sets, and sets where the majority of items are present, since
@@ -447,8 +449,8 @@ public class Columns implements Iterable<ColumnDefinition>
* to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient).
* We indicate this switch by sending our bitmap with every bit set, i.e. -1L
*/
- int columnCount = columns.columnCount();
- int supersetCount = superset.columnCount();
+ int columnCount = columns.size();
+ int supersetCount = superset.size();
if (columnCount == supersetCount)
{
out.writeUnsignedVInt(0);
@@ -463,10 +465,10 @@ public class Columns implements Iterable<ColumnDefinition>
}
}
- public long serializedSubsetSize(Columns columns, Columns superset)
+ public long serializedSubsetSize(Collection<ColumnDefinition> columns, Columns superset)
{
- int columnCount = columns.columnCount();
- int supersetCount = superset.columnCount();
+ int columnCount = columns.size();
+ int supersetCount = superset.size();
if (columnCount == supersetCount)
{
return TypeSizes.sizeofUnsignedVInt(0);
@@ -488,7 +490,7 @@ public class Columns implements Iterable<ColumnDefinition>
{
return superset;
}
- else if (superset.columnCount() >= 64)
+ else if (superset.size() >= 64)
{
return deserializeLargeSubset(in, superset, (int) encoded);
}
@@ -512,7 +514,7 @@ public class Columns implements Iterable<ColumnDefinition>
// encodes a 1 bit for every *missing* column, on the assumption presence is more common,
// and because this is consistent with encoding 0 to represent all present
- private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+ private static long encodeBitmap(Collection<ColumnDefinition> columns, Columns superset, int supersetCount)
{
long bitmap = 0L;
BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
@@ -521,7 +523,7 @@ public class Columns implements Iterable<ColumnDefinition>
for (ColumnDefinition column : columns)
{
if (iter.next(column) == null)
- throw new IllegalStateException();
+ throw new IllegalStateException(columns + " is not a subset of " + superset);
int currentIndex = iter.indexOfCurrent();
int count = currentIndex - expectIndex;
@@ -537,7 +539,7 @@ public class Columns implements Iterable<ColumnDefinition>
}
@DontInline
- private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+ private void serializeLargeSubset(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
{
// write flag indicating we're in lengthy mode
out.writeUnsignedVInt(supersetCount - columnCount);
@@ -572,7 +574,7 @@ public class Columns implements Iterable<ColumnDefinition>
@DontInline
private Columns deserializeLargeSubset(DataInputPlus in, Columns superset, int delta) throws IOException
{
- int supersetCount = superset.columnCount();
+ int supersetCount = superset.size();
int columnCount = supersetCount - delta;
BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -609,7 +611,7 @@ public class Columns implements Iterable<ColumnDefinition>
}
@DontInline
- private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+ private int serializeLargeSubsetSize(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount)
{
// write flag indicating we're in lengthy mode
int size = TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
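The exact byte layout of serializeLargeSubset is not fully visible in the hunks above (only the leading vint, supersetCount - columnCount, shows), so the following is just a generic sketch of the underlying technique: column positions within the superset are sorted, so encoding the gap from the previous present column keeps every value small and cheap to vint-encode. The names and the varint format here are invented for the example.

import java.io.ByteArrayOutputStream;

// Generic illustration of delta-encoding sorted column positions; not the
// actual on-disk layout written by Columns.Serializer.serializeLargeSubset.
final class DeltaEncodingSketch
{
    // Minimal unsigned varint: 7 payload bits per byte, high bit means "more follows".
    static void writeUnsignedVInt(long value, ByteArrayOutputStream out)
    {
        while ((value & ~0x7FL) != 0)
        {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // positions: ascending indexes of the present columns within the superset
    static byte[] encode(int[] positions)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int previous = -1;
        for (int position : positions)
        {
            writeUnsignedVInt(position - previous - 1, out); // gap since the previous present column
            previous = position;
        }
        return out.toByteArray();
    }

    public static void main(String[] args)
    {
        // Columns 0, 1, 2, 70 and 71 of a wide superset are present; the gaps
        // are 0, 0, 0, 67, 0, so the whole subset costs five single-byte varints.
        System.out.println(encode(new int[]{ 0, 1, 2, 70, 71 }).length); // 5
    }
}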
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 1c72d31..628ac75 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -534,7 +534,7 @@ public abstract class LegacyLayout
// TODO: there is in practice nothing to do here, but we need to handle the column_metadata for super columns somewhere else
throw new UnsupportedOperationException();
- Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.columnCount());
+ Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.size());
for (ColumnDefinition column : statics)
columnsToFetch.add(column.name.bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index aa60198..e1008df 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -78,7 +78,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
public boolean includes(PartitionColumns columns)
{
- return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+ return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
}
public Iterator<ColumnDefinition> iterator()
@@ -94,7 +94,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
/** * Returns the total number of static and regular columns. */
public int size()
{
- return regulars.columnCount() + statics.columnCount();
+ return regulars.size() + statics.size();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 88f6832..8d4e604 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -54,9 +54,6 @@ public class SerializationHeader
private final Map<ByteBuffer, AbstractType<?>> typeMap;
- // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details.
- private final boolean useSparseColumnLayout;
-
private SerializationHeader(AbstractType<?> keyType,
List<AbstractType<?>> clusteringTypes,
PartitionColumns columns,
@@ -68,21 +65,6 @@ public class SerializationHeader
this.columns = columns;
this.stats = stats;
this.typeMap = typeMap;
-
- // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1
- // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
- // only for the columns after the 128th one and for simplicity we assume that once you have that many column,
- // you'll tend towards a clearly dense or clearly sparse case so that the heurstic above shouldn't still be
- // too inapropriate). So if on average more than half of our columns are set per row, we better go for sparse.
- this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount()/ 2);
- }
-
- public boolean useSparseColumnLayout(boolean isStatic)
- {
- // We always use a dense layout for the static row. Having very many static columns with only a few set at
- // any given time doesn't feel very common at all (and we already optimize the case where no static at all
- // are provided).
- return !isStatic && useSparseColumnLayout;
}
public static SerializationHeader forKeyCache(CFMetaData metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 5e6165f..cf4bff7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -214,7 +214,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
{
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+ return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
}
public boolean hasNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4d2e294..a5a1938 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -74,7 +74,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
protected ReusablePartitionData createBuffer(int blocksCount)
{
int estimatedRowCount = 16;
- int columnCount = metadata().partitionColumns().regulars.columnCount();
+ int columnCount = metadata().partitionColumns().regulars.size();
if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
{
estimatedRowCount = 1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3e608b4..0d6f816 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -270,7 +270,7 @@ public abstract class DataLimits
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
// is what getMeanColumns returns)
- float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+ float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return rowsPerPartition * (cfs.estimateKeys());
}
@@ -506,7 +506,7 @@ public abstract class DataLimits
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeansColumns returns a number of cells: we should clean nomenclature
- float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+ float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return cellsPerPartition * cfs.estimateKeys();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3d2c94b..5e056d2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -529,7 +529,7 @@ public class PartitionUpdate extends AbstractBTreePartition
if (row.isStatic())
{
// We test for == first because in most case it'll be true and that is faster
- assert columns().statics == row.columns() || columns().statics.contains(row.columns());
+ assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
Row staticRow = holder.staticRow.isEmpty()
? row
: Rows.merge(holder.staticRow, row, createdAtInSec);
@@ -538,7 +538,7 @@ public class PartitionUpdate extends AbstractBTreePartition
else
{
// We test for == first because in most case it'll be true and that is faster
- assert columns().regulars == row.columns() || columns().regulars.contains(row.columns());
+ assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
rowBuilder.add(row);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 6090274..fca765f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -18,6 +18,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.AbstractCollection;
import java.util.Objects;
import com.google.common.collect.Iterables;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtilities;
* Unless you have a very good reason not to, every row implementation
* should probably extend this class.
*/
-public abstract class AbstractRow implements Row
+public abstract class AbstractRow extends AbstractCollection<ColumnData> implements Row
{
public Unfiltered.Kind kind()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 7e50716..ed036af 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -22,6 +22,7 @@ import java.util.*;
import java.util.function.Predicate;
import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Iterators;
import org.apache.cassandra.config.CFMetaData;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.btree.UpdateFunction;
/**
@@ -164,6 +166,11 @@ public class BTreeRow extends AbstractRow
return columns;
}
+ public Collection<ColumnDefinition> actualColumns()
+ {
+ return Collections2.transform(this, ColumnData::column);
+ }
+
public LivenessInfo primaryKeyLivenessInfo()
{
return primaryKeyLivenessInfo;
@@ -207,6 +214,11 @@ public class BTreeRow extends AbstractRow
return searchIterator();
}
+ public int size()
+ {
+ return BTree.size(btree);
+ }
+
public Iterable<Cell> cells()
{
return CellIterator::new;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index f9a3026..4176ba6 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -218,28 +218,21 @@ public class BufferCell extends AbstractCell
*/
static class Serializer implements Cell.Serializer
{
- private final static int PRESENCE_MASK = 0x01; // Marks the actual presence of a cell. This is used only when serialized on-disk and
- // on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag set).
- private final static int IS_DELETED_MASK = 0x02; // Whether the cell is a tombstone or not.
- private final static int IS_EXPIRING_MASK = 0x04; // Whether the cell is expiring.
- private final static int HAS_EMPTY_VALUE_MASK = 0x08; // Wether the cell has an empty value. This will be the case for tombstone in particular.
- private final static int USE_ROW_TIMESTAMP_MASK = 0x10; // Wether the cell has the same timestamp than the row this is a cell of.
- private final static int USE_ROW_TTL_MASK = 0x20; // Wether the cell has the same ttl than the row this is a cell of.
+ private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
+ private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
+ private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
+ private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
+ private final static int USE_ROW_TTL_MASK = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
{
- if (cell == null)
- {
- out.writeByte((byte)0);
- return;
- }
-
+ assert cell != null;
boolean hasValue = cell.value().hasRemaining();
boolean isDeleted = cell.isTombstone();
boolean isExpiring = cell.isExpiring();
boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
- int flags = PRESENCE_MASK;
+ int flags = 0;
if (!hasValue)
flags |= HAS_EMPTY_VALUE_MASK;
@@ -273,9 +266,6 @@ public class BufferCell extends AbstractCell
public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
{
int flags = in.readUnsignedByte();
- if ((flags & PRESENCE_MASK) == 0)
- return null;
-
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
@@ -317,10 +307,6 @@ public class BufferCell extends AbstractCell
public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
{
long size = 1; // flags
-
- if (cell == null)
- return size;
-
boolean hasValue = cell.value().hasRemaining();
boolean isDeleted = cell.isTombstone();
boolean isExpiring = cell.isExpiring();
@@ -348,9 +334,6 @@ public class BufferCell extends AbstractCell
public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
{
int flags = in.readUnsignedByte();
- if ((flags & PRESENCE_MASK) == 0)
- return false;
-
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
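Since the presence flag is gone, a flag byte of 0x00 is now a legal cell header. The snippet below only illustrates how the renumbered masks compose into that byte: the mask values match the patch, but the helper and the boolean stand-ins are invented (the real serializer derives them from the Cell and the row liveness info).

// Illustrative only: composing the renumbered cell flag byte.
final class CellFlagsSketch
{
    static final int IS_DELETED_MASK        = 0x01;
    static final int IS_EXPIRING_MASK       = 0x02;
    static final int HAS_EMPTY_VALUE_MASK   = 0x04;
    static final int USE_ROW_TIMESTAMP_MASK = 0x08;
    static final int USE_ROW_TTL_MASK       = 0x10;

    static int encodeFlags(boolean hasValue, boolean isDeleted, boolean isExpiring,
                           boolean useRowTimestamp, boolean useRowTTL)
    {
        int flags = 0;                      // 0x00 is valid now: presence needs no bit,
        if (!hasValue)                      // because only present cells are serialized
            flags |= HAS_EMPTY_VALUE_MASK;
        if (isDeleted)
            flags |= IS_DELETED_MASK;
        if (isExpiring)
            flags |= IS_EXPIRING_MASK;
        if (useRowTimestamp)
            flags |= USE_ROW_TIMESTAMP_MASK;
        if (useRowTTL)
            flags |= USE_ROW_TTL_MASK;
        return flags;
    }

    public static void main(String[] args)
    {
        // A live, expiring cell with a value, sharing the row's timestamp but not its TTL.
        int flags = encodeFlags(true, false, true, true, false);
        System.out.println(Integer.toBinaryString(flags)); // 1010 = IS_EXPIRING | USE_ROW_TIMESTAMP
    }
}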
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index efa40ad..5f1a749 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -30,8 +30,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* <p>
* Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
* the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
- * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
- * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * delta-encode those information for the sake of vint encoding.
* <p>
* Note that due to their use, those stats can suffer to be somewhat inaccurate (the more incurrate
* they are, the less effective the storage will be, but provided the stats are not completly wacky,
@@ -63,7 +62,7 @@ public class EncodingStats
}
// We should use this sparingly obviously
- public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH, -1);
+ public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
public static final Serializer serializer = new Serializer();
@@ -71,13 +70,9 @@ public class EncodingStats
public final int minLocalDeletionTime;
public final int minTTL;
- // Will be < 0 if the value is unknown
- public final int avgColumnSetPerRow;
-
public EncodingStats(long minTimestamp,
int minLocalDeletionTime,
- int minTTL,
- int avgColumnSetPerRow)
+ int minTTL)
{
// Note that the exact value of those don't impact correctness, just the efficiency of the encoding. So when we
// get a value for timestamp (resp. minLocalDeletionTime) that means 'no object had a timestamp' (resp. 'a local
@@ -87,7 +82,6 @@ public class EncodingStats
this.minTimestamp = minTimestamp == LivenessInfo.NO_TIMESTAMP ? TIMESTAMP_EPOCH : minTimestamp;
this.minLocalDeletionTime = minLocalDeletionTime == LivenessInfo.NO_EXPIRATION_TIME ? DELETION_TIME_EPOCH : minLocalDeletionTime;
this.minTTL = minTTL;
- this.avgColumnSetPerRow = avgColumnSetPerRow;
}
/**
@@ -110,11 +104,7 @@ public class EncodingStats
? that.minTTL
: (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
- int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
- ? that.avgColumnSetPerRow
- : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
-
- return new EncodingStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+ return new EncodingStats(minTimestamp, minDelTime, minTTL);
}
@Override
@@ -125,8 +115,7 @@ public class EncodingStats
EncodingStats that = (EncodingStats) o;
- return this.avgColumnSetPerRow == that.avgColumnSetPerRow
- && this.minLocalDeletionTime == that.minLocalDeletionTime
+ return this.minLocalDeletionTime == that.minLocalDeletionTime
&& this.minTTL == that.minTTL
&& this.minTimestamp == that.minTimestamp;
}
@@ -134,13 +123,13 @@ public class EncodingStats
@Override
public int hashCode()
{
- return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
}
@Override
public String toString()
{
- return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL);
}
public static class Collector implements PartitionStatisticsCollector
@@ -237,8 +226,7 @@ public class EncodingStats
{
return new EncodingStats(isTimestampSet ? minTimestamp : TIMESTAMP_EPOCH,
isDelTimeSet ? minDeletionTime : DELETION_TIME_EPOCH,
- isTTLSet ? minTTL : TTL_EPOCH,
- isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+ isTTLSet ? minTTL : TTL_EPOCH);
}
public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
@@ -260,15 +248,13 @@ public class EncodingStats
out.writeVInt(stats.minTimestamp - TIMESTAMP_EPOCH);
out.writeVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH);
out.writeVInt(stats.minTTL - TTL_EPOCH);
- out.writeVInt(stats.avgColumnSetPerRow);
}
public int serializedSize(EncodingStats stats)
{
return TypeSizes.sizeofVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
- + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
- + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH)
- + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
+ + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
+ + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH);
}
public EncodingStats deserialize(DataInputPlus in) throws IOException
@@ -276,8 +262,7 @@ public class EncodingStats
long minTimestamp = in.readVInt() + TIMESTAMP_EPOCH;
int minLocalDeletionTime = (int)in.readVInt() + DELETION_TIME_EPOCH;
int minTTL = (int)in.readVInt() + TTL_EPOCH;
- int avgColumnSetPerRow = (int)in.readVInt();
- return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
}
}
}
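The remaining serializer above still delta-encodes each stat against a fixed epoch before vint-encoding it. A rough, self-contained illustration of why that pays off follows; the vint size function and the epoch value are invented approximations, not Cassandra's VIntCoding or its actual TIMESTAMP_EPOCH.

// Rough illustration of epoch-relative vint sizes; not Cassandra's VIntCoding.
final class EpochDeltaSketch
{
    // Approximate size of a signed vint: zigzag, then 7 payload bits per byte.
    static int vintSize(long value)
    {
        long zigzag = (value << 1) ^ (value >> 63);
        int size = 1;
        while ((zigzag >>>= 7) != 0)
            size++;
        return size;
    }

    public static void main(String[] args)
    {
        long timestampEpoch = 1451606400000000L; // some fixed epoch, in microseconds (illustrative)
        long minTimestamp   = 1451610000000000L; // a stat value close to that epoch
        System.out.println(vintSize(minTimestamp));                  // 8 bytes encoded raw
        System.out.println(vintSize(minTimestamp - timestampEpoch)); // 5 bytes encoded as a delta
    }
}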
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 996e89a..003dd04 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
* it's own data. For instance, a {@code Row} cannot contains a cell that is deleted by its own
* row deletion.
*/
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Collection<ColumnData>
{
/**
* The clustering values for this row.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 60f0dcb..48e00f9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -98,6 +98,8 @@ public abstract class UnfilteredRowIterators
*/
public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
{
+ PartitionColumns columns = staticRow == null ? PartitionColumns.NONE
+ : new PartitionColumns(staticRow.columns(), Columns.NONE);
return new UnfilteredRowIterator()
{
public CFMetaData metadata()
@@ -112,7 +114,7 @@ public abstract class UnfilteredRowIterators
public PartitionColumns columns()
{
- return PartitionColumns.NONE;
+ return columns;
}
public DecoratedKey partitionKey()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 14b06cf..0866810 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,11 +19,12 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
+import com.google.common.collect.Collections2;
+
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.SearchIterator;
/**
* Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -37,29 +38,20 @@ import org.apache.cassandra.utils.SearchIterator;
* {@code Clustering.serializer}. Note that static row are an exception and
* don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
* whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
- * complex ones. There is actually 2 slightly different possible layout for those
- * cell: a dense one and a sparse one. Which one is used depends on the serialization
- * header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}:
- * 1) in the dense layout, there will be as many <sci> and <ccj> as there is columns
- * in the serialization header. *Each simple column <sci> will simply be a <cell>
- * (which might have no value, see below), while each <ccj> will be
- * [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
- * this complex column (if flags indicates it present), <celln> are the <cell>
- * for this complex column and <emptyCell> is a last cell that will have no value
- * to indicate the end of this column.
- * 2) in the sparse layout, there won't be "empty" cells, i.e. only the column that
- * actually have a cell are represented. For that, each <sci> and <ccj> start
- * by a 2 byte index that points to the column in the header it belongs to. After
- * that, each <sci> and <ccj> is the same than for the dense layout. But contrarily
- * to the dense layout we won't know how many elements are serialized so a 2 byte
- * marker with a value of -1 will indicates the end of the row.
+ * complex ones.
+ * The columns for the row are then serialized if they differ from those in the header,
+ * and each cell then follows:
+ * * Each simple column <sci> will simply be a <cell>
+ * (which might have no value, see below),
+ * * Each <ccj> will be [<delTime>]<n><cell1>...<celln> where <delTime>
+ * is the deletion for this complex column (if flags indicates it present), <n>
+ * is the vint encoded value of n, i.e. <celln>'s 1-based index, <celli>
+ * are the <cell> for this complex column
* <marker> is <bound><deletion> where <bound> is the marker bound as serialized
* by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
* time.
*
- * <cell> A cell start with a 1 byte <flag>. Thre rightmost 1st bit indicates
- * if there is actually a value for this cell. If this flag is unset,
- * nothing more follows for the cell. The 2nd and third flag indicates if
+ * <cell> A cell start with a 1 byte <flag>. The 2nd and third flag bits indicate if
* it's a deleted or expiring cell. The 4th flag indicates if the value
* is empty or not. The 5th and 6th indicates if the timestamp and ttl/
* localDeletionTime for the cell are the same than the row one (if that
@@ -85,6 +77,7 @@ public class UnfilteredSerializer
private final static int HAS_TTL = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
private final static int HAS_DELETION = 0x20; // Whether the encoded row has some deletion info.
private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
+ private final static int HAS_ALL_COLUMNS = 0x80; // Whether the encoded row has all of the columns from the header present
public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
throws IOException
@@ -105,9 +98,11 @@ public class UnfilteredSerializer
int flags = 0;
boolean isStatic = row.isStatic();
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
DeletionTime deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
+ boolean hasAllColumns = (row.size() == headerColumns.size());
if (isStatic)
flags |= IS_STATIC;
@@ -119,6 +114,8 @@ public class UnfilteredSerializer
flags |= HAS_DELETION;
if (hasComplexDeletion)
flags |= HAS_COMPLEX_DELETION;
+ if (hasAllColumns)
+ flags |= HAS_ALL_COLUMNS;
out.writeByte((byte)flags);
if (!isStatic)
@@ -134,55 +131,27 @@ public class UnfilteredSerializer
if ((flags & HAS_DELETION) != 0)
header.writeDeletionTime(deletion, out);
- Columns columns = header.columns(isStatic);
- int simpleCount = columns.simpleColumnCount();
- boolean useSparse = header.useSparseColumnLayout(isStatic);
- SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
- for (int i = 0; i < simpleCount; i++)
- writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse);
+ if (!hasAllColumns)
+ Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
- for (int i = simpleCount; i < columns.columnCount(); i++)
- writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse);
-
- if (useSparse)
- out.writeVInt(-1);
- }
-
- private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
- throws IOException
- {
- if (useSparse)
+ for (ColumnData data : row)
{
- if (cell == null)
- return;
-
- out.writeVInt(idx);
+ if (data.column.isSimple())
+ Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
+ else
+ writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
}
- Cell.serializer.serialize(cell, out, rowLiveness, header);
}
- private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
+ private void writeComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out)
throws IOException
{
- if (useSparse)
- {
- if (data == null)
- return;
-
- out.writeVInt(idx);
- }
-
if (hasComplexDeletion)
header.writeDeletionTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), out);
- if (data != null)
- {
- for (Cell cell : data)
- Cell.serializer.serialize(cell, out, rowLiveness, header);
- }
-
- Cell.serializer.serialize(null, out, rowLiveness, header);
+ out.writeUnsignedVInt(data.cellsCount());
+ for (Cell cell : data)
+ Cell.serializer.serialize(cell, out, rowLiveness, header);
}
public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
@@ -215,9 +184,11 @@ public class UnfilteredSerializer
long size = 1; // flags
boolean isStatic = row.isStatic();
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
DeletionTime deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
+ boolean hasAllColumns = (row.size() == headerColumns.size());
if (!isStatic)
size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
@@ -232,57 +203,32 @@ public class UnfilteredSerializer
if (!deletion.isLive())
size += header.deletionTimeSerializedSize(deletion);
- Columns columns = header.columns(isStatic);
- int simpleCount = columns.simpleColumnCount();
- boolean useSparse = header.useSparseColumnLayout(isStatic);
- SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
- for (int i = 0; i < simpleCount; i++)
- size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse);
-
- for (int i = simpleCount; i < columns.columnCount(); i++)
- size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse);
-
- if (useSparse)
- size += TypeSizes.sizeofVInt(-1);
+ if (!hasAllColumns)
+ size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
- return size;
- }
-
- private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
- {
- long size = 0;
- if (useSparse)
+ for (ColumnData data : row)
{
- if (cell == null)
- return size;
-
- size += TypeSizes.sizeofVInt(idx);
+ if (data.column.isSimple())
+ size += Cell.serializer.serializedSize((Cell) data, pkLiveness, header);
+ else
+ size += sizeOfComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header);
}
- return size + Cell.serializer.serializedSize(cell, rowLiveness, header);
+
+ return size;
}
- private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
+ private long sizeOfComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
{
long size = 0;
- if (useSparse)
- {
- if (data == null)
- return size;
-
- size += TypeSizes.sizeofVInt(idx);
- }
if (hasComplexDeletion)
size += header.deletionTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion());
- if (data != null)
- {
- for (Cell cell : data)
- size += Cell.serializer.serializedSize(cell, rowLiveness, header);
- }
+ size += TypeSizes.sizeofUnsignedVInt(data.cellsCount());
+ for (Cell cell : data)
+ size += Cell.serializer.serializedSize(cell, rowLiveness, header);
- return size + Cell.serializer.serializedSize(null, rowLiveness, header);
+ return size;
}
public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
@@ -369,6 +315,8 @@ public class UnfilteredSerializer
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean hasDeletion = (flags & HAS_DELETION) != 0;
boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+ boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo rowLiveness = LivenessInfo.EMPTY;
if (hasTimestamp)
@@ -382,33 +330,16 @@ public class UnfilteredSerializer
builder.addPrimaryKeyLivenessInfo(rowLiveness);
builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
- Columns columns = header.columns(isStatic);
- if (header.useSparseColumnLayout(isStatic))
- {
- int count = columns.columnCount();
- int simpleCount = columns.simpleColumnCount();
- int i;
- while ((i = (int)in.readVInt()) >= 0)
- {
- if (i > count)
- throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
- if (i < simpleCount)
- readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
- else
- readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
- }
- }
- else
+ Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+ for (ColumnDefinition column : columns)
{
- for (int i = 0; i < columns.simpleColumnCount(); i++)
- readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-
- for (int i = 0; i < columns.complexColumnCount(); i++)
- readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+ if (column.isSimple())
+ readSimpleColumn(column, in, header, helper, builder, rowLiveness);
+ else
+ readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
}
- return builder.build();
+ return builder.build();
}
catch (RuntimeException | AssertionError e)
{
@@ -426,7 +357,7 @@ public class UnfilteredSerializer
if (helper.includes(column))
{
Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
- if (cell != null && !helper.isDropped(cell, false))
+ if (!helper.isDropped(cell, false))
builder.addCell(cell);
}
else
@@ -448,9 +379,10 @@ public class UnfilteredSerializer
builder.addComplexDeletion(column, complexDeletion);
}
- Cell cell;
- while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null)
+ int count = (int) in.readUnsignedVInt();
+ while (--count >= 0)
{
+ Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
builder.addCell(cell);
}
@@ -470,6 +402,8 @@ public class UnfilteredSerializer
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean hasDeletion = (flags & HAS_DELETION) != 0;
boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+ boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+ Columns headerColumns = header.columns(isStatic);
// Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have
// the size we think due to VINT encoding
@@ -483,30 +417,13 @@ public class UnfilteredSerializer
if (hasDeletion)
header.skipDeletionTime(in);
- Columns columns = header.columns(isStatic);
- if (header.useSparseColumnLayout(isStatic))
- {
- int count = columns.columnCount();
- int simpleCount = columns.simpleColumnCount();
- int i;
- while ((i = (int)in.readVInt()) >= 0)
- {
- if (i > count)
- throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
- if (i < simpleCount)
- Cell.serializer.skip(in, columns.getSimple(i), header);
- else
- skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion);
- }
- }
- else
+ Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+ for (ColumnDefinition column : columns)
{
- for (int i = 0; i < columns.simpleColumnCount(); i++)
- Cell.serializer.skip(in, columns.getSimple(i), header);
-
- for (int i = 0; i < columns.complexColumnCount(); i++)
- skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion);
+ if (column.isSimple())
+ Cell.serializer.skip(in, column, header);
+ else
+ skipComplexColumn(in, column, header, hasComplexDeletion);
}
}
@@ -536,7 +453,9 @@ public class UnfilteredSerializer
if (hasComplexDeletion)
header.skipDeletionTime(in);
- while (Cell.serializer.skip(in, column, header));
+ int count = (int) in.readUnsignedVInt();
+ while (--count >= 0)
+ Cell.serializer.skip(in, column, header);
}
public static boolean isEndOfPartition(int flags)
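One structural change worth calling out in the hunks above: complex columns are now written as an explicit cell count followed by exactly that many cells, instead of a run of cells terminated by a sentinel "absent" cell. The sketch below mirrors that shape with plain java.io streams and String stand-ins for cells; the real code uses DataOutputPlus, Cell.serializer and an unsigned vint for the count.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shape of the count-prefixed complex column encoding; stand-in types only.
final class CountPrefixedSketch
{
    static byte[] write(List<String> cells) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(cells.size());     // count prefix (an unsigned vint in the real format)
        for (String cell : cells)
            out.writeUTF(cell);         // one serialized cell per entry, no terminator needed
        out.flush();
        return bytes.toByteArray();
    }

    static List<String> read(byte[] data) throws IOException
    {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        List<String> cells = new ArrayList<>(count);
        while (--count >= 0)            // same loop shape as readComplexColumn/skipComplexColumn
            cells.add(in.readUTF());
        return cells;
    }

    public static void main(String[] args) throws IOException
    {
        System.out.println(read(write(Arrays.asList("cell1", "cell2", "cell3")))); // [cell1, cell2, cell3]
    }
}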
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 793cd81..acdf6bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -127,7 +127,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
{
// We could return sstable.header.stats(), but this may not be as accurate than the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+ return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
}
public int compareTo(SSTableIdentityIterator o)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 1a91c3d..c080948 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -83,9 +83,9 @@ public class ColumnsTest
private void assertSubset(Columns superset, Columns subset)
{
- Assert.assertTrue(superset.contains(superset));
- Assert.assertTrue(superset.contains(subset));
- Assert.assertFalse(subset.contains(superset));
+ Assert.assertTrue(superset.containsAll(superset));
+ Assert.assertTrue(superset.containsAll(subset));
+ Assert.assertFalse(subset.containsAll(superset));
}
@Test
@@ -275,7 +275,7 @@ public class ColumnsTest
Columns subset = columns;
for (ColumnDefinition def : remove)
subset = subset.without(def);
- Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+ Assert.assertEquals(columns.size() - remove.size(), subset.size());
List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
remainingDefs.removeAll(remove);
return new ColumnsCheck(subset, remainingDefs);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index f651093..f0a63a8 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -105,7 +105,7 @@ public class PartitionTest
CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
- assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
+ assertEquals(partition.columns().regulars.size(), deserialized.columns().regulars.size());
assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
assertTrue(deserialized.columns().regulars.getSimple(5).equals(partition.columns().regulars.getSimple(5)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 3aaf21f..cd80a2f 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -141,7 +141,7 @@ public class RowTest
row = (Row) unfiltered;
assertEquals("a2", defA.cellValueType().getString(row.getCell(defA).value()));
assertEquals("b1", defB.cellValueType().getString(row.getCell(defB).value()));
- assertEquals(2, row.columns().columnCount());
+ assertEquals(2, row.columns().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 5887fd4..e9bf4c5 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -325,8 +326,9 @@ public class ScrubTest
keys.size(),
0L,
0,
- SerializationHeader.make(cfs.metadata,
- Collections.emptyList())))
+ new SerializationHeader(cfs.metadata,
+ cfs.metadata.partitionColumns(),
+ EncodingStats.NO_STATS)))
{
for (String k : keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 89c0d61..7826317 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -179,9 +179,17 @@ public class SSTableUtils
public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
{
+ PartitionColumns.Builder builder = PartitionColumns.builder();
+ for (PartitionUpdate update : sorted.values())
+ builder.addAll(update.columns());
final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
return write(sorted.size(), new Appender()
{
+ public SerializationHeader header()
+ {
+ return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+ }
+
@Override
public boolean append(SSTableTxnWriter writer) throws IOException
{
@@ -207,7 +215,7 @@ public class SSTableUtils
File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
- SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+ SerializationHeader header = appender.header();
SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
while (appender.append(writer)) { /* pass */ }
Collection<SSTableReader> readers = writer.finish(true);
@@ -223,6 +231,7 @@ public class SSTableUtils
public static abstract class Appender
{
+ public abstract SerializationHeader header();
/** Called with an open writer until it returns false. */
public abstract boolean append(SSTableTxnWriter writer) throws IOException;
}
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1ad7f0c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1ad7f0c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1ad7f0c
Branch: refs/heads/trunk
Commit: b1ad7f0c7fb3b6799dc7babdf65fcbdcd2331d6e
Parents: f744b6c 0d74c3e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Aug 28 09:03:41 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:03:41 2015 +0100
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Columns.java | 46 ++--
.../org/apache/cassandra/db/LegacyLayout.java | 2 +-
.../apache/cassandra/db/PartitionColumns.java | 4 +-
.../cassandra/db/SerializationHeader.java | 18 --
.../columniterator/AbstractSSTableIterator.java | 2 +-
.../columniterator/SSTableReversedIterator.java | 2 +-
.../apache/cassandra/db/filter/DataLimits.java | 4 +-
.../db/partitions/PartitionUpdate.java | 4 +-
.../apache/cassandra/db/rows/AbstractRow.java | 3 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 12 +
.../apache/cassandra/db/rows/BufferCell.java | 31 +--
.../apache/cassandra/db/rows/EncodingStats.java | 37 +---
src/java/org/apache/cassandra/db/rows/Row.java | 2 +-
.../db/rows/UnfilteredRowIterators.java | 4 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
.../io/sstable/SSTableIdentityIterator.java | 2 +-
.../org/apache/cassandra/db/ColumnsTest.java | 8 +-
.../org/apache/cassandra/db/PartitionTest.java | 2 +-
test/unit/org/apache/cassandra/db/RowTest.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../cassandra/io/sstable/SSTableUtils.java | 11 +-
21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
[2/3] cassandra git commit: Serialize sstable row columns using
subset encoding
Posted by be...@apache.org.
Serialize sstable row columns using subset encoding
Instead of making an sstable-wide sparse/dense coding
decision, this patch encodes all rows using the Columns
subset encoding, which results in a small bitmap for tables
with fewer than 64 columns, and delta encoding when larger
patch by benedict; reviewed by sylvain for CASSANDRA-10045
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d74c3eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d74c3eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d74c3eb
Branch: refs/heads/trunk
Commit: 0d74c3ebf76f4f7875f2a2e5dd25a7a1c1edfc0a
Parents: 16b9f8b
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Aug 17 18:41:37 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Aug 28 09:00:52 2015 +0100
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Columns.java | 46 ++--
.../org/apache/cassandra/db/LegacyLayout.java | 2 +-
.../apache/cassandra/db/PartitionColumns.java | 4 +-
.../cassandra/db/SerializationHeader.java | 18 --
.../columniterator/AbstractSSTableIterator.java | 2 +-
.../columniterator/SSTableReversedIterator.java | 2 +-
.../apache/cassandra/db/filter/DataLimits.java | 4 +-
.../db/partitions/PartitionUpdate.java | 4 +-
.../apache/cassandra/db/rows/AbstractRow.java | 3 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 12 +
.../apache/cassandra/db/rows/BufferCell.java | 31 +--
.../apache/cassandra/db/rows/EncodingStats.java | 37 +---
src/java/org/apache/cassandra/db/rows/Row.java | 2 +-
.../db/rows/UnfilteredRowIterators.java | 4 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 217 ++++++-------------
.../io/sstable/SSTableIdentityIterator.java | 2 +-
.../org/apache/cassandra/db/ColumnsTest.java | 8 +-
.../org/apache/cassandra/db/PartitionTest.java | 2 +-
test/unit/org/apache/cassandra/db/RowTest.java | 2 +-
.../unit/org/apache/cassandra/db/ScrubTest.java | 6 +-
.../cassandra/io/sstable/SSTableUtils.java | 11 +-
21 files changed, 158 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index ddb9930..46e8401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -46,7 +46,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
* Note that in practice, it will either store only static columns, or only regular ones. When
* we need both type of columns, we use a {@link PartitionColumns} object.
*/
-public class Columns implements Iterable<ColumnDefinition>
+public class Columns extends AbstractCollection<ColumnDefinition> implements Collection<ColumnDefinition>
{
public static final Serializer serializer = new Serializer();
public static final Columns NONE = new Columns(BTree.empty(), 0);
@@ -136,7 +136,7 @@ public class Columns implements Iterable<ColumnDefinition>
*
* @return the total number of columns in this object.
*/
- public int columnCount()
+ public int size()
{
return BTree.size(columns);
}
@@ -261,14 +261,16 @@ public class Columns implements Iterable<ColumnDefinition>
*
* @return whether all the columns of {@code other} are contained by this object.
*/
- public boolean contains(Columns other)
+ public boolean containsAll(Collection<?> other)
{
- if (other.columnCount() > columnCount())
+ if (other == this)
+ return true;
+ if (other.size() > this.size())
return false;
BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = BTree.slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
- for (ColumnDefinition def : BTree.<ColumnDefinition>iterable(other.columns))
- if (iter.next(def) == null)
+ for (Object def : other)
+ if (iter.next((ColumnDefinition) def) == null)
return false;
return true;
}
@@ -379,28 +381,28 @@ public class Columns implements Iterable<ColumnDefinition>
@Override
public String toString()
{
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder("[");
boolean first = true;
for (ColumnDefinition def : this)
{
if (first) first = false; else sb.append(" ");
sb.append(def.name);
}
- return sb.toString();
+ return sb.append("]").toString();
}
public static class Serializer
{
public void serialize(Columns columns, DataOutputPlus out) throws IOException
{
- out.writeVInt(columns.columnCount());
+ out.writeVInt(columns.size());
for (ColumnDefinition column : columns)
ByteBufferUtil.writeWithVIntLength(column.name.bytes, out);
}
public long serializedSize(Columns columns)
{
- long size = TypeSizes.sizeofVInt(columns.columnCount());
+ long size = TypeSizes.sizeofVInt(columns.size());
for (ColumnDefinition column : columns)
size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes);
return size;
@@ -433,7 +435,7 @@ public class Columns implements Iterable<ColumnDefinition>
* If both ends have a pre-shared superset of the columns we are serializing, we can send them much
* more efficiently. Both ends must provide exactly the same set of columns.
*/
- public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+ public void serializeSubset(Collection<ColumnDefinition> columns, Columns superset, DataOutputPlus out) throws IOException
{
/**
* We weight this towards small sets, and sets where the majority of items are present, since
@@ -447,8 +449,8 @@ public class Columns implements Iterable<ColumnDefinition>
* to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient).
* We indicate this switch by sending our bitmap with every bit set, i.e. -1L
*/
- int columnCount = columns.columnCount();
- int supersetCount = superset.columnCount();
+ int columnCount = columns.size();
+ int supersetCount = superset.size();
if (columnCount == supersetCount)
{
out.writeUnsignedVInt(0);
@@ -463,10 +465,10 @@ public class Columns implements Iterable<ColumnDefinition>
}
}
- public long serializedSubsetSize(Columns columns, Columns superset)
+ public long serializedSubsetSize(Collection<ColumnDefinition> columns, Columns superset)
{
- int columnCount = columns.columnCount();
- int supersetCount = superset.columnCount();
+ int columnCount = columns.size();
+ int supersetCount = superset.size();
if (columnCount == supersetCount)
{
return TypeSizes.sizeofUnsignedVInt(0);
@@ -488,7 +490,7 @@ public class Columns implements Iterable<ColumnDefinition>
{
return superset;
}
- else if (superset.columnCount() >= 64)
+ else if (superset.size() >= 64)
{
return deserializeLargeSubset(in, superset, (int) encoded);
}
@@ -512,7 +514,7 @@ public class Columns implements Iterable<ColumnDefinition>
// encodes a 1 bit for every *missing* column, on the assumption presence is more common,
// and because this is consistent with encoding 0 to represent all present
- private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+ private static long encodeBitmap(Collection<ColumnDefinition> columns, Columns superset, int supersetCount)
{
long bitmap = 0L;
BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
@@ -521,7 +523,7 @@ public class Columns implements Iterable<ColumnDefinition>
for (ColumnDefinition column : columns)
{
if (iter.next(column) == null)
- throw new IllegalStateException();
+ throw new IllegalStateException(columns + " is not a subset of " + superset);
int currentIndex = iter.indexOfCurrent();
int count = currentIndex - expectIndex;
@@ -537,7 +539,7 @@ public class Columns implements Iterable<ColumnDefinition>
}
@DontInline
- private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+ private void serializeLargeSubset(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
{
// write flag indicating we're in lengthy mode
out.writeUnsignedVInt(supersetCount - columnCount);
@@ -572,7 +574,7 @@ public class Columns implements Iterable<ColumnDefinition>
@DontInline
private Columns deserializeLargeSubset(DataInputPlus in, Columns superset, int delta) throws IOException
{
- int supersetCount = superset.columnCount();
+ int supersetCount = superset.size();
int columnCount = supersetCount - delta;
BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -609,7 +611,7 @@ public class Columns implements Iterable<ColumnDefinition>
}
@DontInline
- private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+ private int serializeLargeSubsetSize(Collection<ColumnDefinition> columns, int columnCount, Columns superset, int supersetCount)
{
// write flag indicating we're in lengthy mode
int size = TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
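The serializeSubset/encodeBitmap comments above describe the scheme: when the pre-shared superset has fewer than 64 columns, a single unsigned vint carries a bitmap with one bit per *missing* column (so 0 means every header column is present); larger supersets fall back to the vint-delta path in serializeLargeSubset. Purely as a hedged illustration (toy String columns standing in for ColumnDefinition, not the Cassandra implementation), the small-superset path looks roughly like this:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    // Illustrative sketch only: the pre-shared, ordered superset has fewer than 64 entries,
    // and a set bit marks a column that is *missing* from the subset, so 0L means "all present".
    final class SubsetBitmapSketch
    {
        static long encode(Set<String> subset, List<String> orderedSuperset)
        {
            assert orderedSuperset.size() < 64;
            long bitmap = 0L;
            for (int i = 0; i < orderedSuperset.size(); i++)
                if (!subset.contains(orderedSuperset.get(i)))
                    bitmap |= 1L << i;                    // mark the missing column
            return bitmap;
        }

        static List<String> decode(long bitmap, List<String> orderedSuperset)
        {
            List<String> present = new ArrayList<>();
            for (int i = 0; i < orderedSuperset.size(); i++)
                if ((bitmap & (1L << i)) == 0)            // bit clear => column present
                    present.add(orderedSuperset.get(i));
            return present;
        }
    }

Under this shape a row that contains every header column costs a single vint 0, a nearly-full row costs one small vint, and only genuinely sparse rows or 64+-column supersets pay for the delta encoding handled by serializeLargeSubset.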
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 1c72d31..628ac75 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -534,7 +534,7 @@ public abstract class LegacyLayout
// TODO: there is in practice nothing to do here, but we need to handle the column_metadata for super columns somewhere else
throw new UnsupportedOperationException();
- Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.columnCount());
+ Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.size());
for (ColumnDefinition column : statics)
columnsToFetch.add(column.name.bytes);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index aa60198..e1008df 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -78,7 +78,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
public boolean includes(PartitionColumns columns)
{
- return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+ return statics.containsAll(columns.statics) && regulars.containsAll(columns.regulars);
}
public Iterator<ColumnDefinition> iterator()
@@ -94,7 +94,7 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
/**
 * Returns the total number of static and regular columns.
 */
public int size()
{
- return regulars.columnCount() + statics.columnCount();
+ return regulars.size() + statics.size();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 88f6832..8d4e604 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -54,9 +54,6 @@ public class SerializationHeader
private final Map<ByteBuffer, AbstractType<?>> typeMap;
- // Whether or not to store cell in a sparse or dense way. See UnfilteredSerializer for details.
- private final boolean useSparseColumnLayout;
-
private SerializationHeader(AbstractType<?> keyType,
List<AbstractType<?>> clusteringTypes,
PartitionColumns columns,
@@ -68,21 +65,6 @@ public class SerializationHeader
this.columns = columns;
this.stats = stats;
this.typeMap = typeMap;
-
- // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1
- // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
- // only for the columns after the 128th one and for simplicity we assume that once you have that many column,
- // you'll tend towards a clearly dense or clearly sparse case so that the heurstic above shouldn't still be
- // too inapropriate). So if on average more than half of our columns are set per row, we better go for sparse.
- this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount()/ 2);
- }
-
- public boolean useSparseColumnLayout(boolean isStatic)
- {
- // We always use a dense layout for the static row. Having very many static columns with only a few set at
- // any given time doesn't feel very common at all (and we already optimize the case where no static at all
- // are provided).
- return !isStatic && useSparseColumnLayout;
}
public static SerializationHeader forKeyCache(CFMetaData metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 5e6165f..cf4bff7 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -214,7 +214,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
{
// We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+ return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
}
public boolean hasNext()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 4d2e294..a5a1938 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -74,7 +74,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
protected ReusablePartitionData createBuffer(int blocksCount)
{
int estimatedRowCount = 16;
- int columnCount = metadata().partitionColumns().regulars.columnCount();
+ int columnCount = metadata().partitionColumns().regulars.size();
if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
{
estimatedRowCount = 1;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 3e608b4..0d6f816 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -270,7 +270,7 @@ public abstract class DataLimits
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
// is what getMeanColumns returns)
- float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+ float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return rowsPerPartition * (cfs.estimateKeys());
}
@@ -506,7 +506,7 @@ public abstract class DataLimits
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeanColumns returns a number of cells: we should clean up the nomenclature
- float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount();
+ float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
return cellsPerPartition * cfs.estimateKeys();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3d2c94b..5e056d2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -529,7 +529,7 @@ public class PartitionUpdate extends AbstractBTreePartition
if (row.isStatic())
{
// We test for == first because in most case it'll be true and that is faster
- assert columns().statics == row.columns() || columns().statics.contains(row.columns());
+ assert columns().statics.containsAll(row.columns()) : columns().statics + " is not superset of " + row.columns();
Row staticRow = holder.staticRow.isEmpty()
? row
: Rows.merge(holder.staticRow, row, createdAtInSec);
@@ -538,7 +538,7 @@ public class PartitionUpdate extends AbstractBTreePartition
else
{
// We test for == first because in most case it'll be true and that is faster
- assert columns().regulars == row.columns() || columns().regulars.contains(row.columns());
+ assert columns().regulars.containsAll(row.columns()) : columns().regulars + " is not superset of " + row.columns();
rowBuilder.add(row);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 6090274..fca765f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -18,6 +18,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.AbstractCollection;
import java.util.Objects;
import com.google.common.collect.Iterables;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.FBUtilities;
* Unless you have a very good reason not to, every row implementation
* should probably extend this class.
*/
-public abstract class AbstractRow implements Row
+public abstract class AbstractRow extends AbstractCollection<ColumnData> implements Row
{
public Unfiltered.Kind kind()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 7e50716..ed036af 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -22,6 +22,7 @@ import java.util.*;
import java.util.function.Predicate;
import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Iterators;
import org.apache.cassandra.config.CFMetaData;
@@ -35,6 +36,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
import org.apache.cassandra.utils.btree.UpdateFunction;
/**
@@ -164,6 +166,11 @@ public class BTreeRow extends AbstractRow
return columns;
}
+ public Collection<ColumnDefinition> actualColumns()
+ {
+ return Collections2.transform(this, ColumnData::column);
+ }
+
public LivenessInfo primaryKeyLivenessInfo()
{
return primaryKeyLivenessInfo;
@@ -207,6 +214,11 @@ public class BTreeRow extends AbstractRow
return searchIterator();
}
+ public int size()
+ {
+ return BTree.size(btree);
+ }
+
public Iterable<Cell> cells()
{
return CellIterator::new;
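Since a BTreeRow is now itself a Collection<ColumnData>, actualColumns() can expose the columns actually present as a lazy Guava view instead of materializing a Columns object. A minimal, self-contained sketch of that pattern with toy stand-in types (not the Cassandra classes):

    import java.util.Arrays;
    import java.util.Collection;
    import com.google.common.collect.Collections2;

    // Toy stand-ins only: Collections2.transform returns a lazy view, so "the columns
    // present in this row" is computed on iteration rather than copied up front.
    final class LazyColumnViewSketch
    {
        static final class ColumnData
        {
            final String column;
            ColumnData(String column) { this.column = column; }
            String column() { return column; }
        }

        public static void main(String[] args)
        {
            Collection<ColumnData> row = Arrays.asList(new ColumnData("a"), new ColumnData("c"));
            Collection<String> present = Collections2.transform(row, ColumnData::column);
            System.out.println(present);   // prints [a, c]
        }
    }

UnfilteredSerializer below hands exactly this kind of Collections2.transform(row, ColumnData::column) view to Columns.serializer.serializeSubset.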
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index f9a3026..4176ba6 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -218,28 +218,21 @@ public class BufferCell extends AbstractCell
*/
static class Serializer implements Cell.Serializer
{
- private final static int PRESENCE_MASK = 0x01; // Marks the actual presence of a cell. This is used only when serialized on-disk and
- // on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag set).
- private final static int IS_DELETED_MASK = 0x02; // Whether the cell is a tombstone or not.
- private final static int IS_EXPIRING_MASK = 0x04; // Whether the cell is expiring.
- private final static int HAS_EMPTY_VALUE_MASK = 0x08; // Wether the cell has an empty value. This will be the case for tombstone in particular.
- private final static int USE_ROW_TIMESTAMP_MASK = 0x10; // Wether the cell has the same timestamp than the row this is a cell of.
- private final static int USE_ROW_TTL_MASK = 0x20; // Wether the cell has the same ttl than the row this is a cell of.
+ private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not.
+ private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring.
+ private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+ private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Whether the cell has the same timestamp as the row it is a cell of.
+ private final static int USE_ROW_TTL_MASK = 0x10; // Whether the cell has the same ttl as the row it is a cell of.
public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
{
- if (cell == null)
- {
- out.writeByte((byte)0);
- return;
- }
-
+ assert cell != null;
boolean hasValue = cell.value().hasRemaining();
boolean isDeleted = cell.isTombstone();
boolean isExpiring = cell.isExpiring();
boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
- int flags = PRESENCE_MASK;
+ int flags = 0;
if (!hasValue)
flags |= HAS_EMPTY_VALUE_MASK;
@@ -273,9 +266,6 @@ public class BufferCell extends AbstractCell
public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
{
int flags = in.readUnsignedByte();
- if ((flags & PRESENCE_MASK) == 0)
- return null;
-
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
@@ -317,10 +307,6 @@ public class BufferCell extends AbstractCell
public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
{
long size = 1; // flags
-
- if (cell == null)
- return size;
-
boolean hasValue = cell.value().hasRemaining();
boolean isDeleted = cell.isTombstone();
boolean isExpiring = cell.isExpiring();
@@ -348,9 +334,6 @@ public class BufferCell extends AbstractCell
public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
{
int flags = in.readUnsignedByte();
- if ((flags & PRESENCE_MASK) == 0)
- return false;
-
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
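With the presence bit gone, every serialized cell is known to exist, and the flag byte is built purely from the five masks above. A minimal sketch of that composition, assuming the usual deleted-else-expiring exclusivity; the value, timestamp and ttl writes that follow in the real serializer are omitted:

    // Illustrative only: mirrors the new mask layout from the diff above; there is
    // no presence bit any more, so a flag byte of 0 is a plain live cell with a value.
    final class CellFlagsSketch
    {
        static final int IS_DELETED_MASK        = 0x01;
        static final int IS_EXPIRING_MASK       = 0x02;
        static final int HAS_EMPTY_VALUE_MASK   = 0x04;
        static final int USE_ROW_TIMESTAMP_MASK = 0x08;
        static final int USE_ROW_TTL_MASK       = 0x10;

        static int compose(boolean hasValue, boolean isDeleted, boolean isExpiring,
                           boolean useRowTimestamp, boolean useRowTTL)
        {
            int flags = 0;
            if (!hasValue)       flags |= HAS_EMPTY_VALUE_MASK;
            if (isDeleted)       flags |= IS_DELETED_MASK;
            else if (isExpiring) flags |= IS_EXPIRING_MASK;
            if (useRowTimestamp) flags |= USE_ROW_TIMESTAMP_MASK;
            if (useRowTTL)       flags |= USE_ROW_TTL_MASK;
            return flags;
        }

        static boolean hasValue(int flags) { return (flags & HAS_EMPTY_VALUE_MASK) == 0; }
    }

Because a cell can no longer be "absent" on the wire, complex columns are count-prefixed rather than terminated by an empty cell, as the UnfilteredSerializer changes further down show.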
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/EncodingStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index efa40ad..5f1a749 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -30,8 +30,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* <p>
* Those stats are used to optimize the on-wire and on-disk storage of rows. More precisely,
* the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to
- * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow}
- * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}).
+ * delta-encode that information for the sake of vint encoding.
* <p>
* Note that due to their use, those stats can be somewhat inaccurate (the more inaccurate
* they are, the less effective the storage will be, but provided the stats are not completely wacky,
@@ -63,7 +62,7 @@ public class EncodingStats
}
// We should use this sparingly obviously
- public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH, -1);
+ public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
public static final Serializer serializer = new Serializer();
@@ -71,13 +70,9 @@ public class EncodingStats
public final int minLocalDeletionTime;
public final int minTTL;
- // Will be < 0 if the value is unknown
- public final int avgColumnSetPerRow;
-
public EncodingStats(long minTimestamp,
int minLocalDeletionTime,
- int minTTL,
- int avgColumnSetPerRow)
+ int minTTL)
{
// Note that the exact value of those don't impact correctness, just the efficiency of the encoding. So when we
// get a value for timestamp (resp. minLocalDeletionTime) that means 'no object had a timestamp' (resp. 'a local
@@ -87,7 +82,6 @@ public class EncodingStats
this.minTimestamp = minTimestamp == LivenessInfo.NO_TIMESTAMP ? TIMESTAMP_EPOCH : minTimestamp;
this.minLocalDeletionTime = minLocalDeletionTime == LivenessInfo.NO_EXPIRATION_TIME ? DELETION_TIME_EPOCH : minLocalDeletionTime;
this.minTTL = minTTL;
- this.avgColumnSetPerRow = avgColumnSetPerRow;
}
/**
@@ -110,11 +104,7 @@ public class EncodingStats
? that.minTTL
: (that.minTTL == TTL_EPOCH ? this.minTTL : Math.min(this.minTTL, that.minTTL));
- int avgColumnSetPerRow = this.avgColumnSetPerRow < 0
- ? that.avgColumnSetPerRow
- : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2);
-
- return new EncodingStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow);
+ return new EncodingStats(minTimestamp, minDelTime, minTTL);
}
@Override
@@ -125,8 +115,7 @@ public class EncodingStats
EncodingStats that = (EncodingStats) o;
- return this.avgColumnSetPerRow == that.avgColumnSetPerRow
- && this.minLocalDeletionTime == that.minLocalDeletionTime
+ return this.minLocalDeletionTime == that.minLocalDeletionTime
&& this.minTTL == that.minTTL
&& this.minTimestamp == that.minTimestamp;
}
@@ -134,13 +123,13 @@ public class EncodingStats
@Override
public int hashCode()
{
- return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
}
@Override
public String toString()
{
- return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return String.format("EncodingStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL);
}
public static class Collector implements PartitionStatisticsCollector
@@ -237,8 +226,7 @@ public class EncodingStats
{
return new EncodingStats(isTimestampSet ? minTimestamp : TIMESTAMP_EPOCH,
isDelTimeSet ? minDeletionTime : DELETION_TIME_EPOCH,
- isTTLSet ? minTTL : TTL_EPOCH,
- isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1);
+ isTTLSet ? minTTL : TTL_EPOCH);
}
public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo)
@@ -260,15 +248,13 @@ public class EncodingStats
out.writeVInt(stats.minTimestamp - TIMESTAMP_EPOCH);
out.writeVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH);
out.writeVInt(stats.minTTL - TTL_EPOCH);
- out.writeVInt(stats.avgColumnSetPerRow);
}
public int serializedSize(EncodingStats stats)
{
return TypeSizes.sizeofVInt(stats.minTimestamp - TIMESTAMP_EPOCH)
- + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
- + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH)
- + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow);
+ + TypeSizes.sizeofVInt(stats.minLocalDeletionTime - DELETION_TIME_EPOCH)
+ + TypeSizes.sizeofVInt(stats.minTTL - TTL_EPOCH);
}
public EncodingStats deserialize(DataInputPlus in) throws IOException
@@ -276,8 +262,7 @@ public class EncodingStats
long minTimestamp = in.readVInt() + TIMESTAMP_EPOCH;
int minLocalDeletionTime = (int)in.readVInt() + DELETION_TIME_EPOCH;
int minTTL = (int)in.readVInt() + TTL_EPOCH;
- int avgColumnSetPerRow = (int)in.readVInt();
- return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow);
+ return new EncodingStats(minTimestamp, minLocalDeletionTime, minTTL);
}
}
}
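EncodingStats is now just the three minima, each written as a vint of its delta from a fixed epoch so that typical values stay small. A hedged sketch of the write side, using plain java.io.DataOutput and a zig-zag varint as a stand-in for DataOutputPlus.writeVInt, and with made-up epoch constants (the real ones are defined in EncodingStats):

    import java.io.DataOutput;
    import java.io.IOException;

    // Sketch only: delta against an epoch, then variable-length encode the (small) delta.
    final class EncodingStatsSketch
    {
        // Hypothetical epochs, standing in for EncodingStats.TIMESTAMP_EPOCH etc.
        static final long TIMESTAMP_EPOCH     = 1430000000000000L; // microseconds
        static final int  DELETION_TIME_EPOCH = 1430000000;        // seconds
        static final int  TTL_EPOCH           = 0;

        static void serialize(long minTimestamp, int minLocalDeletionTime, int minTTL,
                              DataOutput out) throws IOException
        {
            writeVInt(out, minTimestamp - TIMESTAMP_EPOCH);
            writeVInt(out, minLocalDeletionTime - DELETION_TIME_EPOCH);
            writeVInt(out, minTTL - TTL_EPOCH);
        }

        // Zig-zag + 7-bit groups, so deltas near zero (positive or negative) take one byte.
        static void writeVInt(DataOutput out, long value) throws IOException
        {
            long zigzag = (value << 1) ^ (value >> 63);
            while ((zigzag & ~0x7FL) != 0)
            {
                out.writeByte((int) ((zigzag & 0x7F) | 0x80));
                zigzag >>>= 7;
            }
            out.writeByte((int) zigzag);
        }
    }

Deserialization simply reads the three vints back and re-adds the epochs, as the diff above shows.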
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 996e89a..003dd04 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction;
* its own data. For instance, a {@code Row} cannot contain a cell that is deleted by its own
* row deletion.
*/
-public interface Row extends Unfiltered, Iterable<ColumnData>
+public interface Row extends Unfiltered, Collection<ColumnData>
{
/**
* The clustering values for this row.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 60f0dcb..48e00f9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -98,6 +98,8 @@ public abstract class UnfilteredRowIterators
*/
public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
{
+ PartitionColumns columns = staticRow == null ? PartitionColumns.NONE
+ : new PartitionColumns(staticRow.columns(), Columns.NONE);
return new UnfilteredRowIterator()
{
public CFMetaData metadata()
@@ -112,7 +114,7 @@ public abstract class UnfilteredRowIterators
public PartitionColumns columns()
{
- return PartitionColumns.NONE;
+ return columns;
}
public DecoratedKey partitionKey()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 14b06cf..0866810 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,11 +19,12 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
+import com.google.common.collect.Collections2;
+
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.SearchIterator;
/**
* Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@ -37,29 +38,20 @@ import org.apache.cassandra.utils.SearchIterator;
* {@code Clustering.serializer}. Note that static rows are an exception and
* don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion
* whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
- * complex ones. There is actually 2 slightly different possible layout for those
- * cell: a dense one and a sparse one. Which one is used depends on the serialization
- * header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}:
- * 1) in the dense layout, there will be as many <sci> and <ccj> as there is columns
- * in the serialization header. *Each simple column <sci> will simply be a <cell>
- * (which might have no value, see below), while each <ccj> will be
- * [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
- * this complex column (if flags indicates it present), <celln> are the <cell>
- * for this complex column and <emptyCell> is a last cell that will have no value
- * to indicate the end of this column.
- * 2) in the sparse layout, there won't be "empty" cells, i.e. only the column that
- * actually have a cell are represented. For that, each <sci> and <ccj> start
- * by a 2 byte index that points to the column in the header it belongs to. After
- * that, each <sci> and <ccj> is the same than for the dense layout. But contrarily
- * to the dense layout we won't know how many elements are serialized so a 2 byte
- * marker with a value of -1 will indicates the end of the row.
+ * complex ones.
+ * The columns for the row are then serialized if they differ from those in the header,
+ * and each cell then follows:
+ * * Each simple column <sci> will simply be a <cell>
+ * (which might have no value, see below),
+ * * Each <ccj> will be [<delTime>]<n><cell1>...<celln> where <delTime>
+ * is the deletion for this complex column (if the flags indicate it is present), <n>
+ * is the vint-encoded number of cells that follow, and each <celli>
+ * is a <cell> of this complex column
* <marker> is <bound><deletion> where <bound> is the marker bound as serialized
* by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
* time.
*
- * <cell> A cell start with a 1 byte <flag>. Thre rightmost 1st bit indicates
- * if there is actually a value for this cell. If this flag is unset,
- * nothing more follows for the cell. The 2nd and third flag indicates if
+ * <cell> A cell starts with a 1 byte <flag>. The 1st and 2nd flag bits indicate if
* it's a deleted or expiring cell. The 3rd flag indicates if the value
* is empty or not. The 4th and 5th indicate if the timestamp and ttl/
* localDeletionTime for the cell are the same as the row's (if that
@@ -85,6 +77,7 @@ public class UnfilteredSerializer
private final static int HAS_TTL = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
private final static int HAS_DELETION = 0x20; // Whether the encoded row has some deletion info.
private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
+ private final static int HAS_ALL_COLUMNS = 0x80; // Whether the encoded row has all of the columns from the header present
public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
throws IOException
@@ -105,9 +98,11 @@ public class UnfilteredSerializer
int flags = 0;
boolean isStatic = row.isStatic();
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
DeletionTime deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
+ boolean hasAllColumns = (row.size() == headerColumns.size());
if (isStatic)
flags |= IS_STATIC;
@@ -119,6 +114,8 @@ public class UnfilteredSerializer
flags |= HAS_DELETION;
if (hasComplexDeletion)
flags |= HAS_COMPLEX_DELETION;
+ if (hasAllColumns)
+ flags |= HAS_ALL_COLUMNS;
out.writeByte((byte)flags);
if (!isStatic)
@@ -134,55 +131,27 @@ public class UnfilteredSerializer
if ((flags & HAS_DELETION) != 0)
header.writeDeletionTime(deletion, out);
- Columns columns = header.columns(isStatic);
- int simpleCount = columns.simpleColumnCount();
- boolean useSparse = header.useSparseColumnLayout(isStatic);
- SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
- for (int i = 0; i < simpleCount; i++)
- writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse);
+ if (!hasAllColumns)
+ Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out);
- for (int i = simpleCount; i < columns.columnCount(); i++)
- writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse);
-
- if (useSparse)
- out.writeVInt(-1);
- }
-
- private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
- throws IOException
- {
- if (useSparse)
+ for (ColumnData data : row)
{
- if (cell == null)
- return;
-
- out.writeVInt(idx);
+ if (data.column.isSimple())
+ Cell.serializer.serialize((Cell) data, out, pkLiveness, header);
+ else
+ writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out);
}
- Cell.serializer.serialize(cell, out, rowLiveness, header);
}
- private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
+ private void writeComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out)
throws IOException
{
- if (useSparse)
- {
- if (data == null)
- return;
-
- out.writeVInt(idx);
- }
-
if (hasComplexDeletion)
header.writeDeletionTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), out);
- if (data != null)
- {
- for (Cell cell : data)
- Cell.serializer.serialize(cell, out, rowLiveness, header);
- }
-
- Cell.serializer.serialize(null, out, rowLiveness, header);
+ out.writeUnsignedVInt(data.cellsCount());
+ for (Cell cell : data)
+ Cell.serializer.serialize(cell, out, rowLiveness, header);
}
public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
@@ -215,9 +184,11 @@ public class UnfilteredSerializer
long size = 1; // flags
boolean isStatic = row.isStatic();
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
DeletionTime deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
+ boolean hasAllColumns = (row.size() == headerColumns.size());
if (!isStatic)
size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
@@ -232,57 +203,32 @@ public class UnfilteredSerializer
if (!deletion.isLive())
size += header.deletionTimeSerializedSize(deletion);
- Columns columns = header.columns(isStatic);
- int simpleCount = columns.simpleColumnCount();
- boolean useSparse = header.useSparseColumnLayout(isStatic);
- SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
-
- for (int i = 0; i < simpleCount; i++)
- size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse);
-
- for (int i = simpleCount; i < columns.columnCount(); i++)
- size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse);
-
- if (useSparse)
- size += TypeSizes.sizeofVInt(-1);
+ if (!hasAllColumns)
+ size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic));
- return size;
- }
-
- private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
- {
- long size = 0;
- if (useSparse)
+ for (ColumnData data : row)
{
- if (cell == null)
- return size;
-
- size += TypeSizes.sizeofVInt(idx);
+ if (data.column.isSimple())
+ size += Cell.serializer.serializedSize((Cell) data, pkLiveness, header);
+ else
+ size += sizeOfComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header);
}
- return size + Cell.serializer.serializedSize(cell, rowLiveness, header);
+
+ return size;
}
- private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
+ private long sizeOfComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header)
{
long size = 0;
- if (useSparse)
- {
- if (data == null)
- return size;
-
- size += TypeSizes.sizeofVInt(idx);
- }
if (hasComplexDeletion)
size += header.deletionTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion());
- if (data != null)
- {
- for (Cell cell : data)
- size += Cell.serializer.serializedSize(cell, rowLiveness, header);
- }
+ size += TypeSizes.sizeofUnsignedVInt(data.cellsCount());
+ for (Cell cell : data)
+ size += Cell.serializer.serializedSize(cell, rowLiveness, header);
- return size + Cell.serializer.serializedSize(null, rowLiveness, header);
+ return size;
}
public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
@@ -369,6 +315,8 @@ public class UnfilteredSerializer
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean hasDeletion = (flags & HAS_DELETION) != 0;
boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+ boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+ Columns headerColumns = header.columns(isStatic);
LivenessInfo rowLiveness = LivenessInfo.EMPTY;
if (hasTimestamp)
@@ -382,33 +330,16 @@ public class UnfilteredSerializer
builder.addPrimaryKeyLivenessInfo(rowLiveness);
builder.addRowDeletion(hasDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE);
- Columns columns = header.columns(isStatic);
- if (header.useSparseColumnLayout(isStatic))
- {
- int count = columns.columnCount();
- int simpleCount = columns.simpleColumnCount();
- int i;
- while ((i = (int)in.readVInt()) >= 0)
- {
- if (i > count)
- throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
- if (i < simpleCount)
- readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
- else
- readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
- }
- }
- else
+ Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+ for (ColumnDefinition column : columns)
{
- for (int i = 0; i < columns.simpleColumnCount(); i++)
- readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
-
- for (int i = 0; i < columns.complexColumnCount(); i++)
- readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+ if (column.isSimple())
+ readSimpleColumn(column, in, header, helper, builder, rowLiveness);
+ else
+ readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
}
- return builder.build();
+ return builder.build();
}
catch (RuntimeException | AssertionError e)
{
@@ -426,7 +357,7 @@ public class UnfilteredSerializer
if (helper.includes(column))
{
Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
- if (cell != null && !helper.isDropped(cell, false))
+ if (!helper.isDropped(cell, false))
builder.addCell(cell);
}
else
@@ -448,9 +379,10 @@ public class UnfilteredSerializer
builder.addComplexDeletion(column, complexDeletion);
}
- Cell cell;
- while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null)
+ int count = (int) in.readUnsignedVInt();
+ while (--count >= 0)
{
+ Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
builder.addCell(cell);
}
@@ -470,6 +402,8 @@ public class UnfilteredSerializer
boolean hasTTL = (flags & HAS_TTL) != 0;
boolean hasDeletion = (flags & HAS_DELETION) != 0;
boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+ boolean hasAllColumns = (flags & HAS_ALL_COLUMNS) != 0;
+ Columns headerColumns = header.columns(isStatic);
// Note that we don't want to use FileUtils.skipBytesFully for anything that may not have
// the size we think due to VINT encoding
@@ -483,30 +417,13 @@ public class UnfilteredSerializer
if (hasDeletion)
header.skipDeletionTime(in);
- Columns columns = header.columns(isStatic);
- if (header.useSparseColumnLayout(isStatic))
- {
- int count = columns.columnCount();
- int simpleCount = columns.simpleColumnCount();
- int i;
- while ((i = (int)in.readVInt()) >= 0)
- {
- if (i > count)
- throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
-
- if (i < simpleCount)
- Cell.serializer.skip(in, columns.getSimple(i), header);
- else
- skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion);
- }
- }
- else
+ Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
+ for (ColumnDefinition column : columns)
{
- for (int i = 0; i < columns.simpleColumnCount(); i++)
- Cell.serializer.skip(in, columns.getSimple(i), header);
-
- for (int i = 0; i < columns.complexColumnCount(); i++)
- skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion);
+ if (column.isSimple())
+ Cell.serializer.skip(in, column, header);
+ else
+ skipComplexColumn(in, column, header, hasComplexDeletion);
}
}
@@ -536,7 +453,9 @@ public class UnfilteredSerializer
if (hasComplexDeletion)
header.skipDeletionTime(in);
- while (Cell.serializer.skip(in, column, header));
+ int count = (int) in.readUnsignedVInt();
+ while (--count >= 0)
+ Cell.serializer.skip(in, column, header);
}
public static boolean isEndOfPartition(int flags)
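Taken together, the new row framing is: one flag byte (now including HAS_ALL_COLUMNS), the optional liveness and deletion fields, the Columns subset only when some header columns are absent, and then the cells in column order, with each complex column prefixed by a vint cell count. A self-contained toy sketch of just that framing, with fixed-width ints and column names standing in for the real vint and bitmap encodings:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    // Toy framing only (no Cassandra types): shows the HAS_ALL_COLUMNS shortcut and the
    // count-prefixed complex columns that replace the old empty-cell terminator.
    final class RowFramingSketch
    {
        static final int HAS_ALL_COLUMNS = 0x80;

        static void writeRow(List<String> headerColumns,
                             Map<String, List<String>> cellsByColumn, // present columns -> their cells
                             DataOutputStream out) throws IOException
        {
            // Mirrors "row.size() == headerColumns.size()", assuming the row's columns
            // are always a subset of the header's, as in the patch.
            boolean hasAllColumns = cellsByColumn.size() == headerColumns.size();
            out.writeByte(hasAllColumns ? HAS_ALL_COLUMNS : 0); // other row flags omitted

            if (!hasAllColumns)
            {
                // Stand-in for Columns.serializer.serializeSubset: the real patch writes a
                // bitmap or vint deltas against the header superset, not the names.
                out.writeInt(cellsByColumn.size());
                for (String column : cellsByColumn.keySet())
                    out.writeUTF(column);
            }

            for (List<String> cells : cellsByColumn.values())
            {
                out.writeInt(cells.size());  // count prefix replaces the old empty-cell terminator
                for (String cell : cells)
                    out.writeUTF(cell);      // stand-in for Cell.serializer.serialize
            }
        }
    }

The read side mirrors this: the deserializer either reuses the header columns or decodes the subset, then loops over exactly the columns it was told are present.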
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 793cd81..acdf6bb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -127,7 +127,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
{
// We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
// SerializationHeader.make() for details) so we use the latter instead.
- return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
+ return new EncodingStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL());
}
public int compareTo(SSTableIdentityIterator o)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 1a91c3d..c080948 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -83,9 +83,9 @@ public class ColumnsTest
private void assertSubset(Columns superset, Columns subset)
{
- Assert.assertTrue(superset.contains(superset));
- Assert.assertTrue(superset.contains(subset));
- Assert.assertFalse(subset.contains(superset));
+ Assert.assertTrue(superset.containsAll(superset));
+ Assert.assertTrue(superset.containsAll(subset));
+ Assert.assertFalse(subset.containsAll(superset));
}
@Test
@@ -275,7 +275,7 @@ public class ColumnsTest
Columns subset = columns;
for (ColumnDefinition def : remove)
subset = subset.without(def);
- Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+ Assert.assertEquals(columns.size() - remove.size(), subset.size());
List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
remainingDefs.removeAll(remove);
return new ColumnsCheck(subset, remainingDefs);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index f651093..f0a63a8 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -105,7 +105,7 @@ public class PartitionTest
CachedPartition deserialized = CachedPartition.cacheSerializer.deserialize(new DataInputBuffer(bufOut.getData()));
- assertEquals(partition.columns().regulars.columnCount(), deserialized.columns().regulars.columnCount());
+ assertEquals(partition.columns().regulars.size(), deserialized.columns().regulars.size());
assertTrue(deserialized.columns().regulars.getSimple(1).equals(partition.columns().regulars.getSimple(1)));
assertTrue(deserialized.columns().regulars.getSimple(5).equals(partition.columns().regulars.getSimple(5)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/RowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowTest.java b/test/unit/org/apache/cassandra/db/RowTest.java
index 3aaf21f..cd80a2f 100644
--- a/test/unit/org/apache/cassandra/db/RowTest.java
+++ b/test/unit/org/apache/cassandra/db/RowTest.java
@@ -141,7 +141,7 @@ public class RowTest
row = (Row) unfiltered;
assertEquals("a2", defA.cellValueType().getString(row.getCell(defA).value()));
assertEquals("b1", defB.cellValueType().getString(row.getCell(defB).value()));
- assertEquals(2, row.columns().columnCount());
+ assertEquals(2, row.columns().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 5887fd4..e9bf4c5 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -325,8 +326,9 @@ public class ScrubTest
keys.size(),
0L,
0,
- SerializationHeader.make(cfs.metadata,
- Collections.emptyList())))
+ new SerializationHeader(cfs.metadata,
+ cfs.metadata.partitionColumns(),
+ EncodingStats.NO_STATS)))
{
for (String k : keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d74c3eb/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 89c0d61..7826317 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -179,9 +179,17 @@ public class SSTableUtils
public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
{
+ PartitionColumns.Builder builder = PartitionColumns.builder();
+ for (PartitionUpdate update : sorted.values())
+ builder.addAll(update.columns());
final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
return write(sorted.size(), new Appender()
{
+ public SerializationHeader header()
+ {
+ return new SerializationHeader(Schema.instance.getCFMetaData(ksname, cfname), builder.build(), EncodingStats.NO_STATS);
+ }
+
@Override
public boolean append(SSTableTxnWriter writer) throws IOException
{
@@ -207,7 +215,7 @@ public class SSTableUtils
File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
- SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+ SerializationHeader header = appender.header();
SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
while (appender.append(writer)) { /* pass */ }
Collection<SSTableReader> readers = writer.finish(true);
@@ -223,6 +231,7 @@ public class SSTableUtils
public static abstract class Appender
{
+ public abstract SerializationHeader header();
/** Called with an open writer until it returns false. */
public abstract boolean append(SSTableTxnWriter writer) throws IOException;
}