You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/07/22 18:05:40 UTC
[11/15] cassandra git commit: Simplify some 8099's implementations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
index 54feb85..33a0917 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import org.apache.cassandra.config.CFMetaData;
@@ -26,6 +25,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -146,7 +146,7 @@ public interface ClusteringIndexFilter
public interface Serializer
{
public void serialize(ClusteringIndexFilter filter, DataOutputPlus out, int version) throws IOException;
- public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException;
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException;
public long serializedSize(ClusteringIndexFilter filter, int version);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index f2cc46f..13329f3 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import java.util.*;
@@ -27,6 +26,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTreeSet;
@@ -94,6 +94,9 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
public boolean isFullyCoveredBy(CachedPartition partition)
{
+ if (partition.isEmpty())
+ return false;
+
// 'partition' contains all columns, so it covers our filter if our last clustering
// is not greater than the last one in the cache
return clusterings.comparator().compare(clusterings.last(), partition.lastRow().clustering()) <= 0;
@@ -109,18 +112,18 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
{
// Note that we don't filter markers because that's a bit trickier (we don't know in advance how far
// the range extends) and it's harmless to leave them.
- return new FilteringRowIterator(iterator)
+ return new AlteringUnfilteredRowIterator(iterator)
{
@Override
- public FilteringRow makeRowFilter()
+ public Row computeNextStatic(Row row)
{
- return FilteringRow.columnsFilteringRow(columnFilter);
+ return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
}
@Override
- protected boolean includeRow(Row row)
+ public Row computeNext(Row row)
{
- return clusterings.contains(row.clustering());
+ return clusterings.contains(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
}
};
}
@@ -214,7 +217,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
sb.append(i++ == 0 ? "" : ", ").append(clustering.toString(metadata));
if (reversed)
sb.append(", reversed");
- return sb.append(")").toString();
+ return sb.append(')').toString();
}
public String toCQLString(CFMetaData metadata)
@@ -223,7 +226,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
return "";
StringBuilder sb = new StringBuilder();
- sb.append("(").append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(")");
+ sb.append('(').append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(')');
sb.append(clusterings.size() == 1 ? " = " : " IN (");
int i = 0;
for (Clustering clustering : clusterings)
@@ -258,13 +261,13 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
private static class NamesDeserializer extends InternalDeserializer
{
- public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
{
ClusteringComparator comparator = metadata.comparator;
BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(comparator);
int size = in.readInt();
for (int i = 0; i < size; i++)
- clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()).takeAlias());
+ clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()));
return new ClusteringIndexNamesFilter(clusterings.build(), reversed);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 8fb319e..4f0e4e2 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import java.util.List;
import java.nio.ByteBuffer;
@@ -28,6 +27,7 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
/**
@@ -91,24 +91,24 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
// Note that we don't filter markers because that's a bit trickier (we don't know in advance how far
// the range extends) and it's harmless to leave them.
- return new FilteringRowIterator(iterator)
+ return new AlteringUnfilteredRowIterator(iterator)
{
@Override
- public FilteringRow makeRowFilter()
+ public boolean hasNext()
{
- return FilteringRow.columnsFilteringRow(columnFilter);
+ return !tester.isDone() && super.hasNext();
}
@Override
- protected boolean includeRow(Row row)
+ public Row computeNextStatic(Row row)
{
- return tester.includes(row.clustering());
+ return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata());
}
@Override
- public boolean hasNext()
+ public Row computeNext(Row row)
{
- return !tester.isDone() && super.hasNext();
+ return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null;
}
};
}
@@ -170,7 +170,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
private static class SliceDeserializer extends InternalDeserializer
{
- public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException
+ public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
{
Slices slices = Slices.serializer.deserialize(in, version, metadata);
return new ClusteringIndexSliceFilter(slices, reversed);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index b98108d..084bad6 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import java.util.*;
@@ -30,8 +29,8 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
@@ -52,15 +51,6 @@ public class ColumnFilter
{
public static final Serializer serializer = new Serializer();
- private static final Comparator<ColumnSubselection> valueComparator = new Comparator<ColumnSubselection>()
- {
- public int compare(ColumnSubselection s1, ColumnSubselection s2)
- {
- assert s1.column().name.equals(s2.column().name);
- return s1.column().cellPathComparator().compare(s1.minIncludedPath(), s2.minIncludedPath());
- }
- };
-
// Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
// by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
// Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
@@ -115,6 +105,11 @@ public class ColumnFilter
return isFetchAll ? metadata.partitionColumns() : selection;
}
+ public boolean includesAllColumns()
+ {
+ return isFetchAll;
+ }
+
/**
* Whether the provided column is selected by this selection.
*/
@@ -144,7 +139,7 @@ public class ColumnFilter
return true;
for (ColumnSubselection subSel : s)
- if (subSel.includes(cell.path()))
+ if (subSel.compareInclusionOf(cell.path()) == 0)
return true;
return false;
@@ -163,7 +158,7 @@ public class ColumnFilter
return false;
for (ColumnSubselection subSel : s)
- if (subSel.includes(path))
+ if (subSel.compareInclusionOf(path) == 0)
return false;
return true;
@@ -182,7 +177,7 @@ public class ColumnFilter
if (s.isEmpty())
return null;
- return new Tester(s.iterator());
+ return new Tester(isFetchAll, s.iterator());
}
/**
@@ -205,46 +200,43 @@ public class ColumnFilter
public static class Tester
{
+ private final boolean isFetchAll;
private ColumnSubselection current;
private final Iterator<ColumnSubselection> iterator;
- private Tester(Iterator<ColumnSubselection> iterator)
+ private Tester(boolean isFetchAll, Iterator<ColumnSubselection> iterator)
{
+ this.isFetchAll = isFetchAll;
this.iterator = iterator;
}
public boolean includes(CellPath path)
{
- while (current == null)
- {
- if (!iterator.hasNext())
- return false;
-
- current = iterator.next();
- if (current.includes(path))
- return true;
-
- if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
- current = null;
- }
- return false;
+ return isFetchAll || includedBySubselection(path);
}
public boolean canSkipValue(CellPath path)
{
- while (current == null)
+ return isFetchAll && !includedBySubselection(path);
+ }
+
+ private boolean includedBySubselection(CellPath path)
+ {
+ while (current != null || iterator.hasNext())
{
- if (!iterator.hasNext())
- return false;
+ if (current == null)
+ current = iterator.next();
- current = iterator.next();
- if (current.includes(path))
+ int cmp = current.compareInclusionOf(path);
+ if (cmp == 0) // The path is included
+ return true;
+ else if (cmp < 0) // The path is before this sub-selection, it's not included by any
return false;
- if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
- current = null;
+ // the path is after this sub-selection, we need to check the next one.
+ current = null;
}
- return true;
+ return false;
}
}
@@ -302,7 +294,7 @@ public class ColumnFilter
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null;
if (subSelections != null)
{
- s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator);
+ s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
for (ColumnSubselection subSelection : subSelections)
s.put(subSelection.column().name, subSelection);
}
@@ -317,6 +309,9 @@ public class ColumnFilter
if (selection == null)
return "*";
+ if (selection.isEmpty())
+ return "";
+
Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
StringBuilder sb = new StringBuilder();
appendColumnDef(sb, defs.next());
@@ -351,7 +346,7 @@ public class ColumnFilter
private static final int HAS_SELECTION_MASK = 0x02;
private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
- private int makeHeaderByte(ColumnFilter selection)
+ private static int makeHeaderByte(ColumnFilter selection)
{
return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
| (selection.selection != null ? HAS_SELECTION_MASK : 0)
@@ -376,7 +371,7 @@ public class ColumnFilter
}
}
- public ColumnFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
int header = in.readUnsignedByte();
boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
@@ -394,7 +389,7 @@ public class ColumnFilter
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
if (hasSubSelections)
{
- subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator);
+ subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder());
int size = in.readUnsignedShort();
for (int i = 0; i < size; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
index 652e27c..e45dbee 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.filter;
-import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -38,7 +38,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* This only makes sense for complex columns. For those, this allows, for instance,
* selecting only a slice of a map.
*/
-public abstract class ColumnSubselection
+public abstract class ColumnSubselection implements Comparable<ColumnSubselection>
{
public static final Serializer serializer = new Serializer();
@@ -72,9 +72,19 @@ public abstract class ColumnSubselection
protected abstract Kind kind();
- public abstract CellPath minIncludedPath();
- public abstract CellPath maxIncludedPath();
- public abstract boolean includes(CellPath path);
+ protected abstract CellPath comparisonPath();
+
+ public int compareTo(ColumnSubselection other)
+ {
+ assert other.column().name.equals(column().name);
+ return column().cellPathComparator().compare(comparisonPath(), other.comparisonPath());
+ }
+
+ /**
+ * Given a path, return a negative value if the path is before anything selected by this subselection, 0 if it is selected by this
+ * subselection and a positive value if the path is after anything selected by this subselection.
+ */
+ public abstract int compareInclusionOf(CellPath path);
private static class Slice extends ColumnSubselection
{
@@ -93,20 +103,20 @@ public abstract class ColumnSubselection
return Kind.SLICE;
}
- public CellPath minIncludedPath()
+ public CellPath comparisonPath()
{
return from;
}
- public CellPath maxIncludedPath()
- {
- return to;
- }
-
- public boolean includes(CellPath path)
+ public int compareInclusionOf(CellPath path)
{
Comparator<CellPath> cmp = column.cellPathComparator();
- return cmp.compare(from, path) <= 0 && cmp.compare(path, to) <= 0;
+ if (cmp.compare(path, from) < 0)
+ return -1;
+ else if (cmp.compare(to, path) < 0)
+ return 1;
+ else
+ return 0;
}
@Override
@@ -133,20 +143,14 @@ public abstract class ColumnSubselection
return Kind.ELEMENT;
}
- public CellPath minIncludedPath()
- {
- return element;
- }
-
- public CellPath maxIncludedPath()
+ public CellPath comparisonPath()
{
return element;
}
- public boolean includes(CellPath path)
+ public int compareInclusionOf(CellPath path)
{
- Comparator<CellPath> cmp = column.cellPathComparator();
- return cmp.compare(element, path) == 0;
+ return column.cellPathComparator().compare(path, element);
}
@Override
@@ -180,7 +184,7 @@ public abstract class ColumnSubselection
throw new AssertionError();
}
- public ColumnSubselection deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ public ColumnSubselection deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
{
ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
ColumnDefinition column = metadata.getColumnDefinition(name);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 76e29ec..206afa4 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -115,8 +115,7 @@ public abstract class DataLimits
* The max number of results this limits enforces.
* <p>
* Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
- * thrift, it means cells. The {@link #countsCells} allows to distinguish between the two cases if
- * needed.
+ * thrift, it means cells.
*
* @return the maximum number of results this limits enforces.
*/
@@ -124,8 +123,6 @@ public abstract class DataLimits
public abstract int perPartitionCount();
- public abstract boolean countsCells();
-
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
{
return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false));
@@ -269,11 +266,6 @@ public abstract class DataLimits
return perPartitionLimit;
}
- public boolean countsCells()
- {
- return false;
- }
-
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// TODO: we should start storing stats on the number of rows (instead of the number of cells, which
@@ -353,7 +345,7 @@ public abstract class DataLimits
{
sb.append("LIMIT ").append(rowLimit);
if (perPartitionLimit != Integer.MAX_VALUE)
- sb.append(" ");
+ sb.append(' ');
}
if (perPartitionLimit != Integer.MAX_VALUE)
@@ -511,11 +503,6 @@ public abstract class DataLimits
return cellPerPartitionLimit;
}
- public boolean countsCells()
- {
- return true;
- }
-
public float estimateTotalResults(ColumnFamilyStore cfs)
{
// remember that getMeansColumns returns a number of cells: we should clean nomenclature
@@ -572,7 +559,7 @@ public abstract class DataLimits
public void newRow(Row row)
{
- for (Cell cell : row)
+ for (Cell cell : row.cells())
{
if (assumeLiveData || cell.isLive(nowInSec))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 8f34efb..5a49bca 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -137,11 +137,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
if (metadata.isCompound())
{
List<ByteBuffer> values = CompositeType.splitName(name);
- return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+ return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
}
else
{
- return new SimpleClustering(name);
+ return new Clustering(name);
}
}
@@ -165,28 +165,18 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
super(expressions);
}
- public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
{
if (expressions.isEmpty())
return iter;
- return new WrappingUnfilteredPartitionIterator(iter)
+ return new AlteringUnfilteredPartitionIterator(iter)
{
- @Override
- public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+ protected Row computeNext(DecoratedKey partitionKey, Row row)
{
- return new FilteringRowIterator(iter)
- {
- // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
- // (we should however not filter them in the output of the method, hence it's not used as row filter for the
- // FilteringRowIterator)
- private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec);
-
- protected boolean includeRow(Row row)
- {
- return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row));
- }
- };
+ // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them.
+ Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+ return purged != null && CQLFilter.this.isSatisfiedBy(partitionKey, purged) ? row : null;
}
};
}
@@ -515,10 +505,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
CollectionType<?> type = (CollectionType<?>)column.type;
if (column.isComplex())
{
- Iterator<Cell> iter = row.getCells(column);
- while (iter.hasNext())
+ ComplexColumnData complexData = row.getComplexColumnData(column);
+ for (Cell cell : complexData)
{
- Cell cell = iter.next();
if (type.kind == CollectionType.Kind.SET)
{
if (type.nameComparator().compare(cell.path().get(0), value) == 0)
@@ -720,7 +709,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
// In thrift, we actually allow expressions on non-defined columns for the sake of filtering. To accommodate
// this we create a "fake" definition. This is messy but it works so is probably good enough.
- return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null);
+ return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type);
}
public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 794744a..c3a3c08 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -108,17 +108,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
- delete(rowKey, clustering, cell.value(), cell.path(), new SimpleDeletionTime(cell.livenessInfo().timestamp(), nowInSec), opGroup);
+ delete(rowKey, clustering, cell.value(), cell.path(), new DeletionTime(cell.timestamp(), nowInSec), opGroup);
}
public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
{
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
- PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
- Row.Writer writer = upd.writer();
- Rows.writeClustering(makeIndexClustering(rowKey, clustering, path), writer);
- writer.writeRowDeletion(deletion);
- writer.endOfRow();
+
+ Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+
indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
if (logger.isDebugEnabled())
logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
@@ -126,18 +125,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
{
- insert(rowKey, clustering, cell, cell.livenessInfo(), opGroup);
+ insert(rowKey, clustering, cell, LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), opGroup);
}
public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
{
DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
- PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
- Row.Writer writer = upd.writer();
- Rows.writeClustering(makeIndexClustering(rowKey, clustering, cell), writer);
- writer.writePartitionKeyLivenessInfo(info);
- writer.endOfRow();
+ Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+
if (logger.isDebugEnabled())
logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index ab8e688..897aa9c 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
+import java.util.Iterator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
@@ -87,17 +88,18 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
long timestamp = row.primaryKeyLivenessInfo().timestamp();
int ttl = row.primaryKeyLivenessInfo().ttl();
- for (Cell cell : row)
+ for (Cell cell : row.cells())
{
- if (cell.isLive(nowInSec) && cell.livenessInfo().timestamp() > timestamp)
+ if (cell.isLive(nowInSec) && cell.timestamp() > timestamp)
{
- timestamp = cell.livenessInfo().timestamp();
- ttl = cell.livenessInfo().ttl();
+ timestamp = cell.timestamp();
+ ttl = cell.ttl();
}
}
maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
}
- for (Cell cell : row)
+
+ for (Cell cell : row.cells())
{
if (!indexes(cell.column()))
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 1bd5452..aaefc9c 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,15 +18,7 @@
package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -469,7 +461,8 @@ public class SecondaryIndexManager
if (!row.deletion().isLive())
for (PerColumnSecondaryIndex index : indexes)
index.maybeDelete(key, clustering, row.deletion(), opGroup);
- for (Cell cell : row)
+
+ for (Cell cell : row.cells())
{
for (PerColumnSecondaryIndex index : indexes)
{
@@ -636,8 +629,7 @@ public class SecondaryIndexManager
// Completely identical cells (including expiring columns with
// identical ttl & localExpirationTime) will not get this far due
// to the oldCell.equals(newCell) in StandardUpdater.update
- return !oldCell.value().equals(newCell.value())
- || oldCell.livenessInfo().timestamp() != newCell.livenessInfo().timestamp();
+ return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
}
private Set<String> filterByColumn(Set<String> idxNames)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 1d978a2..d4ca707 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -112,7 +112,7 @@ public abstract class SecondaryIndexSearcher
NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
for (Clustering c : requested)
- clusterings.add(index.makeIndexClustering(pk, c, (Cell)null).takeAlias());
+ clusterings.add(index.makeIndexClustering(pk, c, (Cell)null));
return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
}
else
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 7a40a90..e073802 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -112,11 +112,8 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
{
- PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, entry.indexValue, PartitionColumns.NONE, 1);
- Row.Writer writer = upd.writer();
- Rows.writeClustering(entry.indexClustering, writer);
- writer.writeRowDeletion(new SimpleDeletionTime(entry.timestamp, nowInSec));
- writer.endOfRow();
+ Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
+ PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
if (logger.isDebugEnabled())
@@ -159,10 +156,10 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
{
this.indexValue = indexValue;
- this.indexClustering = indexClustering.takeAlias();
+ this.indexClustering = indexClustering;
this.timestamp = timestamp;
this.indexedKey = indexedKey;
- this.indexedEntryClustering = indexedEntryClustering.takeAlias();
+ this.indexedEntryClustering = indexedEntryClustering;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
index aa58511..6529ad9 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
@@ -118,7 +118,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex
public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
{
if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
- insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+ insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
index 5af842c..30391cf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -90,10 +90,9 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex
public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
{
- Iterator<Cell> iter = data.getCells(columnDef);
- while (iter.hasNext())
+ ComplexColumnData complexData = data.getComplexColumnData(columnDef);
+ for (Cell cell : complexData)
{
- Cell cell = iter.next();
if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
index d48e58b..a93f8e1 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
@@ -93,7 +93,7 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex
@Override
public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
{
- insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup);
+ insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 029dd3c..ce92164 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -171,49 +171,20 @@ public class CompositesSearcher extends SecondaryIndexSearcher
final OpOrder.Group writeOp,
final int nowInSec)
{
- return new WrappingUnfilteredRowIterator(dataIter)
+ return new AlteringUnfilteredRowIterator(dataIter)
{
private int entriesIdx;
- private Unfiltered next;
@Override
- public boolean hasNext()
- {
- return prepareNext();
- }
-
- @Override
- public Unfiltered next()
+ protected Row computeNext(Row row)
{
- if (next == null)
- prepareNext();
+ CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
+ if (!index.isStale(row, indexValue, nowInSec))
+ return row;
- Unfiltered toReturn = next;
- next = null;
- return toReturn;
- }
-
- private boolean prepareNext()
- {
- if (next != null)
- return true;
-
- while (super.hasNext())
- {
- next = super.next();
- if (next.kind() != Unfiltered.Kind.ROW)
- return true;
-
- Row row = (Row)next;
- CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
- if (!index.isStale(row, indexValue, nowInSec))
- return true;
-
- // The entry is stale: delete the entry and ignore otherwise
- index.delete(entry, writeOp, nowInSec);
- next = null;
- }
- return false;
+ // The entry is stale: delete the entry and ignore otherwise
+ index.delete(entry, writeOp, nowInSec);
+ return null;
}
private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 6b53640..118fb75 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -138,7 +138,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
// is the indexed name. And so we need to materialize the partition.
ArrayBackedPartition result = ArrayBackedPartition.create(iterator);
iterator.close();
- Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes));
+ Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes));
Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
? null
@@ -173,10 +173,10 @@ public class KeysSearcher extends SecondaryIndexSearcher
{
// Index is stale, remove the index entry and ignore
index.delete(partitionKey.getKey(),
- new SimpleClustering(index.indexedColumn().name.bytes),
+ new Clustering(index.indexedColumn().name.bytes),
indexedValue,
null,
- new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+ new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
writeOp);
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 78ead36..258a8a5 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.marshal;
-import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -36,6 +35,7 @@ import org.apache.cassandra.serializers.MarshalException;
import org.github.jamm.Unmetered;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -325,7 +325,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
if (valueLengthIfFixed() >= 0)
out.write(value);
else
- ByteBufferUtil.writeWithLength(value, out);
+ ByteBufferUtil.writeWithVIntLength(value, out);
}
public long writtenLength(ByteBuffer value)
@@ -333,25 +333,25 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
assert value.hasRemaining();
return valueLengthIfFixed() >= 0
? value.remaining()
- : TypeSizes.sizeofWithLength(value);
+ : TypeSizes.sizeofWithVIntLength(value);
}
- public ByteBuffer readValue(DataInput in) throws IOException
+ public ByteBuffer readValue(DataInputPlus in) throws IOException
{
int length = valueLengthIfFixed();
if (length >= 0)
return ByteBufferUtil.read(in, length);
else
- return ByteBufferUtil.readWithLength(in);
+ return ByteBufferUtil.readWithVIntLength(in);
}
- public void skipValue(DataInput in) throws IOException
+ public void skipValue(DataInputPlus in) throws IOException
{
int length = valueLengthIfFixed();
- if (length < 0)
- length = in.readInt();
-
- FileUtils.skipBytesFully(in, length);
+ if (length >= 0)
+ FileUtils.skipBytesFully(in, length);
+ else
+ ByteBufferUtil.skipWithVIntLength(in);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index a850305..9a096d0 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.marshal;
import java.nio.ByteBuffer;
-import java.io.DataInput;
import java.io.IOException;
import java.util.List;
import java.util.Iterator;
@@ -34,6 +33,7 @@ import org.apache.cassandra.cql3.Sets;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.serializers.CollectionSerializer;
@@ -236,23 +236,22 @@ public abstract class CollectionType<T> extends AbstractType<T>
{
public void serialize(CellPath path, DataOutputPlus out) throws IOException
{
- ByteBufferUtil.writeWithLength(path.get(0), out);
+ ByteBufferUtil.writeWithVIntLength(path.get(0), out);
}
- public CellPath deserialize(DataInput in) throws IOException
+ public CellPath deserialize(DataInputPlus in) throws IOException
{
- return CellPath.create(ByteBufferUtil.readWithLength(in));
+ return CellPath.create(ByteBufferUtil.readWithVIntLength(in));
}
public long serializedSize(CellPath path)
{
- return TypeSizes.sizeofWithLength(path.get(0));
+ return ByteBufferUtil.serializedSizeWithVIntLength(path.get(0));
}
- public void skip(DataInput in) throws IOException
+ public void skip(DataInputPlus in) throws IOException
{
- int length = in.readInt();
- FileUtils.skipBytesFully(in, length);
+ ByteBufferUtil.skipWithVIntLength(in);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
deleted file mode 100644
index 6775cf1..0000000
--- a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
+++ /dev/null
@@ -1,850 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.SearchIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract common class for all non-thread safe Partition implementations.
- */
-public abstract class AbstractPartitionData implements Partition, Iterable<Row>
-{
- private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class);
-
- protected final CFMetaData metadata;
- protected final DecoratedKey key;
-
- protected final DeletionInfo deletionInfo;
- protected final PartitionColumns columns;
-
- protected Row staticRow;
-
- protected int rows;
-
- // The values for the clustering columns of the rows contained in this partition object. If
- // clusteringSize is the size of the clustering comparator for this table, clusterings has size
- // clusteringSize * rows where rows is the number of rows stored, and row i has its clustering
- // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)).
- protected ByteBuffer[] clusterings;
-
- // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i).
- protected final LivenessInfoArray livenessInfos;
- // The row deletion for the rows of this partition (row i has its row deletion at index i).
- protected final DeletionTimeArray deletions;
-
- // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition.
- protected final RowDataBlock data;
-
- // Stats over the rows stored in this partition.
- private final RowStats.Collector statsCollector = new RowStats.Collector();
-
- // The maximum timestamp for any data contained in this partition.
- protected long maxTimestamp = Long.MIN_VALUE;
-
- private AbstractPartitionData(CFMetaData metadata,
- DecoratedKey key,
- DeletionInfo deletionInfo,
- ByteBuffer[] clusterings,
- LivenessInfoArray livenessInfos,
- DeletionTimeArray deletions,
- PartitionColumns columns,
- RowDataBlock data)
- {
- this.metadata = metadata;
- this.key = key;
- this.deletionInfo = deletionInfo;
- this.clusterings = clusterings;
- this.livenessInfos = livenessInfos;
- this.deletions = deletions;
- this.columns = columns;
- this.data = data;
-
- collectStats(deletionInfo.getPartitionDeletion());
- Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
- while (iter.hasNext())
- collectStats(iter.next().deletionTime());
- }
-
- protected AbstractPartitionData(CFMetaData metadata,
- DecoratedKey key,
- DeletionInfo deletionInfo,
- PartitionColumns columns,
- RowDataBlock data,
- int initialRowCapacity)
- {
- this(metadata,
- key,
- deletionInfo,
- new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()],
- new LivenessInfoArray(initialRowCapacity),
- new DeletionTimeArray(initialRowCapacity),
- columns,
- data);
- }
-
- protected AbstractPartitionData(CFMetaData metadata,
- DecoratedKey key,
- DeletionTime partitionDeletion,
- PartitionColumns columns,
- int initialRowCapacity,
- boolean sortable)
- {
- this(metadata,
- key,
- new DeletionInfo(partitionDeletion.takeAlias()),
- columns,
- new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()),
- initialRowCapacity);
- }
-
- private void collectStats(DeletionTime dt)
- {
- statsCollector.updateDeletionTime(dt);
- maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt());
- }
-
- private void collectStats(LivenessInfo info)
- {
- statsCollector.updateTimestamp(info.timestamp());
- statsCollector.updateTTL(info.ttl());
- statsCollector.updateLocalDeletionTime(info.localDeletionTime());
- maxTimestamp = Math.max(maxTimestamp, info.timestamp());
- }
-
- public CFMetaData metadata()
- {
- return metadata;
- }
-
- public DecoratedKey partitionKey()
- {
- return key;
- }
-
- public DeletionTime partitionLevelDeletion()
- {
- return deletionInfo.getPartitionDeletion();
- }
-
- public PartitionColumns columns()
- {
- return columns;
- }
-
- public Row staticRow()
- {
- return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
- }
-
- public RowStats stats()
- {
- return statsCollector.get();
- }
-
- /**
- * The deletion info for the partition update.
- *
- * <b>warning:</b> the returned object should be used in a read-only fashion. In particular,
- * it should not be used to add new range tombstones to this deletion. For that,
- * {@link addRangeTombstone} should be used instead. The reason being that adding directly to
- * the returned object would bypass some stats collection that {@code addRangeTombstone} does.
- *
- * @return the deletion info for the partition update for use as read-only.
- */
- public DeletionInfo deletionInfo()
- {
- // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We
- // could add the option of providing a read-only view of a DeletionInfo instead.
- return deletionInfo;
- }
-
- public void addPartitionDeletion(DeletionTime deletionTime)
- {
- collectStats(deletionTime);
- deletionInfo.add(deletionTime);
- }
-
- public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion)
- {
- addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias()));
- }
-
- public void addRangeTombstone(RangeTombstone range)
- {
- collectStats(range.deletionTime());
- deletionInfo.add(range, metadata.comparator);
- }
-
- /**
- * Swap row i and j.
- *
- * This is only used when we need to reorder rows because those were not added in clustering order,
- * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. This method
- * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but
- * it should really only be used by subclasses (and with care) in practice.
- */
- public void swap(int i, int j)
- {
- int cs = metadata.clusteringColumns().size();
- for (int k = 0; k < cs; k++)
- {
- ByteBuffer tmp = clusterings[j * cs + k];
- clusterings[j * cs + k] = clusterings[i * cs + k];
- clusterings[i * cs + k] = tmp;
- }
-
- livenessInfos.swap(i, j);
- deletions.swap(i, j);
- data.swap(i, j);
- }
-
- protected void merge(int i, int j, int nowInSec)
- {
- data.merge(i, j, nowInSec);
- if (livenessInfos.timestamp(i) > livenessInfos.timestamp(j))
- livenessInfos.move(i, j);
- if (deletions.supersedes(i, j))
- deletions.move(i, j);
- }
-
- protected void move(int i, int j)
- {
- int cs = metadata.clusteringColumns().size();
- for (int k = 0; k < cs; k++)
- clusterings[j * cs + k] = clusterings[i * cs + k];
- data.move(i, j);
- livenessInfos.move(i, j);
- deletions.move(i, j);
- }
-
- public int rowCount()
- {
- return rows;
- }
-
- public boolean isEmpty()
- {
- return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty();
- }
-
- protected void clear()
- {
- rows = 0;
- Arrays.fill(clusterings, null);
- livenessInfos.clear();
- deletions.clear();
- data.clear();
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- CFMetaData metadata = metadata();
- sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s",
- metadata.ksName,
- metadata.cfName,
- metadata.getKeyValidator().getString(partitionKey().getKey()),
- columns(),
- deletionInfo));
-
- if (staticRow() != Rows.EMPTY_STATIC_ROW)
- sb.append("\n ").append(staticRow().toString(metadata, true));
-
- // We use createRowIterator() directly instead of iterator() because that avoids
- // sorting for PartitionUpdate (which inherit this method) and that is useful because
- // 1) it can help with debugging and 2) we can't write after sorting but we want to
- // be able to print an update while we build it (again for debugging)
- Iterator<Row> iterator = createRowIterator(null, false);
- while (iterator.hasNext())
- sb.append("\n ").append(iterator.next().toString(metadata, true));
-
- return sb.toString();
- }
-
- protected void reverse()
- {
- for (int i = 0; i < rows / 2; i++)
- swap(i, rows - 1 - i);
- }
-
- public Row getRow(Clustering clustering)
- {
- Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
- // Note that for statics, this will never return null, this will return an empty row. However,
- // it's more consistent for this method to return null if we don't really have a static row.
- return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
- }
-
- /**
- * Returns an iterator that iterates over the rows of this update in clustering order.
- *
- * @return an iterator over the rows of this update.
- */
- public Iterator<Row> iterator()
- {
- return createRowIterator(null, false);
- }
-
- public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed)
- {
- final RowIterator iter = createRowIterator(columns, reversed);
- return new SearchIterator<Clustering, Row>()
- {
- public boolean hasNext()
- {
- return iter.hasNext();
- }
-
- public Row next(Clustering key)
- {
- if (key == Clustering.STATIC_CLUSTERING)
- {
- if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty())
- return Rows.EMPTY_STATIC_ROW;
-
- return FilteringRow.columnsFilteringRow(columns).setTo(staticRow());
- }
-
- return iter.seekTo(key) ? iter.next() : null;
- }
- };
- }
-
- public UnfilteredRowIterator unfilteredIterator()
- {
- return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false);
- }
-
- public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed)
- {
- return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed));
- }
-
- protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
- {
- return sliceableUnfilteredIterator(ColumnFilter.selection(columns()), false);
- }
-
- protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed)
- {
- return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed)
- {
- private final RowIterator rowIterator = createRowIterator(selection, reversed);
- private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed);
-
- protected Unfiltered computeNext()
- {
- if (!mergeIterator.isSet())
- mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed));
-
- return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
- }
-
- public Iterator<Unfiltered> slice(Slice slice)
- {
- return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed));
- }
- };
- }
-
- private RowIterator createRowIterator(ColumnFilter columns, boolean reversed)
- {
- return reversed ? new ReverseRowIterator(columns) : new ForwardRowIterator(columns);
- }
-
- /**
- * An iterator over the rows of this partition that reuses the same row object.
- */
- private abstract class RowIterator extends UnmodifiableIterator<Row>
- {
- protected final InternalReusableClustering clustering = new InternalReusableClustering();
- protected final InternalReusableRow reusableRow;
- protected final FilteringRow filter;
-
- protected int next;
-
- protected RowIterator(final ColumnFilter columns)
- {
- this.reusableRow = new InternalReusableRow(clustering);
- this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns);
- }
-
- /*
- * Move the iterator so that row {@code name} is returned next by {@code next} if that
- * row exists. Otherwise the first row sorting after {@code name} will be returned.
- * Returns whether {@code name} was found or not.
- */
- public abstract boolean seekTo(Clustering name);
-
- public abstract Iterator<Row> slice(Slice slice);
-
- protected Row setRowTo(int row)
- {
- reusableRow.setTo(row);
- return filter == null ? reusableRow : filter.setTo(reusableRow);
- }
-
- /**
- * Simple binary search.
- */
- protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex)
- {
- int low = fromIndex;
- int mid = toIndex;
- int high = mid - 1;
- int result = -1;
- while (low <= high)
- {
- mid = (low + high) >> 1;
- if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0)
- low = mid + 1;
- else if (result == 0)
- return mid;
- else
- high = mid - 1;
- }
- return -mid - (result < 0 ? 1 : 2);
- }
- }
-
- private class ForwardRowIterator extends RowIterator
- {
- private ForwardRowIterator(ColumnFilter columns)
- {
- super(columns);
- this.next = 0;
- }
-
- public boolean hasNext()
- {
- return next < rows;
- }
-
- public Row next()
- {
- return setRowTo(next++);
- }
-
- public boolean seekTo(Clustering name)
- {
- if (next >= rows)
- return false;
-
- int idx = binarySearch(name, next, rows);
- next = idx >= 0 ? idx : -idx - 1;
- return idx >= 0;
- }
-
- public Iterator<Row> slice(Slice slice)
- {
- int sidx = binarySearch(slice.start(), next, rows);
- final int start = sidx >= 0 ? sidx : -sidx - 1;
- if (start >= rows)
- return Collections.emptyIterator();
-
- int eidx = binarySearch(slice.end(), start, rows);
- // The insertion point is the first element greater than slice.end(), so we want the previous index
- final int end = eidx >= 0 ? eidx : -eidx - 2;
-
- // Remember the end to speed up potential further slice search
- next = end;
-
- if (start > end)
- return Collections.emptyIterator();
-
- return new AbstractIterator<Row>()
- {
- private int i = start;
-
- protected Row computeNext()
- {
- if (i >= rows || i > end)
- return endOfData();
-
- return setRowTo(i++);
- }
- };
- }
- }
-
- private class ReverseRowIterator extends RowIterator
- {
- private ReverseRowIterator(ColumnFilter columns)
- {
- super(columns);
- this.next = rows - 1;
- }
-
- public boolean hasNext()
- {
- return next >= 0;
- }
-
- public Row next()
- {
- return setRowTo(next--);
- }
-
- public boolean seekTo(Clustering name)
- {
- // We only use that method with forward iterators.
- throw new UnsupportedOperationException();
- }
-
- public Iterator<Row> slice(Slice slice)
- {
- int sidx = binarySearch(slice.end(), 0, next + 1);
- // The insertion point is the first element greater than slice.end(), so we want the previous index
- final int start = sidx >= 0 ? sidx : -sidx - 2;
- if (start < 0)
- return Collections.emptyIterator();
-
- int eidx = binarySearch(slice.start(), 0, start + 1);
- final int end = eidx >= 0 ? eidx : -eidx - 1;
-
- // Remember the end to speed up potential further slice search
- next = end;
-
- if (start < end)
- return Collections.emptyIterator();
-
- return new AbstractIterator<Row>()
- {
- private int i = start;
-
- protected Row computeNext()
- {
- if (i < 0 || i < end)
- return endOfData();
-
- return setRowTo(i--);
- }
- };
- }
- }
-
- /**
- * A reusable view over the clustering of this partition.
- */
- protected class InternalReusableClustering extends Clustering
- {
- final int size = metadata.clusteringColumns().size();
- private int base;
-
- public int size()
- {
- return size;
- }
-
- public Clustering setTo(int row)
- {
- base = row * size;
- return this;
- }
-
- public ByteBuffer get(int i)
- {
- return clusterings[base + i];
- }
-
- public ByteBuffer[] getRawValues()
- {
- ByteBuffer[] values = new ByteBuffer[size];
- for (int i = 0; i < size; i++)
- values[i] = get(i);
- return values;
- }
- };
-
- /**
- * A reusable view over the rows of this partition.
- */
- protected class InternalReusableRow extends AbstractReusableRow
- {
- private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor();
- private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor();
- private final InternalReusableClustering clustering;
-
- private int row;
-
- public InternalReusableRow()
- {
- this(new InternalReusableClustering());
- }
-
- public InternalReusableRow(InternalReusableClustering clustering)
- {
- this.clustering = clustering;
- }
-
- protected RowDataBlock data()
- {
- return data;
- }
-
- public Row setTo(int row)
- {
- this.clustering.setTo(row);
- this.liveness.setTo(livenessInfos, row);
- this.deletion.setTo(deletions, row);
- this.row = row;
- return this;
- }
-
- protected int row()
- {
- return row;
- }
-
- public Clustering clustering()
- {
- return clustering;
- }
-
- public LivenessInfo primaryKeyLivenessInfo()
- {
- return liveness;
- }
-
- public DeletionTime deletion()
- {
- return deletion;
- }
- };
-
- private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator
- {
- private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder)
- {
- super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats());
- }
- }
-
- /**
- * Row writer that appends rows to the enclosing partition.
- * <p>
- * On top of what {@code RowDataBlock.Writer} does, every write call first grows the
- * per-row storage when the current row does not fit, and passes the written liveness
- * or deletion info to {@code collectStats}. It also tracks how many columns were set
- * in the row being written so the total can be handed to {@code statsCollector} when
- * the row ends.
- */
- protected class Writer extends RowDataBlock.Writer
- {
-     // Index of the next free slot in the flat clusterings array.
-     private int clusteringBase;
-
-     // Bookkeeping for the row being written: one increment per cell of a non-complex
-     // column, and the distinct complex columns that received at least one cell.
-     private int simpleColumnsSetInRow;
-     private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
-
-     public Writer(boolean inOrderCells)
-     {
-         super(data, inOrderCells);
-     }
-
-     /**
-      * Stores the next clustering value, growing the storage first if the current row requires it.
-      */
-     public void writeClusteringValue(ByteBuffer value)
-     {
-         ensureCapacity(row);
-         clusterings[clusteringBase++] = value;
-     }
-
-     /**
-      * Records the liveness info for the current row and feeds it to the stats.
-      */
-     public void writePartitionKeyLivenessInfo(LivenessInfo info)
-     {
-         ensureCapacity(row);
-         livenessInfos.set(row, info);
-         collectStats(info);
-     }
-
-     /**
-      * Records the row deletion for the current row, unless it is live, and feeds it to the stats.
-      */
-     public void writeRowDeletion(DeletionTime deletion)
-     {
-         ensureCapacity(row);
-
-         boolean isDeleted = !deletion.isLive();
-         if (isDeleted)
-         {
-             deletions.set(row, deletion);
-         }
-
-         // Stats are collected whether or not the deletion was stored.
-         collectStats(deletion);
-     }
-
-     @Override
-     public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-     {
-         ensureCapacity(row);
-         collectStats(info);
-
-         if (!column.isComplex())
-             simpleColumnsSetInRow++;
-         else
-             complexColumnsSetInRow.add(column);
-
-         super.writeCell(column, isCounter, value, info, path);
-     }
-
-     @Override
-     public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
-     {
-         ensureCapacity(row);
-         collectStats(complexDeletion);
-
-         super.writeComplexDeletion(c, complexDeletion);
-     }
-
-     /**
-      * Finishes the current row: bumps the row count, reports the number of columns
-      * set in that row, then clears the per-row counters.
-      */
-     @Override
-     public void endOfRow()
-     {
-         super.endOfRow();
-         rows++;
-
-         int columnsSetInRow = simpleColumnsSetInRow + complexColumnsSetInRow.size();
-         statsCollector.updateColumnSetPerRow(columnsSetInRow);
-
-         complexColumnsSetInRow.clear();
-         simpleColumnsSetInRow = 0;
-     }
-
-     public int currentRow()
-     {
-         return row;
-     }
-
-     /**
-      * Grows the clusterings array and the liveness/deletion storage when
-      * {@code rowToSet} is at or beyond the current capacity; does nothing otherwise.
-      */
-     private void ensureCapacity(int rowToSet)
-     {
-         int currentCapacity = livenessInfos.size();
-         if (rowToSet >= currentCapacity)
-         {
-             int grownCapacity = RowDataBlock.computeNewCapacity(currentCapacity, rowToSet);
-
-             // The clusterings array is flat: one slot per clustering column per row.
-             int valuesPerRow = metadata.clusteringColumns().size();
-             clusterings = Arrays.copyOf(clusterings, grownCapacity * valuesPerRow);
-
-             livenessInfos.resize(grownCapacity);
-             deletions.resize(grownCapacity);
-         }
-     }
-
-     /**
-      * Resets the superclass state and all the bookkeeping kept by this writer.
-      *
-      * @return this writer.
-      */
-     @Override
-     public Writer reset()
-     {
-         super.reset();
-         complexColumnsSetInRow.clear();
-         simpleColumnsSetInRow = 0;
-         clusteringBase = 0;
-         return this;
-     }
- }
-
- /**
- * Range tombstone marker writer for the enclosing partition.
- * <p>
- * Markers are received one at a time. An opening marker is remembered; the matching
- * closing marker turns the pair into a slice that is added to the partition as a range
- * tombstone. A boundary marker is handled as a close followed by an open.
- */
- protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer
- {
-     private final boolean reversed;
-
-     // State of the marker being written; only the first 'size' entries of nextValues are set.
-     private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()];
-     private int size;
-     private RangeTombstone.Bound.Kind nextKind;
-
-     // Set by open() and read by the next close().
-     private Slice.Bound openBound;
-     private DeletionTime openDeletion;
-
-     public RangeTombstoneCollector(boolean reversed)
-     {
-         this.reversed = reversed;
-     }
-
-     public void writeClusteringValue(ByteBuffer value)
-     {
-         nextValues[size++] = value;
-     }
-
-     public void writeBoundKind(RangeTombstone.Bound.Kind kind)
-     {
-         nextKind = kind;
-     }
-
-     /**
-      * Returns a fresh array holding the clustering values written so far for the current marker.
-      */
-     private ByteBuffer[] getValues()
-     {
-         return Arrays.copyOf(nextValues, size);
-     }
-
-     /**
-      * Remembers the bound built from the current values, and the result of
-      * {@code deletion.takeAlias()}, for the next close.
-      */
-     private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
-     {
-         openBound = Slice.Bound.create(kind, getValues());
-         openDeletion = deletion.takeAlias();
-     }
-
-     /**
-      * Builds the slice between the remembered open bound and the bound built from the
-      * current values, and adds it as a range tombstone with the remembered deletion.
-      * The two bounds are swapped when iterating in reverse.
-      */
-     private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
-     {
-         assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was " + deletion;
-         Slice.Bound closeBound = Slice.Bound.create(kind, getValues());
-
-         Slice deleted;
-         if (reversed)
-             deleted = Slice.make(closeBound, openBound);
-         else
-             deleted = Slice.make(openBound, closeBound);
-
-         addRangeTombstone(deleted, openDeletion);
-     }
-
-     public void writeBoundDeletion(DeletionTime deletion)
-     {
-         assert !nextKind.isBoundary();
-         if (!nextKind.isOpen(reversed))
-         {
-             close(nextKind, deletion);
-             return;
-         }
-         open(nextKind, deletion);
-     }
-
-     public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
-     {
-         assert nextKind.isBoundary();
-
-         // Which of the two deletions closes the current range depends on the iteration order.
-         DeletionTime closing;
-         DeletionTime opening;
-         if (reversed)
-         {
-             closing = startDeletion;
-             opening = endDeletion;
-         }
-         else
-         {
-             closing = endDeletion;
-             opening = startDeletion;
-         }
-
-         close(nextKind.closeBoundOfBoundary(reversed), closing);
-         open(nextKind.openBoundOfBoundary(reversed), opening);
-     }
-
-     /**
-      * Clears the per-marker state; the remembered open bound and deletion are kept.
-      */
-     public void endOfMarker()
-     {
-         clear();
-     }
-
-     private void addRangeTombstone(Slice deletionSlice, DeletionTime dt)
-     {
-         AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt);
-     }
-
-     private void clear()
-     {
-         nextKind = null;
-         Arrays.fill(nextValues, null);
-         size = 0;
-     }
-
-     /**
-      * Forgets the remembered open bound and deletion as well as the per-marker state.
-      */
-     public void reset()
-     {
-         openDeletion = null;
-         openBound = null;
-         clear();
-     }
- }
-}