You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/09/18 13:24:26 UTC
[1/2] cassandra git commit: Implement backward compatibility for
paging states
Repository: cassandra
Updated Branches:
refs/heads/trunk bb826240b -> 9967303f6
Implement backward compatibility for paging states
Pre-3.0 nodes serialize a full cell name in the paging state, but doing
so is wasteful for 3.0 nodes. We can't, however, guarantee which nodes
a particular paging state will be sent to, so we need to maintain
backward compatibility.
The patch preserves the pre-existing paging state format (thus
requiring more work for 3.0 nodes) for native protocol v3, but
switches to a format that is more efficient for 3.0 nodes for protocol
v4. It is also documented that paging states shouldn't be used across
protocol versions.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b99c8631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b99c8631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b99c8631
Branch: refs/heads/trunk
Commit: b99c8631586e734532dfd9fa84e7d88551edd229
Parents: 106b1cd
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Sep 8 16:00:58 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 18 13:01:59 2015 +0200
----------------------------------------------------------------------
doc/native_protocol_v3.spec | 3 +
doc/native_protocol_v4.spec | 5 +
.../org/apache/cassandra/cql3/QueryOptions.java | 6 +-
.../apache/cassandra/cql3/QueryProcessor.java | 3 +-
.../org/apache/cassandra/cql3/ResultSet.java | 6 +-
.../cql3/statements/SelectStatement.java | 4 +-
.../org/apache/cassandra/db/Clustering.java | 29 +++
.../org/apache/cassandra/db/LegacyLayout.java | 2 +-
.../cassandra/db/PartitionRangeReadCommand.java | 6 +-
src/java/org/apache/cassandra/db/ReadQuery.java | 5 +-
.../db/SinglePartitionNamesCommand.java | 1 +
.../db/SinglePartitionReadCommand.java | 14 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 48 +++-
.../cassandra/db/rows/ComplexColumnData.java | 5 +
src/java/org/apache/cassandra/db/rows/Row.java | 3 +-
src/java/org/apache/cassandra/db/view/View.java | 5 +-
.../apache/cassandra/db/view/ViewBuilder.java | 3 +-
.../service/pager/AbstractQueryPager.java | 4 +-
.../service/pager/MultiPartitionPager.java | 8 +-
.../cassandra/service/pager/PagingState.java | 220 ++++++++++++++++---
.../cassandra/service/pager/QueryPagers.java | 3 +-
.../service/pager/RangeNamesQueryPager.java | 4 +-
.../service/pager/RangeSliceQueryPager.java | 16 +-
.../service/pager/SinglePartitionPager.java | 16 +-
.../org/apache/cassandra/transport/CBUtil.java | 7 +
.../cassandra/service/QueryPagerTest.java | 97 +++++++-
.../service/pager/PagingStateTest.java | 99 +++++++++
27 files changed, 526 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/doc/native_protocol_v3.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v3.spec b/doc/native_protocol_v3.spec
index 312c8c1..1c8e924 100644
--- a/doc/native_protocol_v3.spec
+++ b/doc/native_protocol_v3.spec
@@ -917,6 +917,9 @@ Table of Contents
<result_page_size> results. While the current implementation always respect
the exact value of <result_page_size>, we reserve ourselves the right to return
slightly smaller or bigger pages in the future for performance reasons.
+ - The <paging_state> is specific to a protocol version: for instance, drivers
+ should not send a <paging_state> returned by a node using protocol v3 to
+ query a node using protocol v4.
9. Error codes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index f040323..852ae60 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -1006,6 +1006,9 @@ Table of Contents
<result_page_size> results. While the current implementation always respect
the exact value of <result_page_size>, we reserve ourselves the right to return
slightly smaller or bigger pages in the future for performance reasons.
+ - The <paging_state> is specific to a protocol version: for instance, drivers
+ should not send a <paging_state> returned by a node using protocol v3 to
+ query a node using protocol v4.
9. Error codes
@@ -1160,3 +1163,5 @@ Table of Contents
* Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address.
* Add the date and time data types
* Add the tinyint and smallint data types
+ * The <paging_state> returned by the v4 protocol is not compatible with the v3 protocol. In other words, a <paging_state> returned by a
+ node using protocol v4 should not be used to query a node using protocol v3 (and vice-versa).
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index ad554ed..e6e80e3 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -384,7 +384,7 @@ public abstract class QueryOptions
if (!flags.isEmpty())
{
int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() : -1;
- PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body)) : null;
+ PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValue(body), version) : null;
ConsistencyLevel serialConsistency = flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
long timestamp = Long.MIN_VALUE;
if (flags.contains(Flag.TIMESTAMP))
@@ -413,7 +413,7 @@ public abstract class QueryOptions
if (flags.contains(Flag.PAGE_SIZE))
dest.writeInt(options.getPageSize());
if (flags.contains(Flag.PAGING_STATE))
- CBUtil.writeValue(options.getPagingState().serialize(), dest);
+ CBUtil.writeValue(options.getPagingState().serialize(version), dest);
if (flags.contains(Flag.SERIAL_CONSISTENCY))
CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest);
if (flags.contains(Flag.TIMESTAMP))
@@ -438,7 +438,7 @@ public abstract class QueryOptions
if (flags.contains(Flag.PAGE_SIZE))
size += 4;
if (flags.contains(Flag.PAGING_STATE))
- size += CBUtil.sizeOfValue(options.getPagingState().serialize());
+ size += CBUtil.sizeOfValue(options.getPagingState().serializedSize(version));
if (flags.contains(Flag.SERIAL_CONSISTENCY))
size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
if (flags.contains(Flag.TIMESTAMP))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 92549f9..8f2a4f4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.service.*;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.thrift.ThriftClientState;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.*;
import org.github.jamm.MemoryMeter;
@@ -322,7 +323,7 @@ public class QueryProcessor implements QueryHandler
throw new IllegalArgumentException("Only SELECTs can be paged");
SelectStatement select = (SelectStatement)prepared.statement;
- QueryPager pager = select.getQuery(makeInternalOptions(prepared, values), FBUtilities.nowInSeconds()).getPager(null);
+ QueryPager pager = select.getQuery(makeInternalOptions(prepared, values), FBUtilities.nowInSeconds()).getPager(null, Server.CURRENT_VERSION);
return UntypedResultSet.create(select, pager, pageSize);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index ea26f34..9d2fbec 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -325,7 +325,7 @@ public class ResultSet
PagingState state = null;
if (flags.contains(Flag.HAS_MORE_PAGES))
- state = PagingState.deserialize(CBUtil.readValue(body));
+ state = PagingState.deserialize(CBUtil.readValue(body), version);
if (flags.contains(Flag.NO_METADATA))
return new ResultMetadata(flags, null, columnCount, state);
@@ -365,7 +365,7 @@ public class ResultSet
dest.writeInt(m.columnCount);
if (hasMorePages)
- CBUtil.writeValue(m.pagingState.serialize(), dest);
+ CBUtil.writeValue(m.pagingState.serialize(version), dest);
if (!noMetadata)
{
@@ -397,7 +397,7 @@ public class ResultSet
int size = 8;
if (hasMorePages)
- size += CBUtil.sizeOfValue(m.pagingState.serialize());
+ size += CBUtil.sizeOfValue(m.pagingState.serializedSize(version));
if (!noMetadata)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index cb6de2b..170bfdf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -209,7 +209,7 @@ public class SelectStatement implements CQLStatement
if (pageSize <= 0 || query.limits().count() <= pageSize)
return execute(query, options, state, nowInSec);
- QueryPager pager = query.getPager(options.getPagingState());
+ QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec);
}
@@ -389,7 +389,7 @@ public class SelectStatement implements CQLStatement
}
else
{
- QueryPager pager = query.getPager(options.getPagingState());
+ QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion());
return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 2fb92d9..6bffd45 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -24,7 +24,9 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.memory.AbstractAllocator;
@@ -138,6 +140,24 @@ public class Clustering extends AbstractClusteringPrefix
ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
}
+ /**
+ * Serializes {@code clustering} into a newly allocated in-memory buffer, using the same format
+ * as the {@code serialize} overload that writes to an output.
+ */
+ public ByteBuffer serialize(Clustering clustering, int version, List<AbstractType<?>> types)
+ {
+ try
+ {
+ DataOutputBuffer buffer = new DataOutputBuffer((int)serializedSize(clustering, version, types));
+ serialize(clustering, buffer, version, types);
+ return buffer.buffer();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Writing to an in-memory buffer shouldn't trigger an IOException", e);
+ }
+ }
+
public long serializedSize(Clustering clustering, int version, List<AbstractType<?>> types)
{
return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
@@ -151,5 +171,22 @@ public class Clustering extends AbstractClusteringPrefix
ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types);
return new Clustering(values);
}
+
+ /**
+ * Deserializes a clustering from {@code in}, which must hold values in the format produced by
+ * {@link #serialize(Clustering, int, List)}.
+ */
+ public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)
+ {
+ try
+ {
+ DataInputBuffer buffer = new DataInputBuffer(in, true);
+ return deserialize(buffer, version, types);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("Reading from an in-memory buffer shouldn't trigger an IOException", e);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 2719105..d57bc6b 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -708,7 +708,7 @@ public abstract class LegacyLayout
Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
{
- private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata).iterator();
+ private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, false).iterator();
// we don't have (and shouldn't have) row markers for compact tables.
private boolean hasReturnedRowMarker = metadata.isCompactTable();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 965e9af..4e96d81 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -155,12 +155,12 @@ public class PartitionRangeReadCommand extends ReadCommand
return StorageProxy.getRangeSlice(this, consistency);
}
- public QueryPager getPager(PagingState pagingState)
+ /**
+ * Returns a {@code RangeNamesQueryPager} for names queries and a {@code RangeSliceQueryPager}
+ * otherwise; both use {@code protocolVersion} for the paging states they produce.
+ */
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
{
if (isNamesQuery())
- return new RangeNamesQueryPager(this, pagingState);
+ return new RangeNamesQueryPager(this, pagingState, protocolVersion);
else
- return new RangeSliceQueryPager(this, pagingState);
+ return new RangeSliceQueryPager(this, pagingState, protocolVersion);
}
protected void recordLatency(TableMetrics metric, long latencyNanos)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index 3ad0f82..3abffd5 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -58,7 +58,7 @@ public interface ReadQuery
return DataLimits.cqlLimits(0);
}
- public QueryPager getPager(PagingState state)
+ // Always returns QueryPager.EMPTY: there is nothing to page, so neither argument is used.
+ public QueryPager getPager(PagingState state, int protocolVersion)
{
return QueryPager.EMPTY;
}
@@ -104,10 +104,11 @@ public interface ReadQuery
*
* @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be
* {@code null} if this is the start of paging.
+ * @param protocolVersion the protocol version to use for the paging state of that pager.
*
* @return a pager for the query.
*/
- public QueryPager getPager(PagingState pagingState);
+ public QueryPager getPager(PagingState pagingState, int protocolVersion);
/**
* The limits for the query.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index 1b41005..430e4a1 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.utils.memory.HeapAllocator;
public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
{
private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
+
protected SinglePartitionNamesCommand(boolean isDigest,
int digestVersion,
boolean isForThrift,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index cd01748..49cf07c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -230,14 +230,14 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
return StorageProxy.read(Group.one(this), consistency, clientState);
}
- public SinglePartitionPager getPager(PagingState pagingState)
+ /**
+ * Returns a pager over this single partition that uses {@code protocolVersion} for its paging states.
+ */
+ public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
{
- return getPager(this, pagingState);
+ return getPager(this, pagingState, protocolVersion);
}
- private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState)
+ // Shared by the instance getPager and by the single-command case of the group-level getPager.
+ private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState, int protocolVersion)
{
- return new SinglePartitionPager(command, pagingState);
+ return new SinglePartitionPager(command, pagingState, protocolVersion);
}
protected void recordLatency(TableMetrics metric, long latencyNanos)
@@ -495,12 +495,12 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
return limits.filter(PartitionIterators.concat(partitions), nowInSec);
}
- public QueryPager getPager(PagingState pagingState)
+ // A group holding a single command is paged like a plain single-partition read; otherwise a
+ // MultiPartitionPager is used over all commands. Both propagate protocolVersion to their paging states.
+ public QueryPager getPager(PagingState pagingState, int protocolVersion)
{
if (commands.size() == 1)
- return SinglePartitionReadCommand.getPager(commands.get(0), pagingState);
+ return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);
- return new MultiPartitionPager(this, pagingState);
+ return new MultiPartitionPager(this, pagingState, protocolVersion);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 1bc1162..a2a8c5f 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -392,9 +392,9 @@ public class BTreeRow extends AbstractRow
((ComplexColumnData) current).setValue(path, value);
}
- public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+ public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed)
{
- return () -> new CellInLegacyOrderIterator(metadata);
+ return () -> new CellInLegacyOrderIterator(metadata, reversed);
}
private class CellIterator extends AbstractIterator<Cell>
@@ -429,15 +429,17 @@ public class BTreeRow extends AbstractRow
private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
{
private final AbstractType<?> comparator;
+ private final boolean reversed;
private final int firstComplexIdx;
private int simpleIdx;
private int complexIdx;
private Iterator<Cell> complexCells;
private final Object[] data;
- private CellInLegacyOrderIterator(CFMetaData metadata)
+ private CellInLegacyOrderIterator(CFMetaData metadata, boolean reversed)
{
this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+ this.reversed = reversed;
// copy btree into array for simple separate iteration of simple and complex columns
this.data = new Object[BTree.size(btree)];
@@ -448,6 +450,41 @@ public class BTreeRow extends AbstractRow
this.complexIdx = firstComplexIdx;
}
+ // Position in 'data' of the next simple cell. Simple cells are stored before firstComplexIdx and
+ // 'simpleIdx' counts how many were consumed, so reversed iteration walks that range backwards.
+ private int getSimpleIdx()
+ {
+ return reversed ? firstComplexIdx - simpleIdx - 1 : simpleIdx;
+ }
+
+ private int getSimpleIdxAndIncrement()
+ {
+ int idx = getSimpleIdx();
+ ++simpleIdx;
+ return idx;
+ }
+
+ // Position in 'data' of the next complex column. Complex columns are stored in [firstComplexIdx, data.length)
+ // and 'complexIdx' counts upward from firstComplexIdx, so reversed iteration mirrors it within that range.
+ private int getComplexIdx()
+ {
+ return reversed ? data.length + firstComplexIdx - complexIdx - 1 : complexIdx;
+ }
+
+ private int getComplexIdxAndIncrement()
+ {
+ int idx = getComplexIdx();
+ ++complexIdx;
+ return idx;
+ }
+
+ // Iterates the cells of a complex column in the same direction as the row's cells.
+ private Iterator<Cell> makeComplexIterator(Object complexData)
+ {
+ ComplexColumnData ccd = (ComplexColumnData)complexData;
+ return reversed ? ccd.reverseIterator() : ccd.iterator();
+ }
+
protected Cell computeNext()
{
while (true)
@@ -465,17 +502,19 @@ public class BTreeRow extends AbstractRow
if (complexIdx >= data.length)
return endOfData();
- complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+ complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]);
}
else
{
if (complexIdx >= data.length)
- return (Cell)data[simpleIdx++];
+ return (Cell)data[getSimpleIdxAndIncrement()];
- if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
- return (Cell)data[simpleIdx++];
+ // Both sub-sequences are walked in the iteration direction, so in reversed mode the larger name comes first.
+ int cmp = comparator.compare(((ColumnData) data[getSimpleIdx()]).column().name.bytes, ((ColumnData) data[getComplexIdx()]).column().name.bytes);
+ if (reversed ? cmp > 0 : cmp < 0)
+ return (Cell)data[getSimpleIdxAndIncrement()];
else
- complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+ complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 76ab7e7..fab529b 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -102,6 +102,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
return BTree.iterator(cells);
}
+ /**
+ * Returns an iterator over the cells of this column in descending ({@code BTree.Dir.DESC}) order.
+ */
+ public Iterator<Cell> reverseIterator()
+ {
+ return BTree.iterator(cells, BTree.Dir.DESC);
+ }
+
public int dataSize()
{
int size = complexDeletion.dataSize();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index a80325f..8a67e9b 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -150,9 +150,10 @@ public interface Row extends Unfiltered, Collection<ColumnData>
* legacy order. It's only ever meaningful for backward/thrift compatibility code.
*
* @param metadata the table this is a row of.
+ * @param reversed whether the cells should be returned in reverse order.
* @return an iterable over the cells of this row in "legacy order".
*/
- public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata);
+ public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed);
/**
* Whether the row stores any (non-live) complex deletion for any complex column.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index fb34ca9..28ec489 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.transport.Server;
/**
* A View copies data from a base table into a view table which can be queried independently from the
@@ -481,7 +482,7 @@ public class View
if (!rowSet.hasTombstonedExisting())
{
- QueryPager pager = command.getPager(null);
+ QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
// Add all of the rows which were recovered from the query to the row set
while (!pager.isExhausted())
@@ -538,7 +539,7 @@ public class View
for (TemporalRow temporalRow : rowSet)
builder.addSlice(temporalRow.baseSlice());
- QueryPager pager = builder.build().getPager(null);
+ QueryPager pager = builder.build().getPager(null, Server.CURRENT_VERSION);
while (!pager.isExhausted())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 62aa332..f0b01c7 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
@@ -76,7 +77,7 @@ public class ViewBuilder extends CompactionInfo.Holder
private void buildKey(DecoratedKey key)
{
- QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null);
+ QueryPager pager = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata, FBUtilities.nowInSeconds(), key).getPager(null, Server.CURRENT_VERSION);
while (!pager.isExhausted())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 9991277..b92d1e1 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -32,6 +32,7 @@ abstract class AbstractQueryPager implements QueryPager
{
protected final ReadCommand command;
protected final DataLimits limits;
+ protected final int protocolVersion;
private int remaining;
@@ -43,9 +44,10 @@ abstract class AbstractQueryPager implements QueryPager
private boolean exhausted;
- protected AbstractQueryPager(ReadCommand command)
+ protected AbstractQueryPager(ReadCommand command, int protocolVersion)
{
this.command = command;
+ this.protocolVersion = protocolVersion;
this.limits = command.limits();
this.remaining = limits.count();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 5d06df7..ee2db9f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -53,7 +53,7 @@ public class MultiPartitionPager implements QueryPager
private int remaining;
private int current;
- public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state)
+ public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion)
{
this.limit = group.limits();
this.nowInSec = group.nowInSec();
@@ -74,11 +74,11 @@ public class MultiPartitionPager implements QueryPager
pagers = new SinglePartitionPager[group.commands.size() - i];
// 'i' is on the first non exhausted pager for the previous page (or the first one)
- pagers[0] = group.commands.get(i).getPager(state);
+ pagers[0] = group.commands.get(i).getPager(state, protocolVersion);
// Following ones haven't been started yet
for (int j = i + 1; j < group.commands.size(); j++)
- pagers[j - i] = group.commands.get(j).getPager(null);
+ pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
remaining = state == null ? limit.count() : state.remaining;
}
@@ -90,7 +90,7 @@ public class MultiPartitionPager implements QueryPager
return null;
PagingState state = pagers[current].state();
- return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining, Integer.MAX_VALUE);
+ return new PagingState(pagers[current].key(), state == null ? null : state.rowMark, remaining, Integer.MAX_VALUE);
}
public boolean isExhausted()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 685dc3f..542b6d2 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -20,44 +20,72 @@ package org.apache.cassandra.service.pager;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.LegacyLayout;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.ProtocolException;
import org.apache.cassandra.utils.ByteBufferUtil;
public class PagingState
{
- public final ByteBuffer partitionKey;
- public final ByteBuffer cellName;
+ public final ByteBuffer partitionKey; // Can be null for single partition queries.
+ public final RowMark rowMark; // Can be null if not needed.
public final int remaining;
public final int remainingInPartition;
- public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining, int remainingInPartition)
+ public PagingState(ByteBuffer partitionKey, RowMark rowMark, int remaining, int remainingInPartition)
{
- this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
- this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName;
+ this.partitionKey = partitionKey;
+ this.rowMark = rowMark;
this.remaining = remaining;
this.remainingInPartition = remainingInPartition;
}
- public static PagingState deserialize(ByteBuffer bytes)
+ public static PagingState deserialize(ByteBuffer bytes, int protocolVersion) // protocolVersion selects the wire format; it must match the version given to serialize()
{
if (bytes == null)
return null;
try
{
- DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(bytes));
- ByteBuffer pk = ByteBufferUtil.readWithShortLength(in);
- ByteBuffer cn = ByteBufferUtil.readWithShortLength(in);
- int remaining = in.readInt();
- // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
- // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
- // bytes remain to be read. And the reason we want to condition this is for backward compatility
- // as we used to not set this.
- int remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
- return new PagingState(pk, cn, remaining, remainingInPartition);
+ DataInputBuffer in = new DataInputBuffer(bytes, true);
+ ByteBuffer pk;
+ RowMark mark;
+ int remaining, remainingInPartition;
+ if (protocolVersion <= Server.VERSION_3)
+ {
+ pk = ByteBufferUtil.readWithShortLength(in);
+ mark = new RowMark(ByteBufferUtil.readWithShortLength(in), protocolVersion);
+ remaining = in.readInt();
+ // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
+ // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
+ // bytes remain to be read. And the reason we want to condition this is for backward compatibility
+ // as we used to not set this.
+ remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
+ }
+ else
+ {
+ pk = ByteBufferUtil.readWithVIntLength(in);
+ mark = new RowMark(ByteBufferUtil.readWithVIntLength(in), protocolVersion);
+ remaining = (int)in.readUnsignedVInt();
+ remainingInPartition = (int)in.readUnsignedVInt();
+ }
+ return new PagingState(pk.hasRemaining() ? pk : null, // serialize() writes a null key/mark as an empty buffer, so map empty back to null
+ mark.mark.hasRemaining() ? mark : null,
+ remaining,
+ remainingInPartition);
}
catch (IOException e)
{
@@ -65,14 +93,27 @@ public class PagingState
}
}
- public ByteBuffer serialize()
+ public ByteBuffer serialize(int protocolVersion) // v3 and earlier: short-length prefixes and 4-byte ints; v4+: vint-length prefixes and unsigned vints
{
- try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize()))
+ assert rowMark == null || protocolVersion == rowMark.protocolVersion;
+ try (DataOutputBuffer out = new DataOutputBufferFixed(serializedSize(protocolVersion)))
{
- ByteBufferUtil.writeWithShortLength(partitionKey, out);
- ByteBufferUtil.writeWithShortLength(cellName, out);
- out.writeInt(remaining);
- out.writeInt(remainingInPartition);
+ ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
+ ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
+ if (protocolVersion <= Server.VERSION_3)
+ {
+ ByteBufferUtil.writeWithShortLength(pk, out);
+ ByteBufferUtil.writeWithShortLength(mark, out);
+ out.writeInt(remaining);
+ out.writeInt(remainingInPartition);
+ }
+ else
+ {
+ ByteBufferUtil.writeWithVIntLength(pk, out);
+ ByteBufferUtil.writeWithVIntLength(mark, out);
+ out.writeUnsignedVInt(remaining);
+ out.writeUnsignedVInt(remainingInPartition);
+ }
return out.buffer();
}
catch (IOException e)
@@ -81,11 +122,42 @@ public class PagingState
}
}
- private int serializedSize()
+ public int serializedSize(int protocolVersion) // must mirror serialize(), which sizes its fixed output buffer with this value
{
- return 2 + partitionKey.remaining()
- + 2 + cellName.remaining()
- + 8; // remaining & remainingInPartition
+ assert rowMark == null || protocolVersion == rowMark.protocolVersion;
+ ByteBuffer pk = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
+ ByteBuffer mark = rowMark == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : rowMark.mark;
+ if (protocolVersion <= Server.VERSION_3)
+ {
+ return ByteBufferUtil.serializedSizeWithShortLength(pk)
+ + ByteBufferUtil.serializedSizeWithShortLength(mark)
+ + 8; // remaining & remainingInPartition
+ }
+ else
+ {
+ return ByteBufferUtil.serializedSizeWithVIntLength(pk)
+ + ByteBufferUtil.serializedSizeWithVIntLength(mark)
+ + TypeSizes.sizeofUnsignedVInt(remaining)
+ + TypeSizes.sizeofUnsignedVInt(remainingInPartition);
+ }
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Objects.hash(partitionKey, rowMark, remaining, remainingInPartition);
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof PagingState))
+ return false;
+ PagingState that = (PagingState)o;
+ return Objects.equals(this.partitionKey, that.partitionKey)
+ && Objects.equals(this.rowMark, that.rowMark)
+ && this.remaining == that.remaining
+ && this.remainingInPartition == that.remainingInPartition;
}
@Override
@@ -93,8 +165,102 @@ public class PagingState
{
- return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d",
- ByteBufferUtil.bytesToHex(partitionKey),
- ByteBufferUtil.bytesToHex(cellName),
+ return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d)",
+ partitionKey == null ? null : ByteBufferUtil.bytesToHex(partitionKey), // partitionKey is null for single partition states; bytesToHex would NPE
+ rowMark,
remaining,
remainingInPartition);
}
+
+ /**
+ * Marks the last row returned by paging, the one from which paging should continue.
+ * This class essentially holds a row clustering, but due to backward compatibility reasons,
+ * we need to actually store the cell name for the last cell of the row we're marking when
+ * the protocol v3 is in use, and this class abstracts that complication.
+ *
+ * See CASSANDRA-10254 for more details.
+ */
+ public static class RowMark
+ {
+ // This can be null for convenience if no row is marked.
+ private final ByteBuffer mark;
+ private final int protocolVersion;
+
+ private RowMark(ByteBuffer mark, int protocolVersion)
+ {
+ this.mark = mark;
+ this.protocolVersion = protocolVersion;
+ }
+
+ private static List<AbstractType<?>> makeClusteringTypes(CFMetaData metadata)
+ {
+ // This is the types that will be used when serializing the clustering in the paging state. We can't really use the actual clustering
+ // types however because we can't guarantee that there won't be a schema change between when we send the paging state and get it back,
+ // and said schema change could theoretically change one of the clustering types from a fixed width type to a non-fixed one
+ // (say timestamp -> blob). So we simply use a list of BytesTypes (for both reading and writing), which may be slightly inefficient
+ // for fixed-width types, but avoid any risk during schema changes.
+ int size = metadata.clusteringColumns().size();
+ List<AbstractType<?>> l = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ l.add(BytesType.instance);
+ return l;
+ }
+
+ public static RowMark create(CFMetaData metadata, Row row, int protocolVersion)
+ {
+ ByteBuffer mark;
+ if (protocolVersion <= Server.VERSION_3)
+ {
+ // We need to be backward compatible with 2.1/2.2 nodes paging states. Which means we have to send
+ // the full cellname of the "last" cell in the row we get (since that's how 2.1/2.2 nodes will start after
+ // that last row if they get that paging state).
+ Iterator<Cell> cells = row.cellsInLegacyOrder(metadata, true).iterator();
+ if (!cells.hasNext())
+ {
+ mark = LegacyLayout.encodeClustering(metadata, row.clustering());
+ }
+ else
+ {
+ Cell cell = cells.next();
+ mark = LegacyLayout.encodeCellName(metadata, row.clustering(), cell.column().name.bytes, cell.column().isComplex() ? cell.path().get(0) : null);
+ }
+ }
+ else
+ {
+ // We froze the serialization version to 3.0 as we need to make sure this doesn't change (that is, it has to be
+ // fixed for a given version of the protocol).
+ mark = Clustering.serializer.serialize(row.clustering(), MessagingService.VERSION_30, makeClusteringTypes(metadata));
+ }
+ return new RowMark(mark, protocolVersion);
+ }
+
+ public Clustering clustering(CFMetaData metadata)
+ {
+ if (mark == null)
+ return null;
+
+ return protocolVersion <= Server.VERSION_3
+ ? LegacyLayout.decodeClustering(metadata, mark)
+ : Clustering.serializer.deserialize(mark, MessagingService.VERSION_30, makeClusteringTypes(metadata));
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Objects.hash(mark, protocolVersion);
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof RowMark))
+ return false;
+ RowMark that = (RowMark)o;
+ return Objects.equals(this.mark, that.mark) && this.protocolVersion == that.protocolVersion;
+ }
+
+ @Override
+ public String toString()
+ {
+ return mark == null ? "null" : ByteBufferUtil.bytesToHex(mark); // mark may be null (see field comment); bytesToHex would NPE
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 618ca32..eee94e6 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.Server;
/**
* Static utility methods for paging.
@@ -47,7 +48,7 @@ public class QueryPagers
boolean isForThrift) throws RequestValidationException, RequestExecutionException
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
- final SinglePartitionPager pager = new SinglePartitionPager(command, null);
+ final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION); // NOTE(review): presumably this internal pager's state is never sent to a client, so the current version is just a default for row-mark encoding — confirm
int count = 0;
while (!pager.isExhausted())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index e085490..9801565 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -35,9 +35,9 @@ public class RangeNamesQueryPager extends AbstractQueryPager
{
private volatile DecoratedKey lastReturnedKey;
- public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state)
+ public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) // protocolVersion is forwarded to the AbstractQueryPager constructor
{
- super(command);
+ super(command, protocolVersion);
assert command.isNamesQuery();
if (state != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 87eb018..770875a 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -39,19 +39,17 @@ public class RangeSliceQueryPager extends AbstractQueryPager
private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class);
private volatile DecoratedKey lastReturnedKey;
- private volatile Clustering lastReturnedClustering;
+ private volatile PagingState.RowMark lastReturnedRow; // null when no row mark is known (nothing recorded yet, or restored from a state without one)
- public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state)
+ public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion)
{
- super(command);
+ super(command, protocolVersion);
assert !command.isNamesQuery();
if (state != null)
{
lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
- lastReturnedClustering = state.cellName.hasRemaining()
- ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
- : null;
+ lastReturnedRow = state.rowMark; // may be null: PagingState.deserialize maps an empty mark to null
restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
@@ -60,7 +58,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
{
return lastReturnedKey == null
? null
- : new PagingState(lastReturnedKey.getKey(), LegacyLayout.encodeClustering(command.metadata(), lastReturnedClustering), maxRemaining(), remainingInPartition());
+ : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition());
}
protected ReadCommand nextPageReadCommand(int pageSize)
@@ -81,7 +79,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
if (includeLastKey)
{
- pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedClustering, false);
+ pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow == null ? null : lastReturnedRow.clustering(command.metadata()), false); // guard like SinglePartitionPager: a missing mark yields a null clustering, as lastReturnedClustering did before
limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
}
else
@@ -101,7 +99,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
if (last != null)
{
lastReturnedKey = key;
- lastReturnedClustering = last.clustering();
+ lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 28c5206..7057e79 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -37,18 +37,16 @@ public class SinglePartitionPager extends AbstractQueryPager
private final SinglePartitionReadCommand<?> command;
- private volatile Clustering lastReturned;
+ private volatile PagingState.RowMark lastReturned; // null until a row is recorded (or restored from a paging state that carries a row mark)
- public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state)
+ public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state, int protocolVersion) // protocolVersion determines how recorded row marks are encoded
{
- super(command);
+ super(command, protocolVersion);
this.command = command;
if (state != null)
{
- lastReturned = state.cellName.hasRemaining()
- ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
- : null;
+ lastReturned = state.rowMark; // may be null when the state carried no row mark
restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
}
}
@@ -67,18 +65,18 @@ public class SinglePartitionPager extends AbstractQueryPager
{
return lastReturned == null
? null
- : new PagingState(null, LegacyLayout.encodeClustering(command.metadata(), lastReturned), maxRemaining(), remainingInPartition());
+ : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition()); // no partition key: the command targets a single partition (command.partitionKey())
}
protected ReadCommand nextPageReadCommand(int pageSize)
{
- return command.forPaging(lastReturned, pageSize);
+ return command.forPaging(lastReturned == null ? null : lastReturned.clustering(command.metadata()), pageSize); // decode the mark back into a clustering; null when no row has been marked
}
protected void recordLast(DecoratedKey key, Row last)
{
if (last != null)
- lastReturned = last.clustering();
+ lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion); // the mark's version must equal the one later passed to PagingState.serialize(), which asserts it
}
protected boolean isPreviouslyReturnedPartition(DecoratedKey key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 92e2891..800a9a8 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -430,6 +430,13 @@ public abstract class CBUtil
return 4 + (bytes == null ? 0 : bytes.remaining());
}
+ // Serialized size of a value given the value's size in bytes: a fixed 4 bytes (presumably the length prefix) plus
+ // the value's bytes. A negative valueSize denotes a null value, which adds nothing beyond those 4 bytes.
+ public static int sizeOfValue(int valueSize)
+ {
+ return 4 + (valueSize < 0 ? 0 : valueSize);
+ }
+
public static List<ByteBuffer> readValueList(ByteBuf cb, int protocolVersion)
{
int size = cb.readUnsignedShort();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
index 228b2a9..0f79e84 100644
--- a/test/unit/org/apache/cassandra/service/QueryPagerTest.java
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -37,8 +37,10 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.PagingState;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.transport.Server;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -205,10 +207,19 @@ public class QueryPagerTest
}
}
+ private QueryPager maybeRecreate(QueryPager pager, ReadQuery command, boolean testPagingState, int protocolVersion) // if testPagingState, round-trips pager.state() through serialize/deserialize and builds a fresh pager from it
+ {
+ if (!testPagingState)
+ return pager;
+
+ PagingState state = PagingState.deserialize(pager.state().serialize(protocolVersion), protocolVersion); // callers assert !isExhausted() first; presumably that guarantees a non-null state() — confirm
+ return command.getPager(state, protocolVersion);
+ }
+
@Test
public void namesQueryTest() throws Exception
{
- QueryPager pager = namesQuery("k0", "c1", "c5", "c7", "c8").getPager(null);
+ QueryPager pager = namesQuery("k0", "c1", "c5", "c7", "c8").getPager(null, Server.CURRENT_VERSION); // names query is only paged with the current protocol version here
assertFalse(pager.isExhausted());
List<FilteredPartition> partition = query(pager, 5, 4);
@@ -220,16 +231,29 @@ public class QueryPagerTest
@Test
public void sliceQueryTest() throws Exception
{
- QueryPager pager = sliceQuery("k0", "c1", "c8", 10).getPager(null);
+ sliceQueryTest(false, Server.VERSION_3); // every (paging-state round-trip, protocol version) combination
+ sliceQueryTest(true, Server.VERSION_3); // exercises the legacy cell-name based v3 paging state
+ sliceQueryTest(false, Server.VERSION_4);
+ sliceQueryTest(true, Server.VERSION_4);
+ }
+
+ public void sliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+ {
+ ReadCommand command = sliceQuery("k0", "c1", "c8", 10);
+ QueryPager pager = command.getPager(null, protocolVersion);
assertFalse(pager.isExhausted());
List<FilteredPartition> partition = query(pager, 3);
assertRow(partition.get(0), "k0", "c1", "c2", "c3");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager, 3);
assertRow(partition.get(0), "k0", "c4", "c5", "c6");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager, 3, 2);
assertRow(partition.get(0), "k0", "c7", "c8");
@@ -240,16 +264,29 @@ public class QueryPagerTest
@Test
public void reversedSliceQueryTest() throws Exception
{
- QueryPager pager = sliceQuery("k0", "c1", "c8", true, 10).getPager(null);
+ reversedSliceQueryTest(false, Server.VERSION_3); // every (paging-state round-trip, protocol version) combination
+ reversedSliceQueryTest(true, Server.VERSION_3); // exercises the legacy cell-name based v3 paging state
+ reversedSliceQueryTest(false, Server.VERSION_4);
+ reversedSliceQueryTest(true, Server.VERSION_4);
+ }
+
+ public void reversedSliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+ {
+ ReadCommand command = sliceQuery("k0", "c1", "c8", true, 10);
+ QueryPager pager = command.getPager(null, protocolVersion);
assertFalse(pager.isExhausted());
List<FilteredPartition> partition = query(pager, 3);
assertRow(partition.get(0), "k0", "c6", "c7", "c8");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager, 3);
assertRow(partition.get(0), "k0", "c3", "c4", "c5");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager, 3, 2);
assertRow(partition.get(0), "k0", "c1", "c2");
@@ -260,21 +297,34 @@ public class QueryPagerTest
@Test
public void multiQueryTest() throws Exception
{
- QueryPager pager = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand<?>>()
+ multiQueryTest(false, Server.VERSION_3); // every (paging-state round-trip, protocol version) combination
+ multiQueryTest(true, Server.VERSION_3); // exercises the legacy cell-name based v3 paging state
+ multiQueryTest(false, Server.VERSION_4);
+ multiQueryTest(true, Server.VERSION_4);
+ }
+
+ public void multiQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+ {
+ ReadQuery command = new SinglePartitionReadCommand.Group(new ArrayList<SinglePartitionReadCommand<?>>()
{{
add(sliceQuery("k1", "c2", "c6", 10));
add(sliceQuery("k4", "c3", "c5", 10));
- }}, DataLimits.NONE).getPager(null);
+ }}, DataLimits.NONE);
+ QueryPager pager = command.getPager(null, protocolVersion);
assertFalse(pager.isExhausted());
List<FilteredPartition> partition = query(pager, 3);
assertRow(partition.get(0), "k1", "c2", "c3", "c4");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager , 4);
assertRow(partition.get(0), "k1", "c5", "c6");
assertRow(partition.get(1), "k4", "c3", "c4");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partition = query(pager, 3, 1);
assertRow(partition.get(0), "k4", "c5");
@@ -285,13 +335,24 @@ public class QueryPagerTest
@Test
public void rangeNamesQueryTest() throws Exception
{
- QueryPager pager = rangeNamesQuery("k0", "k5", 100, "c1", "c4", "c8").getPager(null);
+ rangeNamesQueryTest(false, Server.VERSION_3); // every (paging-state round-trip, protocol version) combination
+ rangeNamesQueryTest(true, Server.VERSION_3); // exercises the v3 paging state format
+ rangeNamesQueryTest(false, Server.VERSION_4);
+ rangeNamesQueryTest(true, Server.VERSION_4);
+ }
+
+ public void rangeNamesQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+ {
+ ReadCommand command = rangeNamesQuery("k0", "k5", 100, "c1", "c4", "c8");
+ QueryPager pager = command.getPager(null, protocolVersion);
assertFalse(pager.isExhausted());
List<FilteredPartition> partitions = query(pager, 3 * 3);
for (int i = 1; i <= 3; i++)
assertRow(partitions.get(i-1), "k" + i, "c1", "c4", "c8");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 3 * 3, 2 * 3);
for (int i = 4; i <= 5; i++)
@@ -303,31 +364,50 @@ public class QueryPagerTest
@Test
public void rangeSliceQueryTest() throws Exception
{
- QueryPager pager = rangeSliceQuery("k1", "k5", 100, "c1", "c7").getPager(null);
+ rangeSliceQueryTest(false, Server.VERSION_3); // every (paging-state round-trip, protocol version) combination
+ rangeSliceQueryTest(true, Server.VERSION_3); // exercises the legacy cell-name based v3 paging state
+ rangeSliceQueryTest(false, Server.VERSION_4);
+ rangeSliceQueryTest(true, Server.VERSION_4);
+ }
+
+ public void rangeSliceQueryTest(boolean testPagingState, int protocolVersion) throws Exception
+ {
+ ReadCommand command = rangeSliceQuery("k1", "k5", 100, "c1", "c7");
+ QueryPager pager = command.getPager(null, protocolVersion);
assertFalse(pager.isExhausted());
List<FilteredPartition> partitions = query(pager, 5);
assertRow(partitions.get(0), "k2", "c1", "c2", "c3", "c4", "c5");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 4);
assertRow(partitions.get(0), "k2", "c6", "c7");
assertRow(partitions.get(1), "k3", "c1", "c2");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 6);
assertRow(partitions.get(0), "k3", "c3", "c4", "c5", "c6", "c7");
assertRow(partitions.get(1), "k4", "c1");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 5);
assertRow(partitions.get(0), "k4", "c2", "c3", "c4", "c5", "c6");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 5);
assertRow(partitions.get(0), "k4", "c7");
assertRow(partitions.get(1), "k5", "c1", "c2", "c3", "c4");
+ assertFalse(pager.isExhausted());
+ pager = maybeRecreate(pager, command, testPagingState, protocolVersion);
assertFalse(pager.isExhausted());
partitions = query(pager, 5, 3);
assertRow(partitions.get(0), "k5", "c5", "c6", "c7");
@@ -335,7 +415,6 @@ public class QueryPagerTest
assertTrue(pager.isExhausted());
}
-
@Test
public void SliceQueryWithTombstoneTest() throws Exception
{
@@ -350,7 +429,7 @@ public class QueryPagerTest
ReadCommand command = SinglePartitionSliceCommand.create(cfs.metadata, FBUtilities.nowInSeconds(), Util.dk("k0"), Slice.ALL);
- QueryPager pager = command.getPager(null);
+ QueryPager pager = command.getPager(null, Server.CURRENT_VERSION);
for (int i = 0; i < 5; i++)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b99c8631/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
new file mode 100644
index 0000000..ba82e85
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
@@ -0,0 +1,132 @@
+
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the native protocol serialization of {@link PagingState}: round trips for protocol v3
+ * and v4, and compatibility of the v3 format with the bytes a 2.1 node produces.
+ */
+public class PagingStateTest
+{
+    /**
+     * Builds a paging state positioned on a single-cell row of table {@code ks.tbl}, which has
+     * partition key {@code k}, clustering columns {@code c1} (ascii) and {@code c2} (int), and
+     * regular column {@code myCol}.
+     *
+     * @param protocolVersion the native protocol version the row mark is created for; v3 and v4
+     *                        use different encodings for it
+     * @return a paging state on partition key {@code "someKey"} and clustering {@code ("c1", 42)}
+     */
+    private PagingState makeSomePagingState(int protocolVersion)
+    {
+        CFMetaData metadata = CFMetaData.Builder.create("ks", "tbl")
+                                                .addPartitionKey("k", AsciiType.instance)
+                                                .addClusteringColumn("c1", AsciiType.instance)
+                                                // Must be distinct from "c1": a table cannot declare two columns with
+                                                // the same name. Clustering column names are not part of the serialized
+                                                // state, so the hard-coded 2.1 value below does not depend on them.
+                                                .addClusteringColumn("c2", Int32Type.instance)
+                                                .addRegularColumn("myCol", AsciiType.instance)
+                                                .build();
+
+        ByteBuffer pk = ByteBufferUtil.bytes("someKey");
+
+        ColumnDefinition def = metadata.getColumnDefinition(new ColumnIdentifier("myCol", false));
+        Clustering c = new Clustering(ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42));
+        Row row = BTreeRow.singleCellRow(c, BufferCell.live(metadata, def, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
+        // 10 matches the remaining count of the 2.1 reference state; the trailing 0 is presumably
+        // remainingInPartition, which 2.1/2.2 states do not carry.
+        return new PagingState(pk, mark, 10, 0);
+    }
+
+    @Test
+    public void testSerializationBackwardCompatibility()
+    {
+        /*
+         * Tests that the serialized paging state for the native protocol V3 is backward compatible
+         * with what old nodes generate. For that, it compares the serialized format to the hard-coded
+         * value of the same state generated on a 2.1 node. For the curious, said hard-coded value has
+         * been generated by the following code:
+         *   ByteBuffer pk = ByteBufferUtil.bytes("someKey");
+         *   CellName cn = CellNames.compositeSparse(new ByteBuffer[]{ ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42) },
+         *                                           new ColumnIdentifier("myCol", false),
+         *                                           false);
+         *   PagingState state = new PagingState(pk, cn.toByteBuffer(), 10);
+         *   System.out.println("PagingState = " + ByteBufferUtil.bytesToHex(state.serialize()));
+         *
+         * Breakdown of that value:
+         *   0007 736f6d654b6579          partition key "someKey"
+         *   0014                         cell name length (20 bytes); each component is a 2-byte length, the value, one 00 byte:
+         *     0002 6331 00               clustering value "c1"
+         *     0004 0000002a 00           clustering value 42
+         *     0005 6d79636f6c 00         column name "myCol"
+         *   0000000a                     remaining = 10
+         */
+        PagingState state = makeSomePagingState(Server.VERSION_3);
+
+        String serializedState = ByteBufferUtil.bytesToHex(state.serialize(Server.VERSION_3));
+        // Note that we don't assert exact equality because we know 3.0 nodes include the "remainingInPartition" number
+        // that is not present on 2.1/2.2 nodes. We know this is ok however because we know that 2.1/2.2 nodes will ignore
+        // anything remaining once they have properly deserialized a paging state.
+        assertTrue("Unexpected V3 paging state: " + serializedState,
+                   serializedState.startsWith("0007736f6d654b65790014000263310000040000002a0000056d79636f6c000000000a"));
+    }
+
+    @Test
+    public void testSerializeDeserializeV3()
+    {
+        assertRoundTrip(Server.VERSION_3);
+    }
+
+    @Test
+    public void testSerializeDeserializeV4()
+    {
+        assertRoundTrip(Server.VERSION_4);
+    }
+
+    /**
+     * Checks that, for the given protocol version, {@code serializedSize} equals the number of bytes
+     * {@code serialize} produces and that deserializing those bytes gives back an equal state.
+     */
+    private void assertRoundTrip(int protocolVersion)
+    {
+        PagingState state = makeSomePagingState(protocolVersion);
+        ByteBuffer serialized = state.serialize(protocolVersion);
+        assertEquals(serialized.remaining(), state.serializedSize(protocolVersion));
+        assertEquals(state, PagingState.deserialize(serialized, protocolVersion));
+    }
+}
[2/2] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9967303f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9967303f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9967303f
Branch: refs/heads/trunk
Commit: 9967303f6e22db40400163d0625f15af74de9a05
Parents: bb82624 b99c863
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Sep 18 13:24:15 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 18 13:24:15 2015 +0200
----------------------------------------------------------------------
doc/native_protocol_v3.spec | 3 +
doc/native_protocol_v4.spec | 5 +
.../org/apache/cassandra/cql3/QueryOptions.java | 6 +-
.../apache/cassandra/cql3/QueryProcessor.java | 3 +-
.../org/apache/cassandra/cql3/ResultSet.java | 6 +-
.../cql3/statements/SelectStatement.java | 4 +-
.../org/apache/cassandra/db/Clustering.java | 29 +++
.../org/apache/cassandra/db/LegacyLayout.java | 2 +-
.../cassandra/db/PartitionRangeReadCommand.java | 6 +-
src/java/org/apache/cassandra/db/ReadQuery.java | 5 +-
.../db/SinglePartitionNamesCommand.java | 1 +
.../db/SinglePartitionReadCommand.java | 14 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 48 +++-
.../cassandra/db/rows/ComplexColumnData.java | 5 +
src/java/org/apache/cassandra/db/rows/Row.java | 3 +-
src/java/org/apache/cassandra/db/view/View.java | 5 +-
.../apache/cassandra/db/view/ViewBuilder.java | 3 +-
.../service/pager/AbstractQueryPager.java | 4 +-
.../service/pager/MultiPartitionPager.java | 8 +-
.../cassandra/service/pager/PagingState.java | 220 ++++++++++++++++---
.../cassandra/service/pager/QueryPagers.java | 3 +-
.../service/pager/RangeNamesQueryPager.java | 4 +-
.../service/pager/RangeSliceQueryPager.java | 16 +-
.../service/pager/SinglePartitionPager.java | 16 +-
.../org/apache/cassandra/transport/CBUtil.java | 7 +
.../cassandra/service/QueryPagerTest.java | 97 +++++++-
.../service/pager/PagingStateTest.java | 99 +++++++++
27 files changed, 526 insertions(+), 96 deletions(-)
----------------------------------------------------------------------