You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/22 15:25:32 UTC
cassandra git commit: Re-enable skipping non-queried column values
Repository: cassandra
Updated Branches:
refs/heads/trunk 129b68c1c -> fd74a0360
Re-enable skipping non-queried column values
patch by slebresne; reviewed by beobal for CASSANDRA-10657
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd74a036
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd74a036
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd74a036
Branch: refs/heads/trunk
Commit: fd74a03602421ca07b6b1087803c54577adae4dd
Parents: 129b68c
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Dec 22 17:08:17 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jan 22 15:25:12 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/SinglePartitionReadCommand.java | 2 +-
.../cassandra/db/filter/ColumnFilter.java | 217 +++++++++++--------
.../db/partitions/PartitionUpdate.java | 26 ++-
.../org/apache/cassandra/db/rows/BTreeRow.java | 34 ++-
.../cassandra/db/rows/ComplexColumnData.java | 22 +-
src/java/org/apache/cassandra/db/rows/Row.java | 10 +
.../apache/cassandra/db/rows/RowIterators.java | 17 ++
.../cassandra/db/rows/SerializationHelper.java | 39 +++-
.../db/rows/UnfilteredRowIterators.java | 18 ++
.../cassandra/db/rows/UnfilteredSerializer.java | 4 +-
.../cassandra/db/rows/WithOnlyQueriedData.java | 49 +++++
.../apache/cassandra/schema/SchemaKeyspace.java | 2 +-
.../apache/cassandra/service/DataResolver.java | 15 +-
.../cassandra/streaming/StreamReceiveTask.java | 3 +-
.../cassandra/thrift/CassandraServer.java | 4 +-
16 files changed, 338 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a44b967..5c577f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.4
+ * Skip values for non-queried columns (CASSANDRA-10657)
* Add support for secondary indexes on static columns (CASSANDRA-8103)
* CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051)
* Add metric for number of dropped mutations (CASSANDRA-10866)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4c87d10..a1de3d6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -746,7 +746,7 @@ public class SinglePartitionReadCommand extends ReadCommand
try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
{
- final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+ final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
StageManager.getStage(Stage.MUTATION).execute(new Runnable()
{
public void run()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index e22c154..9c4c714 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -36,44 +36,57 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected
* by a query.
*
- * In practice, this class cover 2 main cases:
- * 1) most user queries have to internally query all columns, because the CQL semantic requires us to know if
- * a row is live or not even if it has no values for the columns requested by the user (see #6588for more
- * details). However, while we need to know for columns if it has live values, we can actually save from
- * sending the values for those columns that will not be returned to the user.
- * 2) for some internal queries (and for queries using #6588 if we introduce it), we're actually fine only
- * actually querying some of the columns.
+ * We distinguish 2 sets of columns in practice: the _fetched_ columns, which are the columns that we (may, see
+ * below) need to fetch internally, and the _queried_ columns, which are the columns that the user has selected
+ * in its request.
*
- * For complex columns, this class allows to be more fine grained than the column by only selection some of the
- * cells of the complex column (either individual cell by path name, or some slice).
+ * The reason for distinguishing those 2 sets is that due to the CQL semantic (see #6588 for more details), we
+ * often need to internally fetch all columns for the queried table, but can still do some optimizations for those
+ * columns that are not directly queried by the user (see #10657 for more details).
+ *
+ * Note that in practice:
+ * - the _queried_ columns set is always included in the _fetched_ one.
+ * - whenever those sets are different, we know the _fetched_ set contains all columns for the table, so we
+ * don't have to record this set, we just keep a pointer to the table metadata. The only set we concretely
+ * store is thus the _queried_ one.
+ * - in the special case of a {@code SELECT *} query, we want to query all columns, and _fetched_ == _queried_.
+ * As this is a common case, we special case it by keeping the _queried_ set {@code null} (and we retrieve
+ * the columns through the metadata pointer).
+ *
+ * For complex columns, this class optionally allows to specify a subset of the cells to query for each column.
+ * We can either select individual cells by path name, or a slice of them. Note that this is a sub-selection of
+ * _queried_ cells, so if _fetched_ != _queried_, then the cells selected by this sub-selection are considered
+ * queried and the other ones are considered fetched (and if a column has some sub-selection, it must be a queried
+ * column, which is actually enforced by the Builder below).
*/
public class ColumnFilter
{
public static final Serializer serializer = new Serializer();
- // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
- // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped.
- // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all.
+ // True if _fetched_ is all the columns, in which case metadata must not be null. If false,
+ // then _fetched_ == _queried_ and we only store _queried_.
private final boolean isFetchAll;
private final CFMetaData metadata; // can be null if !isFetchAll
- private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+ private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_
private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
private ColumnFilter(boolean isFetchAll,
CFMetaData metadata,
- PartitionColumns columns,
+ PartitionColumns queried,
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
{
+ assert !isFetchAll || metadata != null;
+ assert isFetchAll || queried != null;
this.isFetchAll = isFetchAll;
this.metadata = metadata;
- this.selection = columns;
+ this.queried = queried;
this.subSelections = subSelections;
}
/**
- * A selection that includes all columns (and their values).
+ * A filter that includes all columns for the provided table.
*/
public static ColumnFilter all(CFMetaData metadata)
{
@@ -81,7 +94,7 @@ public class ColumnFilter
}
/**
- * A selection that only fetch the provided columns.
+ * A filter that only fetches/queries the provided columns.
* <p>
* Note that this shouldn't be used for CQL queries in general as all columns should be queried to
* preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and
@@ -93,81 +106,93 @@ public class ColumnFilter
}
/**
- * The columns that needs to be fetched internally for this selection.
- * <p>
- * This is the columns that must be present in the internal rows returned by queries using this selection,
- * not the columns that are actually queried by the user (see the class javadoc for details).
+ * The columns that need to be fetched internally for this filter.
*
- * @return the column to fetch for this selection.
+ * @return the columns to fetch for this filter.
*/
public PartitionColumns fetchedColumns()
{
- return isFetchAll ? metadata.partitionColumns() : selection;
+ return isFetchAll ? metadata.partitionColumns() : queried;
+ }
+
+ /**
+ * The columns actually queried by the user.
+ * <p>
+ * Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}).
+ */
+ public PartitionColumns queriedColumns()
+ {
+ assert queried != null || isFetchAll;
+ return queried == null ? metadata.partitionColumns() : queried;
}
- public boolean includesAllColumns()
+ public boolean fetchesAllColumns()
{
return isFetchAll;
}
/**
- * Whether the provided column is selected by this selection.
+ * Whether _fetched_ == _queried_ for this filter. When this returns {@code true}, the
+ * {@code fetchedColumnIsQueried()} and {@code fetchedCellIsQueried()} methods never return {@code false}.
*/
- public boolean includes(ColumnDefinition column)
+ public boolean allFetchedColumnsAreQueried()
{
- return isFetchAll || selection.contains(column);
+ return !isFetchAll || (queried == null && subSelections == null);
}
/**
- * Whether we can skip the value for the provided selected column.
+ * Whether the provided column is fetched by this filter.
*/
- public boolean canSkipValue(ColumnDefinition column)
+ public boolean fetches(ColumnDefinition column)
{
- // We don't use that currently, see #10655 for more details.
- return false;
+ return isFetchAll || queried.contains(column);
}
/**
- * Whether the provided cell of a complex column is selected by this selection.
+ * Whether the provided column, which is assumed to be _fetched_ by this filter (so the caller must guarantee
+ * that {@code fetches(column) == true}), is also _queried_ by the user.
+ *
+ * !WARNING! please be sure to understand the difference between _fetched_ and _queried_
+ * columns that this class makes before using this method. If unsure, you probably want
+ * to use the {@link #fetches} method.
*/
- public boolean includes(Cell cell)
+ public boolean fetchedColumnIsQueried(ColumnDefinition column)
{
- if (isFetchAll || subSelections == null || !cell.column().isComplex())
- return true;
-
- SortedSet<ColumnSubselection> s = subSelections.get(cell.column().name);
- if (s.isEmpty())
- return true;
-
- for (ColumnSubselection subSel : s)
- if (subSel.compareInclusionOf(cell.path()) == 0)
- return true;
-
- return false;
+ return !isFetchAll || queried == null || queried.contains(column);
}
/**
- * Whether we can skip the value of the cell of a complex column.
+ * Whether the provided complex cell (identified by its column and path), which is assumed to be _fetched_ by
+ * this filter, is also _queried_ by the user.
+ *
+ * !WARNING! please be sure to understand the difference between _fetched_ and _queried_
+ * columns that this class makes before using this method. If unsure, you probably want
+ * to use the {@link #fetches} method.
*/
- public boolean canSkipValue(ColumnDefinition column, CellPath path)
+ public boolean fetchedCellIsQueried(ColumnDefinition column, CellPath path)
{
- if (!isFetchAll || subSelections == null || !column.isComplex())
- return false;
+ assert path != null;
+ if (!isFetchAll || subSelections == null)
+ return true;
SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+ // No subselection for this column means everything is queried
if (s.isEmpty())
- return false;
+ return true;
for (ColumnSubselection subSel : s)
if (subSel.compareInclusionOf(path) == 0)
- return false;
+ return true;
- return true;
+ return false;
}
/**
* Creates a new {@code Tester} to efficiently test the inclusion of cells of complex column
* {@code column}.
+ *
+ * @return the created tester or {@code null} if all the cells from the provided column
+ * are queried.
*/
public Tester newTester(ColumnDefinition column)
{
@@ -182,8 +207,8 @@ public class ColumnFilter
}
/**
- * Returns a {@code ColumnFilter}} builder that includes all columns (so the selections
- * added to the builder are the columns/cells for which we shouldn't skip the values).
+ * Returns a {@code ColumnFilter} builder that fetches all columns (and queries the columns
+ * added to the builder, or everything if no column is added).
*/
public static Builder allColumnsBuilder(CFMetaData metadata)
{
@@ -191,8 +216,7 @@ public class ColumnFilter
}
/**
- * Returns a {@code ColumnFilter}} builder that includes only the columns/cells
- * added to the builder.
+ * Returns a {@code ColumnFilter} builder that only fetches the columns/cells added to the builder.
*/
public static Builder selectionBuilder()
{
@@ -211,17 +235,20 @@ public class ColumnFilter
this.iterator = iterator;
}
- public boolean includes(CellPath path)
+ public boolean fetches(CellPath path)
{
- return isFetchAll || includedBySubselection(path);
+ return isFetchAll || hasSubselection(path);
}
- public boolean canSkipValue(CellPath path)
+ /**
+ * Must only be called if {@code fetches(path) == true}.
+ */
+ public boolean fetchedCellIsQueried(CellPath path)
{
- return isFetchAll && !includedBySubselection(path);
+ return !isFetchAll || hasSubselection(path);
}
- private boolean includedBySubselection(CellPath path)
+ private boolean hasSubselection(CellPath path)
{
while (current != null || iterator.hasNext())
{
@@ -241,10 +268,22 @@ public class ColumnFilter
}
}
+ /**
+ * A builder for a {@code ColumnFilter} object.
+ *
+ * Note that the columns added to this builder are the _queried_ columns. Whether or not all columns
+ * are _fetched_ depends on which factory method you've used to obtain this builder, allColumnsBuilder (all
+ * columns are fetched) or selectionBuilder (only the queried columns are fetched).
+ *
+ * Note that for an allColumnsBuilder, if no queried columns are added, this is interpreted as querying
+ * all columns, not querying none (but if you know you want to query all columns, prefer
+ * {@link ColumnFilter#all}). For selectionBuilder, adding no queried columns means no column will be
+ * fetched (so the built filter will use {@code PartitionColumns.NONE} as its queried columns).
+ */
public static class Builder
{
- private final CFMetaData metadata;
- private PartitionColumns.Builder selection;
+ private final CFMetaData metadata; // null if we don't fetch all columns
+ private PartitionColumns.Builder queriedBuilder;
private List<ColumnSubselection> subSelections;
private Builder(CFMetaData metadata)
@@ -254,17 +293,17 @@ public class ColumnFilter
public Builder add(ColumnDefinition c)
{
- if (selection == null)
- selection = PartitionColumns.builder();
- selection.add(c);
+ if (queriedBuilder == null)
+ queriedBuilder = PartitionColumns.builder();
+ queriedBuilder.add(c);
return this;
}
public Builder addAll(Iterable<ColumnDefinition> columns)
{
- if (selection == null)
- selection = PartitionColumns.builder();
- selection.addAll(columns);
+ if (queriedBuilder == null)
+ queriedBuilder = PartitionColumns.builder();
+ queriedBuilder.addAll(columns);
return this;
}
@@ -291,11 +330,11 @@ public class ColumnFilter
{
boolean isFetchAll = metadata != null;
- PartitionColumns selectedColumns = selection == null ? null : selection.build();
- // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder
+ PartitionColumns queried = queriedBuilder == null ? null : queriedBuilder.build();
+ // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a selectionBuilder
// with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471).
- if (!isFetchAll && selectedColumns == null)
- selectedColumns = PartitionColumns.NONE;
+ if (!isFetchAll && queried == null)
+ queried = PartitionColumns.NONE;
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null;
if (subSelections != null)
@@ -305,7 +344,7 @@ public class ColumnFilter
s.put(subSelection.column().name, subSelection);
}
- return new ColumnFilter(isFetchAll, metadata, selectedColumns, s);
+ return new ColumnFilter(isFetchAll, metadata, queried, s);
}
}
@@ -315,10 +354,10 @@ public class ColumnFilter
if (isFetchAll)
return "*";
- if (selection.isEmpty())
+ if (queried.isEmpty())
return "";
- Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+ Iterator<ColumnDefinition> defs = queried.selectOrderIterator();
if (!defs.hasNext())
return "<none>";
@@ -355,13 +394,13 @@ public class ColumnFilter
public static class Serializer
{
private static final int IS_FETCH_ALL_MASK = 0x01;
- private static final int HAS_SELECTION_MASK = 0x02;
+ private static final int HAS_QUERIED_MASK = 0x02;
private static final int HAS_SUB_SELECTIONS_MASK = 0x04;
private static int makeHeaderByte(ColumnFilter selection)
{
return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0)
- | (selection.selection != null ? HAS_SELECTION_MASK : 0)
+ | (selection.queried != null ? HAS_QUERIED_MASK : 0)
| (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0);
}
@@ -369,10 +408,10 @@ public class ColumnFilter
{
out.writeByte(makeHeaderByte(selection));
- if (selection.selection != null)
+ if (selection.queried != null)
{
- Columns.serializer.serialize(selection.selection.statics, out);
- Columns.serializer.serialize(selection.selection.regulars, out);
+ Columns.serializer.serialize(selection.queried.statics, out);
+ Columns.serializer.serialize(selection.queried.regulars, out);
}
if (selection.subSelections != null)
@@ -387,15 +426,15 @@ public class ColumnFilter
{
int header = in.readUnsignedByte();
boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0;
- boolean hasSelection = (header & HAS_SELECTION_MASK) != 0;
+ boolean hasQueried = (header & HAS_QUERIED_MASK) != 0;
boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0;
- PartitionColumns selection = null;
- if (hasSelection)
+ PartitionColumns queried = null;
+ if (hasQueried)
{
Columns statics = Columns.serializer.deserialize(in, metadata);
Columns regulars = Columns.serializer.deserialize(in, metadata);
- selection = new PartitionColumns(statics, regulars);
+ queried = new PartitionColumns(statics, regulars);
}
SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
@@ -410,17 +449,17 @@ public class ColumnFilter
}
}
- return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections);
+ return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, queried, subSelections);
}
public long serializedSize(ColumnFilter selection, int version)
{
long size = 1; // header byte
- if (selection.selection != null)
+ if (selection.queried != null)
{
- size += Columns.serializer.serializedSize(selection.selection.statics);
- size += Columns.serializer.serializedSize(selection.selection.regulars);
+ size += Columns.serializer.serializedSize(selection.queried.statics);
+ size += Columns.serializer.serializedSize(selection.queried.regulars);
}
if (selection.subSelections != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f10b3b6..02369e4 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -193,18 +193,36 @@ public class PartitionUpdate extends AbstractBTreePartition
/**
* Turns the given iterator into an update.
*
+ * @param iterator the iterator to turn into updates.
+ * @param filter the column filter used when querying {@code iterator}. This is used to make
+ * sure we don't include data for which the value has been skipped while reading (as we would
+ * then be writing something incorrect).
+ *
* Warning: this method does not close the provided iterator, it is up to
* the caller to close it.
*/
- public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
+ public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter)
{
+ iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter);
Holder holder = build(iterator, 16);
MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo;
return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
}
- public static PartitionUpdate fromIterator(RowIterator iterator)
+ /**
+ * Turns the given iterator into an update.
+ *
+ * @param iterator the iterator to turn into updates.
+ * @param filter the column filter used when querying {@code iterator}. This is used to make
+ * sure we don't include data for which the value has been skipped while reading (as we would
+ * then be writing something incorrect).
+ *
+ * Warning: this method does not close the provided iterator, it is up to
+ * the caller to close it.
+ */
+ public static PartitionUpdate fromIterator(RowIterator iterator, ColumnFilter filter)
{
+ iterator = RowIterators.withOnlyQueriedData(iterator, filter);
MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
Holder holder = build(iterator, deletionInfo, true, 16);
return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false);
@@ -296,7 +314,7 @@ public class PartitionUpdate extends AbstractBTreePartition
int nowInSecs = FBUtilities.nowInSeconds();
List<UnfilteredRowIterator> asIterators = Lists.transform(updates, AbstractBTreePartition::unfilteredIterator);
- return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs));
+ return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs), ColumnFilter.all(updates.get(0).metadata()));
}
/**
@@ -690,7 +708,7 @@ public class PartitionUpdate extends AbstractBTreePartition
try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
{
assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families
- return PartitionUpdate.fromIterator(iterator);
+ return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata()));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index e8667e0..a0912ae 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -237,10 +237,12 @@ public class BTreeRow extends AbstractRow
{
Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns();
- if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+ boolean mayFilterColumns = !filter.fetchesAllColumns() || !filter.allFetchedColumnsAreQueried();
+ boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
+
+ if (!mayFilterColumns && !mayHaveShadowed && droppedColumns.isEmpty())
return this;
- boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time());
LivenessInfo newInfo = primaryKeyLivenessInfo;
Deletion newDeletion = deletion;
@@ -255,6 +257,8 @@ public class BTreeRow extends AbstractRow
Columns columns = filter.fetchedColumns().columns(isStatic());
Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester();
+ Predicate<ColumnDefinition> queriedByUserTester = filter.queriedColumns().columns(isStatic()).inOrderInclusionTester();
+ final LivenessInfo rowLiveness = newInfo;
return transformAndFilter(newInfo, newDeletion, (cd) -> {
ColumnDefinition column = cd.column();
@@ -263,11 +267,31 @@ public class BTreeRow extends AbstractRow
CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes);
if (column.isComplex())
- return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+ return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped, rowLiveness);
Cell cell = (Cell) cd;
- return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))
- ? cell : null;
+ // We include the cell unless it is 1) shadowed, 2) for a dropped column or 3) skippable.
+ // And a cell is skippable if it is for a column that is not queried by the user and its timestamp
+ // is lower than the row timestamp (see #10657 or SerializationHelper.includes() for details).
+ boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime;
+ boolean isShadowed = mayHaveShadowed && activeDeletion.deletes(cell);
+ boolean isSkippable = !queriedByUserTester.test(column) && cell.timestamp() < rowLiveness.timestamp();
+ return isForDropped || isShadowed || isSkippable ? null : cell;
+ });
+ }
+
+ public Row withOnlyQueriedData(ColumnFilter filter)
+ {
+ if (filter.allFetchedColumnsAreQueried())
+ return this;
+
+ return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> {
+
+ ColumnDefinition column = cd.column();
+ if (column.isComplex())
+ return ((ComplexColumnData)cd).withOnlyQueriedData(filter);
+
+ return filter.fetchedColumnIsQueried(column) ? cd : null;
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index fab529b..ac137e7 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.SetType;
@@ -144,19 +145,21 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);
}
- public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped)
+ public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped, LivenessInfo rowLiveness)
{
ColumnFilter.Tester cellTester = filter.newTester(column);
if (cellTester == null && activeDeletion.isLive() && dropped == null)
return this;
DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion;
- return transformAndFilter(newDeletion,
- (cell) ->
- (cellTester == null || cellTester.includes(cell.path()))
- && !activeDeletion.deletes(cell)
- && (dropped == null || cell.timestamp() > dropped.droppedTime)
- ? cell : null);
+ return transformAndFilter(newDeletion, (cell) ->
+ {
+ boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime;
+ boolean isShadowed = activeDeletion.deletes(cell);
+ boolean isSkippable = cellTester != null && (!cellTester.fetches(cell.path())
+ || (!cellTester.fetchedCellIsQueried(cell.path()) && cell.timestamp() < rowLiveness.timestamp()));
+ return isForDropped || isShadowed || isSkippable ? null : cell;
+ });
}
public ComplexColumnData purge(DeletionPurger purger, int nowInSec)
@@ -165,6 +168,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, nowInSec));
}
+ public ComplexColumnData withOnlyQueriedData(ColumnFilter filter)
+ {
+ return transformAndFilter(complexDeletion, (cell) -> filter.fetchedCellIsQueried(column, cell.path()) ? null : cell);
+ }
+
private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function)
{
Object[] transformed = BTree.transformAndFilter(cells, function);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 8a67e9b..173631e 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -205,6 +205,16 @@ public interface Row extends Unfiltered, Collection<ColumnData>
public Row purge(DeletionPurger purger, int nowInSec);
/**
+ * Returns a copy of this row which only includes the data queried by {@code filter}, excluding anything _fetched_ for
+ * internal reasons but not queried by the user (see {@link ColumnFilter} for details).
+ *
+ * @param filter the {@code ColumnFilter} to use when deciding what is user queried. This should be the filter
+ * that was used when querying the row on which this method is called.
+ * @return the row but with all data that wasn't queried by the user skipped.
+ */
+ public Row withOnlyQueriedData(ColumnFilter filter);
+
+ /**
* Returns a copy of this row where all counter cells have they "local" shard marked for clearing.
*/
public Row markCounterLocalToBeCleared();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index 551edb8..ca248b6 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.utils.FBUtilities;
@@ -49,6 +50,22 @@ public abstract class RowIterators
}
/**
+ * Filter the provided iterator to only include cells that are selected by the user.
+ *
+ * @param iterator the iterator to filter.
+ * @param filter the {@code ColumnFilter} to use when deciding which cells are queried by the user. This should be the filter
+ * that was used when querying {@code iterator}.
+ * @return the filtered iterator.
+ */
+ public static RowIterator withOnlyQueriedData(RowIterator iterator, ColumnFilter filter)
+ {
+ if (filter.allFetchedColumnsAreQueried())
+ return iterator;
+
+ return Transformation.apply(iterator, new WithOnlyQueriedData(filter));
+ }
+
+ /**
* Wraps the provided iterator so it logs the returned rows for debugging purposes.
* <p>
* Note that this is only meant for debugging as this can log a very large amount of
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index 6b4bc2e..e40a1e1 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -67,34 +67,49 @@ public class SerializationHelper
this(metadata, version, flag, null);
}
- public Columns fetchedStaticColumns(SerializationHeader header)
- {
- return columnsToFetch == null ? header.columns().statics : columnsToFetch.fetchedColumns().statics;
- }
-
- public Columns fetchedRegularColumns(SerializationHeader header)
+ public boolean includes(ColumnDefinition column)
{
- return columnsToFetch == null ? header.columns().regulars : columnsToFetch.fetchedColumns().regulars;
+ return columnsToFetch == null || columnsToFetch.fetches(column);
}
- public boolean includes(ColumnDefinition column)
+ public boolean includes(Cell cell, LivenessInfo rowLiveness)
{
- return columnsToFetch == null || columnsToFetch.includes(column);
+ if (columnsToFetch == null)
+ return true;
+
+ // During queries, some columns are included even though they are not queried by the user because
+ // we always need to distinguish between having a row (with potentially only null values) and not
+ // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not
+ // actually requested by the user however (canSkipValue), we can skip the full cell if the cell
+ // timestamp is lower than the row one, because in that case, the row timestamp is enough proof
+ // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells.
+ ColumnDefinition column = cell.column();
+ if (column.isComplex())
+ {
+ if (!includes(cell.path()))
+ return false;
+
+ return !canSkipValue(cell.path()) || cell.timestamp() >= rowLiveness.timestamp();
+ }
+ else
+ {
+ return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp();
+ }
}
public boolean includes(CellPath path)
{
- return path == null || tester == null || tester.includes(path);
+ return path == null || tester == null || tester.fetches(path);
}
public boolean canSkipValue(ColumnDefinition column)
{
- return columnsToFetch != null && columnsToFetch.canSkipValue(column);
+ return columnsToFetch != null && !columnsToFetch.fetchedColumnIsQueried(column);
}
public boolean canSkipValue(CellPath path)
{
- return path != null && tester != null && tester.canSkipValue(path);
+ return path != null && tester != null && !tester.fetchedCellIsQueried(path);
}
public void startOfComplexColumn(ColumnDefinition column)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index ea929d7..9aa7cc4 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.transform.FilteredRows;
import org.apache.cassandra.db.transform.MoreRows;
import org.apache.cassandra.db.transform.Transformation;
@@ -128,6 +129,23 @@ public abstract class UnfilteredRowIterators
}
/**
+ * Filter the provided iterator to exclude cells that have been fetched but are not queried by the user
+ * (see ColumnFilter for details).
+ *
+ * @param iterator the iterator to filter.
+ * @param filter the {@code ColumnFilter} to use when deciding which columns are the ones queried by the
+ * user. This should be the filter that was used when querying {@code iterator}.
+ * @return the filtered iterator.
+ */
+ public static UnfilteredRowIterator withOnlyQueriedData(UnfilteredRowIterator iterator, ColumnFilter filter)
+ {
+ if (filter.allFetchedColumnsAreQueried())
+ return iterator;
+
+ return Transformation.apply(iterator, new WithOnlyQueriedData(filter));
+ }
+
+ /**
* Returns an iterator that concatenate two atom iterators.
* This method assumes that both iterator are from the same partition and that the atom from
* {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterator
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 0451161..ae9d6e9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -453,7 +453,7 @@ public class UnfilteredSerializer
if (helper.includes(column))
{
Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
- if (!helper.isDropped(cell, false))
+ if (helper.includes(cell, rowLiveness) && !helper.isDropped(cell, false))
builder.addCell(cell);
}
else
@@ -479,7 +479,7 @@ public class UnfilteredSerializer
while (--count >= 0)
{
Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
- if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
+ if (helper.includes(cell, rowLiveness) && !helper.isDropped(cell, true))
builder.addCell(cell);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
new file mode 100644
index 0000000..3930f91
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.transform.Transformation;
+
+/**
+ * Function to skip cells (from an iterator) that are not part of those queried by the user
+ * according to the provided {@code ColumnFilter}. See {@link UnfilteredRowIterators#withOnlyQueriedData}
+ * for more details.
+ */
+public class WithOnlyQueriedData<I extends BaseRowIterator<?>> extends Transformation<I>
+{
+ private final ColumnFilter filter;
+
+ public WithOnlyQueriedData(ColumnFilter filter)
+ {
+ this.filter = filter;
+ }
+
+ @Override
+ protected Row applyToStatic(Row row)
+ {
+ return row.withOnlyQueriedData(filter);
+ }
+
+ @Override
+ protected Row applyToRow(Row row)
+ {
+ return row.withOnlyQueriedData(filter);
+ }
+};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index a28423d..d0b1256 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -364,7 +364,7 @@ public final class SchemaKeyspace
mutationMap.put(key, mutation);
}
- mutation.add(PartitionUpdate.fromIterator(partition));
+ mutation.add(PartitionUpdate.fromIterator(partition, cmd.columnFilter()));
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index f3858d7..cc2786f 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
@@ -197,10 +198,22 @@ public class DataResolver extends ResponseResolver
public void onCell(int i, Clustering clustering, Cell merged, Cell original)
{
- if (merged != null && !merged.equals(original))
+ if (merged != null && !merged.equals(original) && isQueried(merged))
currentRow(i, clustering).addCell(merged);
}
+ private boolean isQueried(Cell cell)
+ {
+ // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may
+ // have empty values as an optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair.
+ // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL
+ // semantics (making sure we can always distinguish between a row that doesn't exist and one that does exist but has
+ // no value for the column requested by the user) and so it won't be unexpected by the user that those columns are
+ // not repaired.
+ ColumnDefinition column = cell.column();
+ ColumnFilter filter = command.columnFilter();
+ return column.isComplex() ? filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column);
+ }
};
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6280f3a..081030d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -166,7 +167,7 @@ public class StreamReceiveTask extends StreamTask
try (UnfilteredRowIterator rowIterator = scanner.next())
{
//Apply unsafe (we will flush below before transaction is done)
- new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+ new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index ee86f9d..61d9b5f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -941,7 +941,7 @@ public class CassandraServer implements Cassandra.Iface
DecoratedKey dk = metadata.decorateKey(key);
int nowInSec = FBUtilities.nowInSeconds();
- PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+ PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec), ColumnFilter.all(metadata));
// Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates);
@@ -1143,7 +1143,7 @@ public class CassandraServer implements Cassandra.Iface
sortAndMerge(metadata, cells, nowInSec);
DecoratedKey dk = metadata.decorateKey(key);
- PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
+ PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()), ColumnFilter.all(metadata));
// Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);