You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/05/27 20:51:23 UTC
[1/2] cassandra git commit: Fix null static columns during paging,
reversed queries
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 74280b1ee -> a8dce228d
Fix null static columns during paging, reversed queries
Patch by Tyler Hobbs; reviewed by Sylvain Lebresne for CASSANDRA-8502
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d075540c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d075540c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d075540c
Branch: refs/heads/cassandra-2.1
Commit: d075540c46209fdabde74db1e210114965372605
Parents: 63165a7
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Wed May 27 13:48:52 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed May 27 13:48:52 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/DataRange.java | 81 +++++++++++++++++---
.../cassandra/db/SliceFromReadCommand.java | 24 ++++++
.../cassandra/db/filter/ColumnCounter.java | 67 +++++++++++++++-
.../cassandra/db/filter/ExtendedFilter.java | 13 ++++
.../cassandra/db/filter/SliceQueryFilter.java | 79 ++++++++++++++++++-
.../service/pager/AbstractQueryPager.java | 40 ++++++++--
.../service/pager/RangeSliceQueryPager.java | 4 +-
.../service/pager/SliceQueryPager.java | 6 +-
.../cassandra/cql3/MultiColumnRelationTest.java | 2 +
.../service/pager/AbstractQueryPagerTest.java | 8 +-
12 files changed, 293 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 709100b..054cf79 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.16:
+ * Fix null static columns in pages after the first, paged reversed
+ queries (CASSANDRA-8502)
* Fix failing bound statement after adding a collection (CASSANDRA-9411)
* Fix counting cache serialization in request metrics (CASSANDRA-9466)
* (cqlsh) Add LOGIN command to switch users (CASSANDRA-7212)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index eec4044..f81ec82 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1682,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
boolean countCQL3Rows,
long now)
{
- DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator);
+ DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata);
return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, now);
}
@@ -1714,7 +1714,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// create a new SliceQueryFilter that selects all cells, but pass the original slice start and finish
// through to DataRange.Paging to be used on the first and last partitions
SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
- dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
+ dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 774a3aa..1be9469 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -22,10 +22,12 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import com.google.common.base.Objects;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Groups key range and column filter for range queries.
@@ -41,7 +43,7 @@ import org.apache.cassandra.dht.*;
*/
public class DataRange
{
- private final AbstractBounds<RowPosition> keyRange;
+ protected final AbstractBounds<RowPosition> keyRange;
protected IDiskAtomFilter columnFilter;
protected final boolean selectFullRow;
@@ -146,6 +148,8 @@ public class DataRange
// The slice of columns that we want to fetch for each row, ignoring page start/end issues.
private final SliceQueryFilter sliceFilter;
+ private final CFMetaData cfm;
+
private final Comparator<ByteBuffer> comparator;
// used to restrict the start of the slice for the first partition in the range
@@ -154,7 +158,11 @@ public class DataRange
// used to restrict the end of the slice for the last partition in the range
private final ByteBuffer lastPartitionColumnFinish;
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart, ByteBuffer lastPartitionColumnFinish, Comparator<ByteBuffer> comparator)
+ // tracks the last key that we updated the filter for to avoid duplicating work
+ private ByteBuffer lastKeyFilterWasUpdatedFor;
+
+ private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart,
+ ByteBuffer lastPartitionColumnFinish, CFMetaData cfm, Comparator<ByteBuffer> comparator)
{
super(range, filter);
@@ -163,14 +171,16 @@ public class DataRange
assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
this.sliceFilter = filter;
+ this.cfm = cfm;
this.comparator = comparator;
this.firstPartitionColumnStart = firstPartitionColumnStart;
this.lastPartitionColumnFinish = lastPartitionColumnFinish;
+ this.lastKeyFilterWasUpdatedFor = null;
}
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+ public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, CFMetaData cfm)
{
- this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+ this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator : cfm.comparator);
}
@Override
@@ -181,7 +191,7 @@ public class DataRange
return false;
if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
- return selectFullRow;
+ return true;
return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
}
@@ -201,12 +211,29 @@ public class DataRange
* Maybe we should just remove that hack, but in the meantime, we
* need to keep a reference to the last returned filter.
*/
- columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey)
- ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey))
- : sliceFilter;
+ if (equals(startKey(), rowKey) || equals(stopKey(), rowKey))
+ {
+ if (!rowKey.equals(lastKeyFilterWasUpdatedFor))
+ {
+ this.lastKeyFilterWasUpdatedFor = rowKey;
+ columnFilter = sliceFilter.withUpdatedSlices(slicesForKey(rowKey));
+ }
+ }
+ else
+ {
+ columnFilter = sliceFilter;
+ }
+
return columnFilter;
}
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
+ {
+ return cfm.hasStaticColumns() &&
+ cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
+ }
+
private ColumnSlice[] slicesForKey(ByteBuffer key)
{
// We don't call that until it's necessary, so assume we have to do some hard work
@@ -216,19 +243,37 @@ public class DataRange
ByteBuffer newStart = equals(startKey(), key) && firstPartitionColumnStart.hasRemaining() ? firstPartitionColumnStart : null;
ByteBuffer newFinish = equals(stopKey(), key) && lastPartitionColumnFinish.hasRemaining() ? lastPartitionColumnFinish : null;
- List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+ // in the common case, we'll have the same number of slices
+ List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
+ // Check our slices to see if any fall before the page start (in which case they can be removed) or
+ // if they contain the page start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
for (ColumnSlice slice : sliceFilter.slices)
{
if (newStart != null)
{
if (slice.isBefore(comparator, newStart))
- continue; // we skip that slice
+ {
+ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
+ continue;
+ }
if (slice.includes(comparator, newStart))
+ {
+ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
slice = new ColumnSlice(newStart, slice.finish);
+ }
- // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+ // once we see a slice that either includes the page start or is after it, we can stop checking
+ // against the page start (because the slices are ordered)
newStart = null;
}
@@ -252,5 +297,17 @@ public class DataRange
columnFilter.updateColumnsLimit(count);
sliceFilter.updateColumnsLimit(count);
}
+
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("keyRange", keyRange)
+ .add("sliceFilter", sliceFilter)
+ .add("columnFilter", columnFilter)
+ .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : ByteBufferUtil.bytesToHex(firstPartitionColumnStart))
+ .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : ByteBufferUtil.bytesToHex(lastPartitionColumnFinish))
+ .toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index afca338..0ea2de5 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +62,30 @@ public class SliceFromReadCommand extends ReadCommand
public Row getRow(Keyspace keyspace)
{
+ CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+
+ // If we're doing a reversed query and the filter includes static columns, we need to issue two separate
+ // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details.
+ if (filter.reversed && filter.hasStaticSlice(cfm))
+ {
+ logger.debug("Splitting reversed slice with static columns into two reads");
+ Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
+
+ Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
+ Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
+
+ // add the static results to the start of the normal results
+ if (normalResults.cf == null)
+ return staticResults;
+
+ if (staticResults.cf != null)
+ for (Column col : staticResults.cf.getReverseSortedColumns())
+ normalResults.cf.addColumn(col);
+
+ return normalResults;
+ }
+
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 2d0df1f..ddd74b3 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -75,10 +75,10 @@ public class ColumnCounter
public static class GroupByPrefix extends ColumnCounter
{
- private final CompositeType type;
- private final int toGroup;
- private ByteBuffer[] previous;
- private boolean previousGroupIsStatic;
+ protected final CompositeType type;
+ protected final int toGroup;
+ protected ByteBuffer[] previous;
+ protected boolean previousGroupIsStatic;
/**
* A column counter that count only 1 for all the columns sharing a
@@ -157,4 +157,63 @@ public class ColumnCounter
previous = current;
}
}
+
+ /**
+ * Similar to GroupByPrefix, but designed to handle counting cells in reverse order.
+ */
+ public static class GroupByPrefixReversed extends GroupByPrefix
+ {
+ public GroupByPrefixReversed(long timestamp, CompositeType type, int toGroup)
+ {
+ super(timestamp, type, toGroup);
+ }
+
+ @Override
+ public void count(Column column, DeletionInfo.InOrderTester tester)
+ {
+ if (tester.isDeleted(column))
+ return;
+
+ if (!column.isLive(timestamp))
+ {
+ tombstones++;
+ return;
+ }
+
+ if (toGroup == 0)
+ {
+ live = 1;
+ return;
+ }
+
+ ByteBuffer[] current = type.split(column.name());
+ assert current.length >= toGroup;
+
+ boolean isStatic = CompositeType.isStaticName(column.name());
+ if (previous == null)
+ {
+ // This is the first group we've seen (this branch does not check isStatic, so it may or may not be static).
+ // If it is static, we still want to return a count of 1, because there are no other live groups.
+ previousGroupIsStatic = true;
+ previous = current;
+ live++;
+ }
+ else if (isStatic)
+ {
+ // Ignore statics if we've seen any other statics or any other groups
+ return;
+ }
+
+ for (int i = 0; i < toGroup; i++)
+ {
+ if (ByteBufferUtil.compareUnsigned(previous[i], current[i]) != 0)
+ {
+ // it's a new group
+ live++;
+ previous = current;
+ return;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 82e889d..e03eba1 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -158,6 +159,18 @@ public abstract class ExtendedFilter
}
}
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("cfs", cfs)
+ .add("dataRange", dataRange)
+ .add("maxResults", maxResults)
+ .add("countCQL3Rows", countCQL3Rows)
+ .add("currentLimit", currentLimit)
+ .toString();
+ }
+
public static class WithClauses extends ExtendedFilter
{
private final List<IndexExpression> clause;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 6e6ab6b..ecf02c1 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -23,6 +23,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,11 +102,61 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
}
- public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
{
- Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+ return cfm.hasStaticColumns() &&
+ cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
+ }
+
+ public boolean hasStaticSlice(CFMetaData cfm)
+ {
+ for (ColumnSlice slice : slices)
+ if (sliceIncludesStatics(slice, cfm))
+ return true;
+
+ return false;
+ }
+
+ /**
+ * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
+ * remainder of the normal data.
+ *
+ * This should only be called when the filter is reversed and the filter is known to cover static columns (through
+ * hasStaticSlice()).
+ *
+ * @return a pair of (static, normal) SliceQueryFilters
+ */
+ public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
+ {
+ assert reversed;
+
+ ByteBuffer staticSliceEnd = cfm.getStaticColumnNameBuilder().buildAsEndOfRange();
+ List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
+ for (ColumnSlice slice : slices)
+ {
+ if (sliceIncludesStatics(slice, cfm))
+ nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
+ else
+ nonStaticSlices.add(slice);
+ }
- List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+ return Pair.create(
+ new SliceQueryFilter(staticSliceEnd, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, count, compositesToGroup),
+ new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
+ }
+
+ public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, CFMetaData cfm)
+ {
+ Comparator<ByteBuffer> cmp = reversed ? cfm.comparator.reverseComparator : cfm.comparator;
+
+ // Check our slices to see if any fall before the new start (in which case they can be removed) or
+ // if they contain the new start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
+ List<ColumnSlice> newSlices = new ArrayList<>();
boolean pastNewStart = false;
for (int i = 0; i < slices.length; i++)
{
@@ -115,12 +169,23 @@ public class SliceQueryFilter implements IDiskAtomFilter
}
if (slices[i].isBefore(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
+
continue;
+ }
+ else if (slice.includes(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
+ newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
- if (slice.includes(cmp, newStart))
newSlices.add(new ColumnSlice(newStart, slice.finish));
+ }
else
+ {
newSlices.add(slice);
+ }
pastNewStart = true;
}
@@ -254,12 +319,18 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new ColumnCounter(now);
else if (compositesToGroup == 0)
return new ColumnCounter.GroupByPrefix(now, null, 0);
+ else if (reversed)
+ return new ColumnCounter.GroupByPrefixReversed(now, (CompositeType)comparator, compositesToGroup);
else
return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
}
public void trim(ColumnFamily cf, int trimTo, long now)
{
+ // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
+ if (cf.getColumnCount() < trimTo)
+ return;
+
ColumnCounter counter = columnCounter(cf.getComparator(), now);
Collection<Column> columns = reversed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index c45dd07..155e538 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,14 +17,12 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Iterator;
+import java.util.*;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnCounter;
@@ -306,13 +304,29 @@ abstract class AbstractQueryPager implements QueryPager
{
ColumnCounter counter = columnCounter();
- // Discard the first 'toDiscard' live
+ List<Column> staticColumns = new ArrayList<>(cfm.staticColumns().size());
+
+ // Discard the first 'toDiscard' live, non-static columns
while (iter.hasNext())
{
Column c = iter.next();
+
+ // if it's a static column, don't count it and save it to add to the trimmed results
+ ColumnDefinition columnDef = cfm.getColumnDefinitionFromColumnName(c.name());
+ if (columnDef != null && columnDef.type == ColumnDefinition.Type.STATIC)
+ {
+ staticColumns.add(c);
+ continue;
+ }
+
counter.count(c, tester);
+
+ // once we've discarded the required amount, add the rest
if (counter.live() > toDiscard)
{
+ for (Column staticColumn : staticColumns)
+ copy.addColumn(staticColumn);
+
copy.addColumn(c);
while (iter.hasNext())
copy.addColumn(iter.next());
@@ -342,9 +356,21 @@ abstract class AbstractQueryPager implements QueryPager
return Math.min(liveCount, toDiscard);
}
- protected static Column firstColumn(ColumnFamily cf)
+ /**
+ * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column
+ * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal
+ * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we
+ * need to start from the last non-static cell.
+ */
+ protected Column firstNonStaticColumn(ColumnFamily cf)
{
- return cf.iterator().next();
+ for (Column column : cf)
+ {
+ ColumnDefinition def = cfm.getColumnDefinitionFromColumnName(column.name());
+ if (def == null || def.type != ColumnDefinition.Type.STATIC)
+ return column;
+ }
+ return null;
}
protected static Column lastColumn(ColumnFamily cf)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 0df1d25..3618c56 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -93,7 +93,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
return false;
// Same as SliceQueryPager, we ignore a deleted column
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf);
+ Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
return !first.cf.deletionInfo().isDeleted(firstColumn)
&& firstColumn.isLive(timestamp())
&& lastReturnedName.equals(firstColumn.name());
@@ -102,7 +102,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
protected boolean recordLast(Row last)
{
lastReturnedKey = last.key;
- lastReturnedName = (isReversed() ? firstColumn(last.cf) : lastColumn(last.cf)).name();
+ lastReturnedName = (isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf)).name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index cdad0a5..ad5a0bf 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -78,7 +78,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
// more rows than we're supposed to. See CASSANDRA-8108 for more details.
SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize));
if (lastReturned != null)
- filter = filter.withUpdatedStart(lastReturned, cfm.comparator);
+ filter = filter.withUpdatedStart(lastReturned, cfm);
logger.debug("Querying next page of slice query; new filter: {}", filter);
ReadCommand pageCmd = command.withUpdatedFilter(filter);
@@ -92,7 +92,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
if (lastReturned == null)
return false;
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstColumn(first.cf);
+ Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
// Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the
// rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there.
return !first.cf.deletionInfo().isDeleted(firstColumn)
@@ -102,7 +102,7 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
protected boolean recordLast(Row last)
{
- Column lastColumn = isReversed() ? firstColumn(last.cf) : lastColumn(last.cf);
+ Column lastColumn = isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf);
lastReturned = lastColumn.name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
index e3ccba5..30b7f0f 100644
--- a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
@@ -704,6 +704,8 @@ public class MultiColumnRelationTest
{
for (String tableSuffix : new String[]{"", "_compact"})
{
+ execute("DELETE FROM %s.multiple_clustering_reversed" + tableSuffix + " WHERE a=0");
+
// b and d are reversed in the clustering order
execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 0, 0)");
execute("INSERT INTO %s.multiple_clustering_reversed" + tableSuffix + " (a, b, c, d) VALUES (0, 1, 1, 1)");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d075540c/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
index 5467ec0..273487a 100644
--- a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
@@ -125,9 +125,11 @@ public class AbstractQueryPagerTest
return cf;
}
- private CFMetaData createMetadata()
+ private static CFMetaData createMetadata()
{
- return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
+ CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
+ cfm.rebuild();
+ return cfm;
}
private static ByteBuffer bb(int i)
@@ -147,7 +149,7 @@ public class AbstractQueryPagerTest
// We use this to test more thoroughly DiscardFirst and DiscardLast (more generic pager behavior is tested in
// QueryPagerTest). The only thing those method use is the result of the columnCounter() method. So to keep
// it simple, we fake all actual parameters in the ctor below but just override the columnCounter() method.
- super(null, 0, false, null, null, 0);
+ super(null, 0, false, createMetadata(), null, 0);
}
@Override
[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a8dce228
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a8dce228
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a8dce228
Branch: refs/heads/cassandra-2.1
Commit: a8dce228de443716556f59510b0494078bbe97a2
Parents: 74280b1 d075540
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Wed May 27 13:50:58 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Wed May 27 13:50:58 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/DataRange.java | 82 ++++++++++++++++---
.../cassandra/db/SliceFromReadCommand.java | 28 +++++++
.../cassandra/db/filter/ColumnCounter.java | 61 +++++++++++++-
.../cassandra/db/filter/SliceQueryFilter.java | 83 ++++++++++++++++++--
.../service/pager/AbstractQueryPager.java | 40 ++++++++--
.../service/pager/RangeSliceQueryPager.java | 4 +-
.../service/pager/SliceQueryPager.java | 6 +-
.../service/pager/AbstractQueryPagerTest.java | 8 +-
10 files changed, 278 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cea6135,054cf79..d00cc65
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,7 +1,36 @@@
-2.0.16:
+2.1.6
+ * (cqlsh) Better float precision by default (CASSANDRA-9224)
+ * Improve estimated row count (CASSANDRA-9107)
+ * Optimize range tombstone memory footprint (CASSANDRA-8603)
+ * Use configured gcgs in anticompaction (CASSANDRA-9397)
+ * Warn on misuse of unlogged batches (CASSANDRA-9282)
+ * Failure detector detects and ignores local pauses (CASSANDRA-9183)
+ * Add utility class to support for rate limiting a given log statement (CASSANDRA-9029)
+ * Add missing consistency levels to cassandra-stess (CASSANDRA-9361)
+ * Fix commitlog getCompletedTasks to not increment (CASSANDRA-9339)
+ * Fix for harmless exceptions logged as ERROR (CASSANDRA-8564)
+ * Delete processed sstables in sstablesplit/sstableupgrade (CASSANDRA-8606)
+ * Improve sstable exclusion from partition tombstones (CASSANDRA-9298)
+ * Validate the indexed column rather than the cell's contents for 2i (CASSANDRA-9057)
+ * Add support for top-k custom 2i queries (CASSANDRA-8717)
+ * Fix error when dropping table during compaction (CASSANDRA-9251)
+ * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
+ * Add support for rate limiting log messages (CASSANDRA-9029)
+ * Log the partition key with tombstone warnings (CASSANDRA-8561)
+ * Reduce runWithCompactionsDisabled poll interval to 1ms (CASSANDRA-9271)
+ * Fix PITR commitlog replay (CASSANDRA-9195)
+ * GCInspector logs very different times (CASSANDRA-9124)
+ * Fix deleting from an empty list (CASSANDRA-9198)
+ * Update tuple and collection types that use a user-defined type when that UDT
+ is modified (CASSANDRA-9148, CASSANDRA-9192)
+ * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
+ * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
+ * Repair waits for anticompaction to finish (CASSANDRA-9097)
+ * Fix streaming not holding ref when stream error (CASSANDRA-9295)
+ * Fix canonical view returning early opened SSTables (CASSANDRA-9396)
+Merged from 2.0:
+ * Fix null static columns in pages after the first, paged reversed
+ queries (CASSANDRA-8502)
- * Fix failing bound statement after adding a collection (CASSANDRA-9411)
* Fix counting cache serialization in request metrics (CASSANDRA-9466)
* (cqlsh) Add LOGIN command to switch users (CASSANDRA-7212)
* Clone SliceQueryFilter in AbstractReadCommand implementations (CASSANDRA-8940)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DataRange.java
index dd4fbe8,1be9469..08fdd14
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@@ -22,11 -22,12 +22,13 @@@ import java.util.ArrayList
import java.util.Comparator;
import java.util.List;
+ import com.google.common.base.Objects;
+ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
- import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
++import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.dht.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Groups key range and column filter for range queries.
@@@ -146,15 -147,22 +148,22 @@@ public class DataRang
{
// The slice of columns that we want to fetch for each row, ignoring page start/end issues.
private final SliceQueryFilter sliceFilter;
+
+ private final CFMetaData cfm;
+
- private final Comparator<ByteBuffer> comparator;
+ private final Comparator<Composite> comparator;
// used to restrict the start of the slice for the first partition in the range
- private final ByteBuffer firstPartitionColumnStart;
+ private final Composite firstPartitionColumnStart;
// used to restrict the end of the slice for the last partition in the range
- private final ByteBuffer lastPartitionColumnFinish;
+ private final Composite lastPartitionColumnFinish;
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart, Composite lastPartitionColumnFinish, Comparator<Composite> comparator)
+ // tracks the last key that we updated the filter for to avoid duplicating work
+ private ByteBuffer lastKeyFilterWasUpdatedFor;
+
- private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer firstPartitionColumnStart,
- ByteBuffer lastPartitionColumnFinish, CFMetaData cfm, Comparator<ByteBuffer> comparator)
++ private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite firstPartitionColumnStart,
++ Composite lastPartitionColumnFinish, CFMetaData cfm, Comparator<Composite> comparator)
{
super(range, filter);
@@@ -166,11 -175,12 +176,12 @@@
this.comparator = comparator;
this.firstPartitionColumnStart = firstPartitionColumnStart;
this.lastPartitionColumnFinish = lastPartitionColumnFinish;
+ this.lastKeyFilterWasUpdatedFor = null;
}
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CellNameType comparator)
- public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, CFMetaData cfm)
++ public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, Composite columnStart, Composite columnFinish, CFMetaData cfm)
{
- this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator() : comparator);
- this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator : cfm.comparator);
++ this(range, filter, columnStart, columnFinish, cfm, filter.isReversed() ? cfm.comparator.reverseComparator() : cfm.comparator);
}
@Override
@@@ -207,27 -227,53 +228,52 @@@
return columnFilter;
}
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, boolean reversed, CFMetaData cfm)
+ {
+ return cfm.hasStaticColumns() &&
- cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
++ slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
+ }
+
private ColumnSlice[] slicesForKey(ByteBuffer key)
{
- // We don't call that until it's necessary, so assume we have to do some hard work
// Also note that firstPartitionColumnStart and lastPartitionColumnFinish, when used, only "restrict" the filter slices,
// it doesn't expand on them. As such, we can ignore the case where they are empty and we do
// as it screw up with the logic below (see #6592)
- ByteBuffer newStart = equals(startKey(), key) && firstPartitionColumnStart.hasRemaining() ? firstPartitionColumnStart : null;
- ByteBuffer newFinish = equals(stopKey(), key) && lastPartitionColumnFinish.hasRemaining() ? lastPartitionColumnFinish : null;
+ Composite newStart = equals(startKey(), key) && !firstPartitionColumnStart.isEmpty() ? firstPartitionColumnStart : null;
+ Composite newFinish = equals(stopKey(), key) && !lastPartitionColumnFinish.isEmpty() ? lastPartitionColumnFinish : null;
- List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+ // in the common case, we'll have the same number of slices
+ List<ColumnSlice> newSlices = new ArrayList<>(sliceFilter.slices.length);
+ // Check our slices to see if any fall before the page start (in which case they can be removed) or
+ // if they contain the page start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
for (ColumnSlice slice : sliceFilter.slices)
{
if (newStart != null)
{
if (slice.isBefore(comparator, newStart))
- continue; // we skip that slice
+ {
+ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm))
- newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
++ newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
+
+ continue;
+ }
if (slice.includes(comparator, newStart))
+ {
- if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
- newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
++ if (!sliceFilter.reversed && sliceIncludesStatics(slice, false, cfm) && !newStart.equals(Composites.EMPTY))
++ newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
+
slice = new ColumnSlice(newStart, slice.finish);
+ }
- // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+ // once we see a slice that either includes the page start or is after it, we can stop checking
+ // against the page start (because the slices are ordered)
newStart = null;
}
@@@ -251,5 -297,17 +297,17 @@@
columnFilter.updateColumnsLimit(count);
sliceFilter.updateColumnsLimit(count);
}
+
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("keyRange", keyRange)
+ .add("sliceFilter", sliceFilter)
+ .add("columnFilter", columnFilter)
- .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : ByteBufferUtil.bytesToHex(firstPartitionColumnStart))
- .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : ByteBufferUtil.bytesToHex(lastPartitionColumnFinish))
++ .add("firstPartitionColumnStart", firstPartitionColumnStart == null ? "null" : cfm.comparator.getString(firstPartitionColumnStart))
++ .add("lastPartitionColumnFinish", lastPartitionColumnFinish == null ? "null" : cfm.comparator.getString(lastPartitionColumnFinish))
+ .toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 80a624d,0ea2de5..6995193
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@@ -21,7 -21,10 +21,9 @@@ import java.io.DataInput
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.cassandra.utils.Pair;
+import com.google.common.base.Objects;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
@@@ -33,9 -38,11 +35,12 @@@ import org.apache.cassandra.io.util.Dat
import org.apache.cassandra.service.RowDataResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.Pair;
public class SliceFromReadCommand extends ReadCommand
{
- static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
++ private static final Logger logger = LoggerFactory.getLogger(SliceFromReadCommand.class);
+
static final SliceFromReadCommandSerializer serializer = new SliceFromReadCommandSerializer();
public final SliceQueryFilter filter;
@@@ -53,7 -62,30 +58,30 @@@
public Row getRow(Keyspace keyspace)
{
+ CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName);
DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+
+ // If we're doing a reversed query and the filter includes static columns, we need to issue two separate
+ // reads in order to guarantee that the static columns are fetched. See CASSANDRA-8502 for more details.
+ if (filter.reversed && filter.hasStaticSlice(cfm))
+ {
+ logger.debug("Splitting reversed slice with static columns into two reads");
+ Pair<SliceQueryFilter, SliceQueryFilter> newFilters = filter.splitOutStaticSlice(cfm);
+
+ Row normalResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.right, timestamp));
+ Row staticResults = keyspace.getRow(new QueryFilter(dk, cfName, newFilters.left, timestamp));
+
+ // add the static results to the start of the normal results
+ if (normalResults.cf == null)
+ return staticResults;
+
+ if (staticResults.cf != null)
- for (Column col : staticResults.cf.getReverseSortedColumns())
- normalResults.cf.addColumn(col);
++ for (Cell cell : staticResults.cf.getReverseSortedColumns())
++ normalResults.cf.addColumn(cell);
+
+ return normalResults;
+ }
+
return keyspace.getRow(new QueryFilter(dk, cfName, filter, timestamp));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index 8be26e1,ddd74b3..d7a8873
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@@ -73,9 -75,10 +73,9 @@@ public class ColumnCounte
public static class GroupByPrefix extends ColumnCounter
{
- private final CellNameType type;
- private final int toGroup;
- private CellName previous;
- protected final CompositeType type;
++ protected final CellNameType type;
+ protected final int toGroup;
- protected ByteBuffer[] previous;
- protected boolean previousGroupIsStatic;
++ protected CellName previous;
/**
* A column counter that count only 1 for all the columns sharing a
@@@ -148,4 -157,63 +148,59 @@@
previous = current;
}
}
+
+ /**
+ * Similar to GroupByPrefix, but designed to handle counting cells in reverse order.
+ */
+ public static class GroupByPrefixReversed extends GroupByPrefix
+ {
- public GroupByPrefixReversed(long timestamp, CompositeType type, int toGroup)
++ public GroupByPrefixReversed(long timestamp, CellNameType type, int toGroup)
+ {
+ super(timestamp, type, toGroup);
+ }
+
+ @Override
- public void count(Column column, DeletionInfo.InOrderTester tester)
++ public void count(Cell cell, DeletionInfo.InOrderTester tester)
+ {
- if (tester.isDeleted(column))
++ if (tester.isDeleted(cell))
+ return;
+
- if (!column.isLive(timestamp))
++ if (!cell.isLive(timestamp))
+ {
+ tombstones++;
+ return;
+ }
+
+ if (toGroup == 0)
+ {
+ live = 1;
+ return;
+ }
+
- ByteBuffer[] current = type.split(column.name());
- assert current.length >= toGroup;
++ CellName current = cell.name();
++ assert current.size() >= toGroup;
+
- boolean isStatic = CompositeType.isStaticName(column.name());
+ if (previous == null)
+ {
- // This is the first group we've seen, and it's static. In this case we want to return a count of 1,
- // because there are no other live groups.
- previousGroupIsStatic = true;
++ // This is the first group we've seen. If it happens to be static, we still want to increment the
++ // count because a) there are no non-static rows (statics are always last in reversed order), and b) any
++ // static cells we see after this will not increment the count
+ previous = current;
+ live++;
+ }
- else if (isStatic)
- {
- // Ignore statics if we've seen any other statics or any other groups
- return;
- }
-
- for (int i = 0; i < toGroup; i++)
++ else if (!current.isStatic()) // ignore statics if we've seen any other statics or any other groups
+ {
- if (ByteBufferUtil.compareUnsigned(previous[i], current[i]) != 0)
++ for (int i = 0; i < toGroup; i++)
+ {
- // it's a new group
- live++;
- previous = current;
- return;
++ if (type.subtype(i).compare(previous.get(i), current.get(i)) != 0)
++ {
++ // it's a new group
++ live++;
++ previous = current;
++ return;
++ }
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index 1195d4c,ecf02c1..9d5b705
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@@ -27,19 -30,16 +27,19 @@@ import com.google.common.collect.Iterat
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
++import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.columniterator.SSTableSliceIterator;
- import org.apache.cassandra.db.composites.CType;
- import org.apache.cassandra.db.composites.CellName;
- import org.apache.cassandra.db.composites.CellNameType;
- import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
++import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.tracing.Tracing;
++import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.Pair;
public class SliceQueryFilter implements IDiskAtomFilter
{
@@@ -111,27 -102,90 +111,88 @@@
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
}
- public SliceQueryFilter withUpdatedStart(Composite newStart, CellNameType comparator)
+ /** Returns true if the slice includes static columns, false otherwise. */
+ private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm)
{
- Comparator<Composite> cmp = reversed ? comparator.reverseComparator() : comparator;
+ return cfm.hasStaticColumns() &&
- cfm.getStaticColumnNameBuilder().build().compareTo(reversed ? slice.finish : slice.start) >= 0;
++ slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end());
+ }
+
+ public boolean hasStaticSlice(CFMetaData cfm)
+ {
+ for (ColumnSlice slice : slices)
+ if (sliceIncludesStatics(slice, cfm))
+ return true;
+
+ return false;
+ }
+
+ /**
+ * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the
+ * remainder of the normal data.
+ *
+ * This should only be called when the filter is reversed and the filter is known to cover static columns (through
+ * hasStaticSlice()).
+ *
+ * @return a pair of (static, normal) SliceQueryFilters
+ */
+ public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm)
+ {
+ assert reversed;
+
- ByteBuffer staticSliceEnd = cfm.getStaticColumnNameBuilder().buildAsEndOfRange();
++ Composite staticSliceEnd = cfm.comparator.staticPrefix().end();
+ List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length);
+ for (ColumnSlice slice : slices)
+ {
+ if (sliceIncludesStatics(slice, cfm))
+ nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd));
+ else
+ nonStaticSlices.add(slice);
+ }
+
+ return Pair.create(
- new SliceQueryFilter(staticSliceEnd, ByteBufferUtil.EMPTY_BYTE_BUFFER, true, count, compositesToGroup),
++ new SliceQueryFilter(staticSliceEnd, Composites.EMPTY, true, count, compositesToGroup),
+ new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup));
+ }
- List<ColumnSlice> newSlices = new ArrayList<>(slices.length);
- public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, CFMetaData cfm)
++ public SliceQueryFilter withUpdatedStart(Composite newStart, CFMetaData cfm)
+ {
- Comparator<ByteBuffer> cmp = reversed ? cfm.comparator.reverseComparator : cfm.comparator;
++ Comparator<Composite> cmp = reversed ? cfm.comparator.reverseComparator() : cfm.comparator;
+
+ // Check our slices to see if any fall before the new start (in which case they can be removed) or
+ // if they contain the new start (in which case they should start from the page start). However, if the
+ // slices would include static columns, we need to ensure they are also fetched, and so a separate
+ // slice for the static columns may be required.
+ // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so
+ // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details.
+ List<ColumnSlice> newSlices = new ArrayList<>();
boolean pastNewStart = false;
- for (int i = 0; i < slices.length; i++)
+ for (ColumnSlice slice : slices)
{
- ColumnSlice slice = slices[i];
-
if (pastNewStart)
{
newSlices.add(slice);
continue;
}
- if (slices[i].isBefore(cmp, newStart))
+ if (slice.isBefore(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm))
- newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
++ newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
+
continue;
+ }
+ else if (slice.includes(cmp, newStart))
+ {
+ if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER))
- newSlices.add(new ColumnSlice(ByteBufferUtil.EMPTY_BYTE_BUFFER, cfm.getStaticColumnNameBuilder().buildAsEndOfRange()));
++ newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end()));
- if (slice.includes(cmp, newStart))
newSlices.add(new ColumnSlice(newStart, slice.finish));
+ }
else
+ {
newSlices.add(slice);
+ }
pastNewStart = true;
}
@@@ -275,15 -319,21 +336,21 @@@
return new ColumnCounter(now);
else if (compositesToGroup == 0)
return new ColumnCounter.GroupByPrefix(now, null, 0);
+ else if (reversed)
- return new ColumnCounter.GroupByPrefixReversed(now, (CompositeType)comparator, compositesToGroup);
++ return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup);
else
- return new ColumnCounter.GroupByPrefix(now, (CompositeType)comparator, compositesToGroup);
+ return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup);
}
public void trim(ColumnFamily cf, int trimTo, long now)
{
+ // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming
+ if (cf.getColumnCount() < trimTo)
+ return;
+
ColumnCounter counter = columnCounter(cf.getComparator(), now);
- Collection<Column> columns = reversed
+ Collection<Cell> cells = reversed
? cf.getReverseSortedColumns()
: cf.getSortedColumns();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index bc64582,155e538..a3991ed
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -319,13 -304,29 +317,29 @@@ abstract class AbstractQueryPager imple
{
ColumnCounter counter = columnCounter();
- // Discard the first 'toDiscard' live
- List<Column> staticColumns = new ArrayList<>(cfm.staticColumns().size());
++ List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
+
- // Discard the first 'toDiscard' live, non-static columns
++ // Discard the first 'toDiscard' live, non-static cells
while (iter.hasNext())
{
- Column c = iter.next();
+ Cell c = iter.next();
+
+ // if it's a static column, don't count it and save it to add to the trimmed results
- ColumnDefinition columnDef = cfm.getColumnDefinitionFromColumnName(c.name());
- if (columnDef != null && columnDef.type == ColumnDefinition.Type.STATIC)
++ ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
++ if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
+ {
- staticColumns.add(c);
++ staticCells.add(c);
+ continue;
+ }
+
counter.count(c, tester);
+
+ // once we've discarded the required amount, add the rest
if (counter.live() > toDiscard)
{
- for (Column staticColumn : staticColumns)
- copy.addColumn(staticColumn);
++ for (Cell staticCell : staticCells)
++ copy.addColumn(staticCell);
+
copy.addColumn(c);
while (iter.hasNext())
copy.addColumn(iter.next());
@@@ -355,12 -356,24 +369,24 @@@
return Math.min(liveCount, toDiscard);
}
- protected static Cell firstCell(ColumnFamily cf)
+ /**
+ * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column
+ * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal
+ * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we
+ * need to start from the last non-static cell.
+ */
- protected Column firstNonStaticColumn(ColumnFamily cf)
++ protected Cell firstNonStaticCell(ColumnFamily cf)
{
- return cf.iterator().next();
- for (Column column : cf)
++ for (Cell cell : cf)
+ {
- ColumnDefinition def = cfm.getColumnDefinitionFromColumnName(column.name());
- if (def == null || def.type != ColumnDefinition.Type.STATIC)
- return column;
++ ColumnDefinition def = cfm.getColumnDefinition(cell.name());
++ if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
++ return cell;
+ }
+ return null;
}
- protected static Column lastColumn(ColumnFamily cf)
+ protected static Cell lastCell(ColumnFamily cf)
{
return cf.getReverseSortedColumns().iterator().next();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index cfcd953,3618c56..c9a28e8
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@@ -95,16 -93,16 +95,16 @@@ public class RangeSliceQueryPager exten
return false;
// Same as SliceQueryPager, we ignore a deleted column
- Cell firstCell = isReversed() ? lastCell(first.cf) : firstCell(first.cf);
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
- return !first.cf.deletionInfo().isDeleted(firstColumn)
- && firstColumn.isLive(timestamp())
- && lastReturnedName.equals(firstColumn.name());
++ Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
+ return !first.cf.deletionInfo().isDeleted(firstCell)
+ && firstCell.isLive(timestamp())
+ && lastReturnedName.equals(firstCell.name());
}
protected boolean recordLast(Row last)
{
lastReturnedKey = last.key;
- lastReturnedName = (isReversed() ? firstCell(last.cf) : lastCell(last.cf)).name();
- lastReturnedName = (isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf)).name();
++ lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index 05c05b1,ad5a0bf..18045fe
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@@ -97,18 -92,18 +97,18 @@@ public class SliceQueryPager extends Ab
if (lastReturned == null)
return false;
- Cell firstCell = isReversed() ? lastCell(first.cf) : firstCell(first.cf);
- Column firstColumn = isReversed() ? lastColumn(first.cf) : firstNonStaticColumn(first.cf);
++ Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
// Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the
// rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there.
- return !first.cf.deletionInfo().isDeleted(firstColumn)
- && firstColumn.isLive(timestamp())
- && lastReturned.equals(firstColumn.name());
+ return !first.cf.deletionInfo().isDeleted(firstCell)
+ && firstCell.isLive(timestamp())
+ && lastReturned.equals(firstCell.name());
}
protected boolean recordLast(Row last)
{
- Cell lastCell = isReversed() ? firstCell(last.cf) : lastCell(last.cf);
- Column lastColumn = isReversed() ? firstNonStaticColumn(last.cf) : lastColumn(last.cf);
- lastReturned = lastColumn.name();
++ Cell lastCell = isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf);
+ lastReturned = lastCell.name();
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a8dce228/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
index cc03e5d,273487a..00718b4
--- a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
@@@ -126,9 -125,11 +126,11 @@@ public class AbstractQueryPagerTes
return cf;
}
- private CFMetaData createMetadata()
+ private static CFMetaData createMetadata()
{
- return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, CellNames.fromAbstractType(Int32Type.instance, false));
- CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
++ CFMetaData cfm = new CFMetaData("ks", "cf", ColumnFamilyType.Standard, CellNames.fromAbstractType(Int32Type.instance, false));
+ cfm.rebuild();
+ return cfm;
}
private static ByteBuffer bb(int i)