You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/05/27 20:49:36 UTC
cassandra git commit: Fix null static columns during paging, reversed queries
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 63165a719 -> d075540c4
Fix null static columns during paging, reversed queries
Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8502
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d075540c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d075540c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d075540c
Branch: refs/heads/cassandra-2.0
Commit: d075540c46209fdabde74db1e210114965372605
Parents: 63165a7
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Wed May 27 13:48:52 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed May 27 13:48:52 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/DataRange.java | 81 +++++++++++++++++---
.../cassandra/db/SliceFromReadCommand.java | 24 ++++++
.../cassandra/db/filter/ColumnCounter.java | 67 +++++++++++++++-
.../cassandra/db/filter/ExtendedFilter.java | 13 ++++
.../cassandra/db/filter/SliceQueryFilter.java | 79 ++++++++++++++++++-
.../service/pager/AbstractQueryPager.java | 40 ++++++++--
.../service/pager/RangeSliceQueryPager.java | 4 +-
.../service/pager/SliceQueryPager.java | 6 +-
.../cassandra/cql3/MultiColumnRelationTest.java | 2 +
.../service/pager/AbstractQueryPagerTest.java | 8 +-
12 files changed, 293 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 709100b..054cf79 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.16:
+ * Fix null static columns in pages after the first, paged reversed
+ queries (CASSANDRA-8502)
* Fix failing bound statement after adding a collection (CASSANDRA-9411)
* Fix counting cache serialization in request metrics (CASSANDRA-9466)
* (cqlsh) Add LOGIN command to switch users (CASSANDRA-7212)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index eec4044..f81ec82 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1682,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean countCQL3Rows,
long now)
{
- DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator);
+ DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata);
return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now);
}
@@ -1714,7 +1714,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish
// through to DataRange.Paging to be used on the first and last partitions
SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
- dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
+ dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 774a3aa..1be9469 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -22,10 +22,12 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import com.google.common.base.Objects;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Groups key range and column filter for range queries.
@@ -41,7 +43,7 @@ import org.apache.cassandra.dht.*;
*/
public class DataRange
{
- private final AbstractBounds<RowPosition> keyRange;
+ protected final AbstractBounds<RowPosition> keyRange;
protected IDiskAtomFilter columnFilter;
protected final boolean selectFullRow;
@@ -146,6 +148,8 @@ public class DataRange
// The slice of columns that we want to fetch for each row, ignoring page start/end issues.
private final SliceQueryFilter sliceFilter;
+ private final CFMetaData cfm;
+
private final Comparator<ByteBuffer> comparator;
// used to restrict the start of the slice for the first partition in the range
@@ -154,7 +158,11 @@ public class DataRange
// used to restrict the end of the slice for the last partition in the range
private final ByteBuffer lastPartitionColumnFinish;
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart, ByteBuffer lastPartitionColumnFinish, Comparator<ByteBuffer> comparator)
+ // tracks the last key that we updated the filter for to avoid duplicating work
+ private ByteBuffer lastKeyFilterWasUpdatedFor;
+
+ private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart,
+ ByteBuffer lastPartitionColumnFinish, CFMetaData cfm, Comparator<ByteBuffer> comparator)
{
super(range, filter);
@@ -163,14 +171,16 @@ public class DataRange
assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
this.sliceFilter = filter;
+ this.cfm = cfm;
this.comparator = comparator;
this.firstPartitionColumnStart = firstPartitionColumnStart;
this.lastPartitionColumnFinish = lastPartitionColumnFinish;
+ this.lastKeyFilterWasUpdatedFor = null;
}
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+ public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, CFMetaData cfm)
{
- this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+ this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator : cfm.comparator);
}
@Override
@@ -181,7 +191,7 @@ public class DataRange
return false;
if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
- return selectFullRow;
+ return true;
return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
}
@@ -201,12 +211,29 @@ public class DataRange
* Maybe we should just remove that hack, but in the meantime, we
* need to keep a reference the last returned filter.
*/
- columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey)
- ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey))
- : sliceFilter;
+ if (equals(startKey(), rowKey) || equals(stopKey(), rowKey))
+ {
+ if (!rowKey.equals(lastKeyFilterWasUpdatedFor))
+ {
+ this.lastKeyFilterWasUpdatedFor = rowKey;
+ columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey));
+ }
+ }
+ else
+ {
+ columnFilter = sliceFilter;
+ }
+
return columnFilter;
}
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
+ {
+ return cfm.hasStaticColumns() &&
+ cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
+ }
+
private ColumnSlice[] slicesForKey(ByteBuffer key)
{
// We don't call that until it's necessary, so assume we have to do some hard work
@@ -216,19 +243,37 @@ public class DataRange
ByteBuffer newStart = equals(startKey(), key) && firstPartitionColumnStart.hasRemaining() ? firstPartitionColumnStart : null;
ByteBuffer newFinish = equals(stopKey(), key) && lastPartitionColumnFinish.hasRemaining() ? lastPartitionColumnFinish : null;
- List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+ // in the common case, we'll have the same number of slices
+ List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
+ // Check our slices to see if any fall before the page start (in which case they can be removed) or
+ // if they contain the page start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
for (ColumnSlice slice : sliceFilter.slices)
{
if (newStart != null)
{
if (slice.isBefore(comparator, newStart))
- continue; // we skip that slice
+ {
+ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
+ continue;
+ }
if (slice.includes(comparator, newStart))
+ {
+ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
slice = new ColumnSlice(newStart, slice.finish);
+ }
- // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+ // once we see a slice that either includes the page start or is after it, we can stop checking
+ // against the page start (because the slices are ordered)
newStart = null;
}
@@ -252,5 +297,17 @@ public class DataRange
columnFilter.updateColumnsLimit(count);
sliceFilter.updateColumnsLimit(count);
}
+
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("keyRange", keyRange)
+ .add("sliceFilter", sliceFilter)
+ .add("columnFilter", columnFilter)
+ .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : ByteBufferUtil.bytesToHex(firstPartitionColumnStart))
+ .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : ByteBufferUtil.bytesToHex(lastPartitionColumnFinish))
+ .toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index afca338..0ea2de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +62,30 @@ public class SliceFromReadCommand extends ReadCommand
public Row getRow(Keyspace keyspace)
{
+ CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+
+ // If we're doing a reversed query and the filter includes static columns, we need to issue two separate
+ // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details.
+ if (filter.reversed && filter.hasStaticSlice(cfm))
+ {
+ logger.debug("Splitting reversed slice with static columns into two reads");
+ Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
+
+ Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
+ Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
+
+ // add the static results to the start of the normal results
+ if (normalResults.cf == null)
+ return staticResults;
+
+ if (staticResults.cf != null)
+ for (Column col : staticResults.cf.getReverseSortedColumns())
+ normalResults.cf.addColumn(col);
+
+ return normalResults;
+ }
+
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 2d0df1f..ddd74b3 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -75,10 +75,10 @@ public class ColumnCounter
public static class GroupByPrefix extends ColumnCounter
{
- private final CompositeType type;
- private final int toGroup;
- private ByteBuffer[] previous;
- private boolean previousGroupIsStatic;
+ protected final CompositeType type;
+ protected final int toGroup;
+ protected ByteBuffer[] previous;
+ protected boolean previousGroupIsStatic;
/**
* A column counter that count only 1 for all the columns sharing a
@@ -157,4 +157,63 @@ public class ColumnCounter
previous = current;
}
}
+
+ /**
+ * Similar to GroupByPrefix, but designed to handle counting cells in reverse order.
+ */
+ public static class GroupByPrefixReversed extends GroupByPrefix
+ {
+ public GroupByPrefixReversed(long timestamp, CompositeType type, int toGroup)
+ {
+ super(timestamp, type, toGroup);
+ }
+
+ @Override
+ public void count(Column column, DeletionInfo.InOrderTester tester)
+ {
+ if (tester.isDeleted(column))
+ return;
+
+ if (!column.isLive(timestamp))
+ {
+ tombstones++;
+ return;
+ }
+
+ if (toGroup == 0)
+ {
+ live = 1;
+ return;
+ }
+
+ ByteBuffer[] current = type.split(column.name());
+ assert current.length >= toGroup;
+
+ boolean isStatic = CompositeType.isStaticName(column.name());
+ if (previous == null)
+ {
+ // This is the first group we've seen, and it's static. In this case we want to return a count of 1,
+ // because there are no other live groups.
+ previousGroupIsStatic = true;
+ previous = current;
+ live++;
+ }
+ else if (isStatic)
+ {
+ // Ignore statics if we've seen any other statics or any other groups
+ return;
+ }
+
+ for (int i = 0; i < toGroup; i++)
+ {
+ if (ByteBufferUtil.compareUnsigned(previous[i], current[i]) != 0)
+ {
+ // it's a new group
+ live++;
+ previous = current;
+ return;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 82e889d..e03eba1 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,6 +159,18 @@ public abstract class ExtendedFilter
}
}
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("cfs", cfs)
+ .add("dataRange", dataRange)
+ .add("maxResults", maxResults)
+ .add("countCQL3Rows", countCQL3Rows)
+ .add("currentLimit", currentLimit)
+ .toString();
+ }
+
public static class WithClauses extends ExtendedFilter
{
private final List<IndexExpression> clause;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 6e6ab6b..ecf02c1 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -23,6 +23,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,11 +102,61 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
}
- public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
{
- Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+ return cfm.hasStaticColumns() &&
+ cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
+ }
+
+ public boolean hasStaticSlice(CFMetaData cfm)
+ {
+ for (ColumnSlice slice : slices)
+ if (sliceIncludesStatics(slice, cfm))
+ return true;
+
+ return false;
+ }
+
+ /**
+ * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
+ * remainder of the normal data.
+ *
+ * This should only be called when the filter is reversed and the filter is known to cover static columns (through
+ * hasStaticSlice()).
+ *
+ * @return a pair of (static, normal) SliceQueryFilters
+ */
+ public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
+ {
+ assert reversed;
+
+ ByteBuffer staticSliceEnd = cfm.getStaticColumnNameBuilder().buildAsEndOfRange();
+ List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
+ for (ColumnSlice slice : slices)
+ {
+ if (sliceIncludesStatics(slice, cfm))
+ nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
+ else
+ nonStaticSlices.add(slice);
+ }
- List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+ return Pair.create(
+ new SliceQueryFilter(staticSliceEnd, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, count, compositesToGroup),
+ new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
+ }
+
+ public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, CFMetaData cfm)
+ {
+ Comparator<ByteBuffer> cmp = reversed ? cfm.comparator.reverseComparator : cfm.comparator;
+
+ // Check our slices to see if any fall before the new start (in which case they can be removed) or
+ // if they contain the new start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
+ List<ColumnSlice> newSlices = new ArrayList<>();
boolean pastNewStart = false;
for (int i = 0; i < slices.length; i++)
{
@@ -115,12 +169,23 @@ public class SliceQueryFilter implements IDiskAtomFilter
}
if (slices[i].isBefore(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
continue;
+ }
+ else if (slice.includes(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
- if (slice.includes(cmp, newStart))
newSlices.add(new ColumnSlice(newStart, slice.finish));
+ }
else
+ {
newSlices.add(slice);
+ }
pastNewStart = true;
}
@@ -254,12 +319,18 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new ColumnCounter(now);
else if (compositesToGroup == 0)
return new ColumnCounter.GroupByPrefix(now, null, 0);
+ else if (reversed)
+ return new ColumnCounter.GroupByPrefixReversed(now, (CompositeType)comparator, compositesToGroup);
else
return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
}
public void trim(ColumnFamily cf, int trimTo, long now)
{
+ // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
+ if (cf.getColumnCount() < trimTo)
+ return;
+
ColumnCounter counter = columnCounter(cf.getComparator(), now);
Collection<Column> columns = reversed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index c45dd07..155e538 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,14 +17,12 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Iterator;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnCounter;
@@ -306,13 +304,29 @@ abstract class AbstractQueryPager implements QueryPager
{
ColumnCounter counter = columnCounter();
- // Discard the first 'toDiscard' live
+ List<Column> staticColumns = new ArrayList<>(cfm.staticColumns().size());
+
+ // Discard the first 'toDiscard' live, non-static columns
while (iter.hasNext())
{
Column c = iter.next();
+
+ // if it's a static column, don't count it and save it to add to the trimmed results
+ ColumnDefinition columnDef = cfm.getColumnDefinitionFromColumnName(c.name());
+ if (columnDef != null && columnDef.type == ColumnDefinition.Type.STATIC)
+ {
+ staticColumns.add(c);
+ continue;
+ }
+
counter.count(c, tester);
+
+ // once we've discarded the required amount, add the rest
if (counter.live() > toDiscard)
{
+ for (Column staticColumn : staticColumns)
+ copy.addColumn(staticColumn);
+
copy.addColumn(c);
while (iter.hasNext())
copy.addColumn(iter.next());
@@ -342,9 +356,21 @@ abstract class AbstractQueryPager implements QueryPager
return Math.min(liveCount, toDiscard);
}
- protected static Column firstColumn(ColumnFamily cf)
+ /**
+ * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column
+ * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal
+ * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we
+ * need to start from the last non-static cell.
+ */
+ protected Column firstNonStaticColumn(ColumnFamily cf)
{
- return cf.iterator().next();
+ for (Column column : cf)
+ {
+ ColumnDefinition def = cfm.getColumnDefinitionFromColumnName(column.name());
+ if (def == null || def.type != ColumnDefinition.Type.STATIC)
+ return column;
+ }
+ return null;
}
protected static Column lastColumn(ColumnFamily cf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 0df1d25..3618c56 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -93,7 +93,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
return false;
// Same as SliceQueryPager, we ignore a deleted column
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf);
+ Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
return !first.cf.deletionInfo().isDeleted(firstColumn)
&& firstColumn.isLive(timestamp())
&& lastReturnedName.equals(firstColumn.name());
@@ -102,7 +102,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
protected boolean recordLast(Row last)
{
lastReturnedKey = last.key;
- lastReturnedName = (isReversed() ? firstColumn(last.cf) : lastColumn(last.cf)).name();
+ lastReturnedName = (isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf)).name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index cdad0a5..ad5a0bf 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -78,7 +78,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
// more rows than we're supposed to. See CASSANDRA-8108 for more details.
SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize));
if (lastReturned != null)
- filter = filter.withUpdatedStart(lastReturned, cfm.comparator);
+ filter = filter.withUpdatedStart(lastReturned, cfm);
logger.debug("Querying next page of slice query; new filter: {}", filter);
ReadCommand pageCmd = command.withUpdatedFilter(filter);
@@ -92,7 +92,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
if (lastReturned == null)
return false;
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf);
+ Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
// Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the
// rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there.
return !first.cf.deletionInfo().isDeleted(firstColumn)
@@ -102,7 +102,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
protected boolean recordLast(Row last)
{
- Column lastColumn = isReversed() ? firstColumn(last.cf) : lastColumn(last.cf);
+ Column lastColumn = isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf);
lastReturned = lastColumn.name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
index e3ccba5..30b7f0f 100644
--- a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
@@ -704,6 +704,8 @@ public class MultiColumnRelationTest
{
for (String tableSuffix : new String[]{"", "_compact"})
{
+ execute("DELETE FROM %s.multiple_clustering_reversed" + tableSuffix + " WHERE a=0");
+
// b and d are reversed in the clustering order
execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 0, 0)");
execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 1, 1)");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
index 5467ec0..273487a 100644
--- a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
@@ -125,9 +125,11 @@ public class AbstractQueryPagerTest
return cf;
}
- private CFMetaData createMetadata()
+ private static CFMetaData createMetadata()
{
- return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
+ CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
+ cfm.rebuild();
+ return cfm;
}
private static ByteBuffer bb(int i)
@@ -147,7 +149,7 @@ public class AbstractQueryPagerTest
// We use this to test more thorougly DiscardFirst and DiscardLast (more generic pager behavior is tested in
// QueryPagerTest). The only thing those method use is the result of the columnCounter() method. So to keep
// it simple, we fake all actual parameters in the ctor below but just override the columnCounter() method.
- super(null, 0, false, null, null, 0);
+ super(null, 0, false, createMetadata(), null, 0);
}
@Override