You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:39 UTC
[1/4] Add auto paging capability to the native protocol
Updated Branches:
refs/heads/trunk 40bc4456f -> e48ff2938
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index e334b02..860c404 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+
import com.google.common.collect.ImmutableMap;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -44,6 +45,7 @@ public class QueryMessage extends Message.Request
{
String query = CBUtil.readLongString(body);
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+ int resultPageSize = body.readInt();
List<ByteBuffer> values;
if (body.readable())
{
@@ -56,10 +58,10 @@ public class QueryMessage extends Message.Request
{
values = Collections.emptyList();
}
- return new QueryMessage(query, values, consistency);
+ return new QueryMessage(query, values, consistency, resultPageSize);
}
- public ChannelBuffer encode(QueryMessage msg)
+ public ChannelBuffer encode(QueryMessage msg, int version)
{
// We have:
// - query
@@ -68,11 +70,13 @@ public class QueryMessage extends Message.Request
// - Number of values
// - The values
int vs = msg.values.size();
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3 + (vs > 0 ? 1 : 0), 0, vs);
builder.add(CBUtil.longStringToCB(msg.query));
builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
- if (vs > 0 && msg.getVersion() > 1)
+ builder.add(CBUtil.intToCB(msg.resultPageSize));
+ if (vs > 0)
{
+ assert version > 1 : "Version 1 of the protocol do not allow values";
builder.add(CBUtil.shortToCB(vs));
for (ByteBuffer value : msg.values)
builder.addValue(value);
@@ -83,30 +87,35 @@ public class QueryMessage extends Message.Request
public final String query;
public final ConsistencyLevel consistency;
+ public final int resultPageSize;
public final List<ByteBuffer> values;
public QueryMessage(String query, ConsistencyLevel consistency)
{
- this(query, Collections.<ByteBuffer>emptyList(), consistency);
+ this(query, Collections.<ByteBuffer>emptyList(), consistency, -1);
}
- public QueryMessage(String query, List<ByteBuffer> values, ConsistencyLevel consistency)
+ public QueryMessage(String query, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
{
super(Type.QUERY);
this.query = query;
+ this.resultPageSize = resultPageSize;
this.consistency = consistency;
this.values = values;
}
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
{
try
{
+ if (resultPageSize == 0)
+ throw new ProtocolException("The page size cannot be 0");
+
UUID tracingId = null;
if (isTracingRequested())
{
@@ -117,10 +126,16 @@ public class QueryMessage extends Message.Request
if (state.traceNextQuery())
{
state.createTracingSession();
- Tracing.instance.begin("Execute CQL3 query", ImmutableMap.of("query", query));
+
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ builder.put("query", query);
+ if (resultPageSize > 0)
+ builder.put("page_size", Integer.toString(resultPageSize));
+
+ Tracing.instance.begin("Execute CQL3 query", builder.build());
}
- Message.Response response = QueryProcessor.process(query, values, consistency, state);
+ Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize);
if (tracingId != null)
response.setTracingId(tracingId);
@@ -136,6 +151,9 @@ public class QueryMessage extends Message.Request
finally
{
Tracing.instance.stopSession();
+ // Trash the current session id if we won't need it anymore
+ if (!state.hasPager())
+ state.getAndResetCurrentTracingSession();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index 63899e1..414fdd3 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -34,7 +34,7 @@ public class ReadyMessage extends Message.Response
return new ReadyMessage();
}
- public ChannelBuffer encode(ReadyMessage msg)
+ public ChannelBuffer encode(ReadyMessage msg, int version)
{
return ChannelBuffers.EMPTY_BUFFER;
}
@@ -47,7 +47,7 @@ public class ReadyMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 9969e3a..a6816fb 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -39,7 +39,7 @@ public class RegisterMessage extends Message.Request
return new RegisterMessage(eventTypes);
}
- public ChannelBuffer encode(RegisterMessage msg)
+ public ChannelBuffer encode(RegisterMessage msg, int version)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeShort(msg.eventTypes.size());
@@ -69,7 +69,7 @@ public class RegisterMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index fcff0a8..33975a4 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -40,7 +40,7 @@ public abstract class ResultMessage extends Message.Response
return kind.subcodec.decode(body, version);
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
ChannelBuffer kcb = ChannelBuffers.buffer(4);
kcb.writeInt(msg.kind.id);
@@ -101,7 +101,7 @@ public abstract class ResultMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
protected abstract ChannelBuffer encodeBody();
@@ -124,7 +124,7 @@ public abstract class ResultMessage extends Message.Response
return new Void();
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
assert msg instanceof Void;
return ChannelBuffers.EMPTY_BUFFER;
@@ -133,7 +133,7 @@ public abstract class ResultMessage extends Message.Response
protected ChannelBuffer encodeBody()
{
- return subcodec.encode(this);
+ return subcodec.encode(this, getVersion());
}
public CqlResult toThriftResult()
@@ -166,7 +166,7 @@ public abstract class ResultMessage extends Message.Response
return new SetKeyspace(keyspace);
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
assert msg instanceof SetKeyspace;
return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
@@ -175,7 +175,7 @@ public abstract class ResultMessage extends Message.Response
protected ChannelBuffer encodeBody()
{
- return subcodec.encode(this);
+ return subcodec.encode(this, getVersion());
}
public CqlResult toThriftResult()
@@ -199,11 +199,11 @@ public abstract class ResultMessage extends Message.Response
return new Rows(ResultSet.codec.decode(body, version));
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
assert msg instanceof Rows;
Rows rowMsg = (Rows)msg;
- return ResultSet.codec.encode(rowMsg.result);
+ return ResultSet.codec.encode(rowMsg.result, version);
}
};
@@ -217,7 +217,7 @@ public abstract class ResultMessage extends Message.Response
protected ChannelBuffer encodeBody()
{
- return subcodec.encode(this);
+ return subcodec.encode(this, getVersion());
}
public CqlResult toThriftResult()
@@ -243,12 +243,12 @@ public abstract class ResultMessage extends Message.Response
return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body, version));
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
assert msg instanceof Prepared;
Prepared prepared = (Prepared)msg;
assert prepared.statementId != null;
- return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata));
+ return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata, version));
}
};
@@ -278,7 +278,7 @@ public abstract class ResultMessage extends Message.Response
protected ChannelBuffer encodeBody()
{
- return subcodec.encode(this);
+ return subcodec.encode(this, getVersion());
}
public CqlResult toThriftResult()
@@ -347,7 +347,7 @@ public abstract class ResultMessage extends Message.Response
}
- public ChannelBuffer encode(ResultMessage msg)
+ public ChannelBuffer encode(ResultMessage msg, int version)
{
assert msg instanceof SchemaChange;
SchemaChange scm = (SchemaChange)msg;
@@ -361,7 +361,7 @@ public abstract class ResultMessage extends Message.Response
protected ChannelBuffer encodeBody()
{
- return subcodec.encode(this);
+ return subcodec.encode(this, getVersion());
}
public CqlResult toThriftResult()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 66b245b..ea6ae99 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -45,7 +45,7 @@ public class StartupMessage extends Message.Request
return new StartupMessage(CBUtil.readStringMap(body));
}
- public ChannelBuffer encode(StartupMessage msg)
+ public ChannelBuffer encode(StartupMessage msg, int version)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
CBUtil.writeStringMap(cb, msg.options);
@@ -63,7 +63,7 @@ public class StartupMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 8f7873d..7318112 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -38,7 +38,7 @@ public class SupportedMessage extends Message.Response
return new SupportedMessage(CBUtil.readStringToStringListMap(body));
}
- public ChannelBuffer encode(SupportedMessage msg)
+ public ChannelBuffer encode(SupportedMessage msg, int version)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
CBUtil.writeStringToStringListMap(cb, msg.supported);
@@ -56,7 +56,7 @@ public class SupportedMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 39802cc..e5ee5f3 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -1043,9 +1043,15 @@ public class ColumnFamilyStoreTest extends SchemaLoader
for (int i = 0; i < 4; i++)
cols[i] = column("c" + i, "value", 1);
- putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3]);
- putColsStandard(cfs, Util.dk("b"), cols[0], cols[1], cols[2]);
- putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
+ DecoratedKey ka = Util.dk("a");
+ DecoratedKey kb = Util.dk("b");
+ DecoratedKey kc = Util.dk("c");
+
+ RowPosition min = Util.rp("");
+
+ putColsStandard(cfs, ka, cols[0], cols[1], cols[2], cols[3]);
+ putColsStandard(cfs, kb, cols[0], cols[1], cols[2]);
+ putColsStandard(cfs, kc, cols[0], cols[1], cols[2], cols[3]);
cfs.forceBlockingFlush();
SlicePredicate sp = new SlicePredicate();
@@ -1054,58 +1060,87 @@ public class ColumnFamilyStoreTest extends SchemaLoader
sp.getSlice_range().setStart(ArrayUtils.EMPTY_BYTE_ARRAY);
sp.getSlice_range().setFinish(ArrayUtils.EMPTY_BYTE_ARRAY);
- Collection<Row> rows = cfs.getRangeSlice(Util.range("", ""),
- null,
- ThriftValidation.asIFilter(sp, cfs.metadata, null),
- 3,
- System.currentTimeMillis(),
- true,
- true);
- assert rows.size() == 1 : "Expected 1 row, got " + rows;
- Row row = rows.iterator().next();
+ Collection<Row> rows;
+ Row row, row1, row2;
+ IDiskAtomFilter filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(Util.range("", ""), filter, null, 3, true, true, System.currentTimeMillis()));
+ assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
+ row = rows.iterator().next();
assertColumnNames(row, "c0", "c1", "c2");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
- rows = cfs.getRangeSlice(Util.range("", ""),
- null,
- ThriftValidation.asIFilter(sp, cfs.metadata, null),
- 3,
- System.currentTimeMillis(),
- true,
- true);
- assert rows.size() == 2 : "Expected 2 rows, got " + rows;
+ filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(ka, min), filter, null, 3, true, true, System.currentTimeMillis()));
+ assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
Iterator<Row> iter = rows.iterator();
- Row row1 = iter.next();
- Row row2 = iter.next();
+ row1 = iter.next();
+ row2 = iter.next();
assertColumnNames(row1, "c2", "c3");
assertColumnNames(row2, "c0");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c0")));
- rows = cfs.getRangeSlice(new Bounds<RowPosition>(row2.key, Util.rp("")),
- null,
- ThriftValidation.asIFilter(sp, cfs.metadata, null),
- 3,
- System.currentTimeMillis(),
- true,
- true);
- assert rows.size() == 1 : "Expected 1 row, got " + rows;
+ filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(row2.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
+ assert rows.size() == 1 : "Expected 1 row, got " + toString(rows);
row = rows.iterator().next();
assertColumnNames(row, "c0", "c1", "c2");
sp.getSlice_range().setStart(ByteBufferUtil.getArray(ByteBufferUtil.bytes("c2")));
- rows = cfs.getRangeSlice(new Bounds<RowPosition>(row.key, Util.rp("")),
- null,
- ThriftValidation.asIFilter(sp, cfs.metadata, null),
- 3,
- System.currentTimeMillis(),
- true,
- true);
- assert rows.size() == 2 : "Expected 2 rows, got " + rows;
+ filter = ThriftValidation.asIFilter(sp, cfs.metadata, null);
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(row.key, min), filter, null, 3, true, true, System.currentTimeMillis()));
+ assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
iter = rows.iterator();
row1 = iter.next();
row2 = iter.next();
assertColumnNames(row1, "c2");
assertColumnNames(row2, "c0", "c1");
+
+ // Paging within bounds
+ SliceQueryFilter sf = new SliceQueryFilter(ByteBufferUtil.bytes("c1"),
+ ByteBufferUtil.bytes("c2"),
+ false,
+ 0);
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(ka, kc), sf, ByteBufferUtil.bytes("c2"), ByteBufferUtil.bytes("c1"), null, 2, System.currentTimeMillis()));
+ assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
+ iter = rows.iterator();
+ row1 = iter.next();
+ row2 = iter.next();
+ assertColumnNames(row1, "c2");
+ assertColumnNames(row2, "c1");
+
+ rows = cfs.getRangeSlice(cfs.makeExtendedFilter(new Bounds<RowPosition>(kb, kc), sf, ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes("c1"), null, 10, System.currentTimeMillis()));
+ assert rows.size() == 2 : "Expected 2 rows, got " + toString(rows);
+ iter = rows.iterator();
+ row1 = iter.next();
+ row2 = iter.next();
+ assertColumnNames(row1, "c1", "c2");
+ assertColumnNames(row2, "c1");
+ }
+
+ /**
+ * Renders rows as "{key: col col ...} " entries for use in assertion failure messages.
+ * Rows whose column family is null or empty are rendered with the key only.
+ */
+ private static String toString(Collection<Row> rows)
+ {
+ try
+ {
+ StringBuilder sb = new StringBuilder();
+ for (Row row : rows)
+ {
+ sb.append("{");
+ sb.append(ByteBufferUtil.string(row.key.key));
+ sb.append(":");
+ if (row.cf != null && !row.cf.isEmpty())
+ {
+ for (Column c : row.cf)
+ sb.append(" ").append(ByteBufferUtil.string(c.name()));
+ }
+ sb.append("} ");
+ }
+ return sb.toString();
+ }
+ catch (Exception e)
+ {
+ // Any failure while rendering (e.g. from ByteBufferUtil.string) is rethrown unchecked so the helper can be used inline in assert messages
+ throw new RuntimeException(e);
+ }
}
private static void assertColumnNames(Row row, String ... columnNames) throws Exception
@@ -1205,7 +1240,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexOperator.EQ, LongType.instance.decompose(1L));
// explicitly tell to the KeysSearcher to use column limiting for rowsPerQuery to trigger bogus columnsRead--; (CASSANDRA-3996)
- List<Row> rows = store.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), 10, System.currentTimeMillis(), true);
+ List<Row> rows = store.search(store.makeExtendedFilter(Util.range("", ""), new IdentityQueryFilter(), Arrays.asList(expr), 10, true, false, System.currentTimeMillis()));
assert rows.size() == 10;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 1fe7a46..790163d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
@@ -147,7 +148,8 @@ public class CompactionsTest extends SchemaLoader
// check that the shadowed column is gone
SSTableReader sstable = cfs.getSSTables().iterator().next();
- SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter(), System.currentTimeMillis()), key);
+ Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound());
+ SSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
OnDiskAtomIterator iter = scanner.next();
assertEquals(key, iter.getKey());
assert iter.next() instanceof RangeTombstone;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index c14e860..8c4c305 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -30,8 +30,10 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -140,7 +142,7 @@ public class TTLExpiryTest extends SchemaLoader
cfs.enableAutoCompaction(true);
assertEquals(1, cfs.getSSTables().size());
SSTableReader sstable = cfs.getSSTables().iterator().next();
- SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Standard1", new IdentityQueryFilter(), System.currentTimeMillis()));
+ SSTableScanner scanner = sstable.getScanner(DataRange.allData(sstable.partitioner));
assertTrue(scanner.hasNext());
while(scanner.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/test/unit/org/apache/cassandra/service/QueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/QueryPagerTest.java b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
new file mode 100644
index 0000000..1d18321
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/QueryPagerTest.java
@@ -0,0 +1,283 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static junit.framework.Assert.*;
+import static org.apache.cassandra.Util.bounds;
+import static org.apache.cassandra.Util.range;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class QueryPagerTest extends SchemaLoader
+{
+ private static final String KS = "Keyspace1";
+ private static final String CF = "Standard1";
+
+ /**
+ * Decodes {@code bb} through {@link ByteBufferUtil#string}, rethrowing the checked
+ * {@link CharacterCodingException} as a {@link RuntimeException} so this helper declares no checked exception.
+ */
+ private static String string(ByteBuffer bb)
+ {
+ try
+ {
+ return ByteBufferUtil.string(bb);
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Clears the test column family, then writes the fixture rows that the tests in this class query.
+ */
+ @BeforeClass
+ public static void addData()
+ {
+ cfs().clearUnsafe();
+
+ int nbKeys = 10;
+ int nbCols = 10;
+
+ /*
+ * Creates the following data (nbKeys rows of nbCols columns, both numbered from 0):
+ * k0: c0 ... c9
+ * ...
+ * k9: c0 ... c9
+ */
+ for (int i = 0; i < nbKeys; i++)
+ {
+ RowMutation rm = new RowMutation(KS, bytes("k" + i));
+ ColumnFamily cf = rm.addOrGet(CF);
+
+ for (int j = 0; j < nbCols; j++)
+ cf.addColumn(Util.column("c" + j, "", 0));
+
+ rm.applyUnsafe();
+ }
+ }
+
+ /** Looks up the store of column family {@code CF} in keyspace {@code KS} via {@code Table.open}. */
+ private static ColumnFamilyStore cfs()
+ {
+ return Table.open(KS).getColumnFamilyStore(CF);
+ }
+
+ /**
+ * Renders one line per row, "key:" followed by the row's column names.
+ * The tests pass the result as the message of their page-size assertions.
+ */
+ private static String toString(List<Row> rows)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (Row row : rows)
+ sb.append(string(row.key.key)).append(":").append(toString(row.cf)).append("\n");
+ return sb.toString();
+ }
+
+ /**
+ * Returns the names of the columns of {@code cf}, each preceded by a space,
+ * or the empty string when {@code cf} is null.
+ */
+ private static String toString(ColumnFamily cf)
+ {
+ if (cf == null)
+ return "";
+
+ StringBuilder sb = new StringBuilder();
+ for (Column c : cf)
+ sb.append(" ").append(string(c.name()));
+ return sb.toString();
+ }
+
+ /**
+ * Builds a by-names read command on a single key.
+ *
+ * @param key the row key to read
+ * @param names the column names to request
+ */
+ private static ReadCommand namesQuery(String key, String... names)
+ {
+ // The requested names are held in a set ordered by the column family's own comparator
+ SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(cfs().metadata.comparator);
+ for (String name : names)
+ s.add(bytes(name));
+ // NOTE(review): the meaning of the boolean passed to NamesQueryFilter is not visible here - confirm against its constructor
+ return new SliceByNamesReadCommand(KS, bytes(key), CF, System.currentTimeMillis(), new NamesQueryFilter(s, true));
+ }
+
+ /**
+ * Builds a slice read command on a single key.
+ *
+ * @param key the row key to read
+ * @param start first bound handed to the slice filter
+ * @param end second bound handed to the slice filter
+ * @param count value handed to the slice filter as its last argument
+ */
+ private static ReadCommand sliceQuery(String key, String start, String end, int count)
+ {
+ // NOTE(review): the third argument is presumably the "reversed" flag - confirm against SliceQueryFilter
+ SliceQueryFilter filter = new SliceQueryFilter(bytes(start), bytes(end), false, count);
+ return new SliceFromReadCommand(KS, bytes(key), CF, System.currentTimeMillis(), filter);
+ }
+
+ /**
+ * Builds a range command that requests the given column names over a key range.
+ *
+ * @param range the key range to scan
+ * @param count value handed to the RangeSliceCommand as its last argument
+ * @param names the column names to request
+ */
+ private static RangeSliceCommand rangeNamesQuery(AbstractBounds<RowPosition> range, int count, String... names)
+ {
+ // The requested names are held in a set ordered by the column family's own comparator
+ SortedSet<ByteBuffer> s = new TreeSet<ByteBuffer>(cfs().metadata.comparator);
+ for (String name : names)
+ s.add(bytes(name));
+ return new RangeSliceCommand(KS, CF, System.currentTimeMillis(), new NamesQueryFilter(s, true), range, count);
+ }
+
+ /**
+ * Builds a range command that slices columns from {@code start} to {@code end} over a key range.
+ *
+ * @param range the key range to scan
+ * @param count value handed to the RangeSliceCommand as its last argument
+ * @param start first bound handed to the slice filter
+ * @param end second bound handed to the slice filter
+ */
+ private static RangeSliceCommand rangeSliceQuery(AbstractBounds<RowPosition> range, int count, String start, String end)
+ {
+ // The slice filter itself is given Integer.MAX_VALUE; only the command receives count
+ SliceQueryFilter filter = new SliceQueryFilter(bytes(start), bytes(end), false, Integer.MAX_VALUE);
+ return new RangeSliceCommand(KS, CF, System.currentTimeMillis(), filter, range, count);
+ }
+
+ /**
+ * Asserts that {@code r} has the expected key, a non-null column family holding exactly
+ * {@code names.length} columns, and that iterating those columns yields {@code names} in order.
+ */
+ private static void assertRow(Row r, String key, String... names)
+ {
+ assertEquals(key, string(r.key.key));
+ assertNotNull(r.cf);
+ assertEquals(toString(r.cf), names.length, r.cf.getColumnCount());
+ int i = 0;
+ for (Column c : r.cf)
+ {
+ String expected = names[i++];
+ // i has already been incremented, so the failure message reports a 1-based position
+ assertEquals("column " + i + " doesn't match: " + toString(r.cf), expected, string(c.name()));
+ }
+ }
+
+ /**
+ * Pages a by-names query on k0: a single fetchPage(3) call is expected to return the row with
+ * all four requested columns, after which the pager must report itself exhausted.
+ */
+ @Test
+ public void NamesQueryTest() throws Exception
+ {
+ QueryPager pager = QueryPagers.localPager(namesQuery("k0", "c1", "c5", "c7", "c8"));
+
+ assertFalse(pager.isExhausted());
+ List<Row> page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k0", "c1", "c5", "c7", "c8");
+
+ assertTrue(pager.isExhausted());
+ }
+
+ /**
+ * Pages the slice c1..c8 of row k0 with fetchPage(3): the expected pages are
+ * c1-c3, c4-c6 and c7-c8, and the pager must only be exhausted after the last one.
+ */
+ @Test
+ public void SliceQueryTest() throws Exception
+ {
+ QueryPager pager = QueryPagers.localPager(sliceQuery("k0", "c1", "c8", 10));
+
+ List<Row> page;
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k0", "c1", "c2", "c3");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k0", "c4", "c5", "c6");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k0", "c7", "c8");
+
+ assertTrue(pager.isExhausted());
+ }
+
+ /**
+ * Pages over two slice commands (k1: c2..c6, then k4: c3..c5) wrapped in a Pageable.ReadCommands.
+ * The second page is expected to span both keys: the rest of k1 followed by the start of k4.
+ */
+ @Test
+ public void MultiQueryTest() throws Exception
+ {
+ QueryPager pager = QueryPagers.localPager(new Pageable.ReadCommands(new ArrayList<ReadCommand>() {{
+ add(sliceQuery("k1", "c2", "c6", 10));
+ add(sliceQuery("k4", "c3", "c5", 10));
+ }}));
+
+ List<Row> page;
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k1", "c2", "c3", "c4");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(4);
+ assertEquals(toString(page), 2, page.size());
+ assertRow(page.get(0), "k1", "c5", "c6");
+ assertRow(page.get(1), "k4", "c3", "c4");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k4", "c5");
+
+ assertTrue(pager.isExhausted());
+ }
+
+ /**
+ * Pages a by-names range query over range("k0", "k5") with fetchPage(3).
+ * The expected rows are k1..k3 on the first page and k4..k5 on the second, i.e. the test
+ * expects the range's start key k0 to be left out and each row to carry c1, c4 and c8.
+ */
+ @Test
+ public void RangeNamesQueryTest() throws Exception
+ {
+ QueryPager pager = QueryPagers.localPager(rangeNamesQuery(range("k0", "k5"), 100, "c1", "c4", "c8"));
+
+ List<Row> page;
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 3, page.size());
+ for (int i = 1; i <= 3; i++)
+ assertRow(page.get(i-1), "k" + i, "c1", "c4", "c8");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(3);
+ assertEquals(toString(page), 2, page.size());
+ for (int i = 4; i <= 5; i++)
+ assertRow(page.get(i-4), "k" + i, "c1", "c4", "c8");
+
+ assertTrue(pager.isExhausted());
+ }
+
+ /**
+ * Pages the slice c1..c7 over range("k1", "k5") with varying fetchPage sizes.
+ * The expected rows run from k2 to k5 (the range's start key k1 is not expected), and the
+ * assertions check that a page may end in the middle of a row and resume there on the next call.
+ */
+ @Test
+ public void RangeSliceQueryTest() throws Exception
+ {
+ QueryPager pager = QueryPagers.localPager(rangeSliceQuery(range("k1", "k5"), 100, "c1", "c7"));
+
+ List<Row> page;
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(5);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k2", "c1", "c2", "c3", "c4", "c5");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(4);
+ assertEquals(toString(page), 2, page.size());
+ assertRow(page.get(0), "k2", "c6", "c7");
+ assertRow(page.get(1), "k3", "c1", "c2");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(6);
+ assertEquals(toString(page), 2, page.size());
+ assertRow(page.get(0), "k3", "c3", "c4", "c5", "c6", "c7");
+ assertRow(page.get(1), "k4", "c1");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(5);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k4", "c2", "c3", "c4", "c5", "c6");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(5);
+ assertEquals(toString(page), 2, page.size());
+ assertRow(page.get(0), "k4", "c7");
+ assertRow(page.get(1), "k5", "c1", "c2", "c3", "c4");
+
+ assertFalse(pager.isExhausted());
+ page = pager.fetchPage(5);
+ assertEquals(toString(page), 1, page.size());
+ assertRow(page.get(0), "k5", "c5", "c6", "c7");
+
+ assertTrue(pager.isExhausted());
+ }
+}
[2/4] Add auto paging capability to the native protocol
Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
new file mode 100644
index 0000000..460bc44
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+abstract class AbstractQueryPager implements QueryPager
+{
+ private final ConsistencyLevel consistencyLevel;
+ private final boolean localQuery;
+
+ protected final CFMetaData cfm;
+ protected final IDiskAtomFilter columnFilter;
+ private final long timestamp;
+
+ private volatile int remaining;
+ private volatile boolean exhausted;
+ private volatile boolean lastWasRecorded;
+
+ protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
+ int toFetch,
+ boolean localQuery,
+ String keyspace,
+ String columnFamily,
+ IDiskAtomFilter columnFilter,
+ long timestamp)
+ {
+ this.consistencyLevel = consistencyLevel;
+ this.localQuery = localQuery;
+
+ this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ this.columnFilter = columnFilter;
+ this.timestamp = timestamp;
+
+ this.remaining = toFetch;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ if (isExhausted())
+ return Collections.emptyList();
+
+ int currentPageSize = nextPageSize(pageSize);
+ List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+
+ if (rows.isEmpty())
+ {
+ exhausted = true;
+ return Collections.emptyList();
+ }
+
+ int liveCount = getPageLiveCount(rows);
+ remaining -= liveCount;
+
+ // If we've got less than requested, there is no more query to do (but
+ // we still need to return the current page)
+ if (liveCount < currentPageSize)
+ exhausted = true;
+
+ // If it's not the first query and the first column is the last one returned (likely
+ // but not certain since paging can race with deletes/expiration), then remove the
+ // first column.
+ if (containsPreviousLast(rows.get(0)))
+ {
+ rows = discardFirst(rows);
+ remaining++;
+ }
+ // Otherwise, if 'lastWasRecorded', we queried for one more than the page size,
+ // so if the page is full (i.e. we are not exhausted), trim the last entry
+ else if (lastWasRecorded && !exhausted)
+ {
+ // We've asked for one more than necessary
+ rows = discardLast(rows);
+ remaining++;
+ }
+
+ if (!isExhausted())
+ lastWasRecorded = recordLast(rows.get(rows.size() - 1));
+
+ return rows;
+ }
+
+ private List<Row> filterEmpty(List<Row> result)
+ {
+ // Copy lazily: a new list is only allocated once an empty row is actually found
+ for (Row row : result)
+ {
+ if (row.cf == null || row.cf.getColumnCount() == 0)
+ {
+ List<Row> newResult = new ArrayList<Row>(result.size() - 1);
+ for (Row row2 : result)
+ {
+ if (row2.cf == null || row2.cf.getColumnCount() == 0)
+ continue;
+
+ newResult.add(row2);
+ }
+ return newResult;
+ }
+ }
+ return result;
+ }
+
+ public boolean isExhausted()
+ {
+ return exhausted || remaining == 0;
+ }
+
+ public int maxRemaining()
+ {
+ return remaining;
+ }
+
+ public long timestamp()
+ {
+ return timestamp;
+ }
+
+ private int nextPageSize(int pageSize)
+ {
+ return Math.min(remaining, pageSize) + (lastWasRecorded ? 1 : 0);
+ }
+
+ public ColumnCounter columnCounter()
+ {
+ return columnFilter.columnCounter(cfm.comparator, timestamp);
+ }
+
+ protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
+ protected abstract boolean containsPreviousLast(Row first);
+ protected abstract boolean recordLast(Row last);
+
+ private List<Row> discardFirst(List<Row> rows)
+ {
+ Row first = rows.get(0);
+ ColumnFamily newCf = discardFirst(first.cf);
+
+ int count = newCf.getColumnCount();
+ List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+ if (count != 0)
+ newRows.add(new Row(first.key, newCf));
+ newRows.addAll(rows.subList(1, rows.size()));
+
+ return newRows;
+ }
+
+ private List<Row> discardLast(List<Row> rows)
+ {
+ Row last = rows.get(rows.size() - 1);
+ ColumnFamily newCf = discardLast(last.cf);
+
+ int count = newCf.getColumnCount();
+ List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+ newRows.addAll(rows.subList(0, rows.size() - 1));
+ if (count != 0)
+ newRows.add(new Row(last.key, newCf));
+
+ return newRows;
+ }
+
+ private int getPageLiveCount(List<Row> page)
+ {
+ int count = 0;
+ for (Row row : page)
+ count += columnCounter().countAll(row.cf).live();
+ return count;
+ }
+
+ private ColumnFamily discardFirst(ColumnFamily cf)
+ {
+ ColumnFamily copy = cf.cloneMeShallow();
+ ColumnCounter counter = columnCounter();
+
+ Iterator<Column> iter = cf.iterator();
+ // Discard the first live
+ while (iter.hasNext())
+ {
+ Column c = iter.next();
+ counter.count(c, cf);
+ if (counter.live() > 1)
+ {
+ copy.addColumn(c);
+ while (iter.hasNext())
+ copy.addColumn(iter.next());
+ }
+ }
+ return copy;
+ }
+
+ private ColumnFamily discardLast(ColumnFamily cf)
+ {
+ ColumnFamily copy = cf.cloneMeShallow();
+ // Redoing the counting like that is not extremely efficient, but
+ // discardLast is only called in case of a race between paging and
+ // a deletion, which is pretty unlikely, so probably not a big deal
+ int liveCount = columnCounter().countAll(cf).live();
+
+ ColumnCounter counter = columnCounter();
+ // Discard the last live: only copy columns seen before the live count reaches its total
+ for (Column c : cf)
+ {
+ counter.count(c, cf);
+ if (counter.live() < liveCount)
+ copy.addColumn(c);
+ }
+ return copy;
+ }
+
+ protected static ByteBuffer firstName(ColumnFamily cf)
+ {
+ return cf.iterator().next().name();
+ }
+
+ protected static ByteBuffer lastName(ColumnFamily cf)
+ {
+ return cf.getReverseSortedColumns().iterator().next().name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
new file mode 100644
index 0000000..ef82535
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
+/**
+ * Pager over a list of ReadCommand.
+ *
+ * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
+ * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
+ * paging the commands one at a time under-performs compared to parallelizing. On the other hand, if we parallelize
+ * and each command returns pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * defeats the purpose of paging.
+ *
+ * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
+ * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * blow out memory.
+ */
+class MultiPartitionPager implements QueryPager
+{
+ private final SinglePartitionPager[] pagers;
+ private final long timestamp;
+
+ private volatile int current;
+
+ MultiPartitionPager(List<ReadCommand> commands, final ConsistencyLevel consistencyLevel, final boolean localQuery)
+ {
+ this.pagers = new SinglePartitionPager[commands.size()];
+
+ long tstamp = -1;
+ for (int i = 0; i < commands.size(); i++)
+ {
+ ReadCommand command = commands.get(i);
+ if (tstamp == -1)
+ tstamp = command.timestamp;
+ else if (tstamp != command.timestamp)
+ throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
+
+ pagers[i] = command instanceof SliceFromReadCommand
+ ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery)
+ : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+ }
+ timestamp = tstamp;
+ }
+
+ public boolean isExhausted()
+ {
+ while (current < pagers.length)
+ {
+ if (!pagers[current].isExhausted())
+ return false;
+
+ current++;
+ }
+ return true;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ int remaining = pageSize;
+ List<Row> result = new ArrayList<Row>();
+
+ while (!isExhausted() && remaining > 0)
+ {
+ // Exhausted also sets us on the first non-exhausted pager
+ List<Row> page = pagers[current].fetchPage(remaining);
+ if (page.isEmpty())
+ continue;
+
+ Row row = page.get(0);
+ remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+ result.add(row);
+ }
+
+ return result;
+ }
+
+ public int maxRemaining()
+ {
+ long max = 0; // long: each pager's limit may be Integer.MAX_VALUE, so an int sum could wrap negative
+ for (int i = current; i < pagers.length; i++)
+ max += pagers[i].maxRemaining();
+ return (int) Math.min(max, Integer.MAX_VALUE); // clamp back into the int range callers expect
+ }
+
+ public long timestamp()
+ {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
new file mode 100644
index 0000000..82e7376
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceByNamesReadCommand.
+ */
+public class NamesQueryPager implements SinglePartitionPager
+{
+ private final SliceByNamesReadCommand command;
+ private final ConsistencyLevel consistencyLevel;
+ private final boolean localQuery;
+
+ private volatile boolean queried;
+
+ /**
+ * For now, we'll only use this in CQL3. In there, as name query can never
+ * yield more than one CQL3 row, there is no need for paging and so this is straight-forward.
+ *
+ * For thrift, we could imagine needing to page, though even then it's very
+ * unlikely unless the pageSize is very small.
+ *
+ * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that
+ * counts every cell individually) and the names filter asks for more than pageSize columns.
+ */
+ // Don't use directly, use QueryPagers method instead
+ NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ this.command = command;
+ this.consistencyLevel = consistencyLevel;
+ this.localQuery = localQuery;
+ }
+
+ public ColumnCounter columnCounter()
+ {
+ // We know NamesQueryFilter.columnCounter don't care about his argument
+ return command.filter.columnCounter(null, command.timestamp);
+ }
+
+ public boolean isExhausted()
+ {
+ return queried;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
+
+ if (isExhausted())
+ return Collections.<Row>emptyList();
+
+ queried = true;
+ return localQuery
+ ? Collections.singletonList(command.getRow(Table.open(command.table)))
+ : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel);
+ }
+
+ public int maxRemaining()
+ {
+ if (queried)
+ return 0;
+
+ return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size();
+ }
+
+ public long timestamp()
+ {
+ return command.timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
new file mode 100644
index 0000000..3a69bf4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/Pageable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.ReadCommand;
+
+/**
+ * Marker interface for commands that can be paged.
+ */
+public interface Pageable
+{
+ public static class ReadCommands implements Pageable
+ {
+ public final List<ReadCommand> commands;
+
+ public ReadCommands(List<ReadCommand> commands)
+ {
+ this.commands = commands;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
new file mode 100644
index 0000000..a390859
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Perform a query, paging it by page of a given size.
+ *
+ * This is essentially an iterator of pages. Each call to fetchPage() will
+ * return the next page (i.e. the next list of rows) and isExhausted()
+ * indicates whether there are more pages to fetch. The pageSize will
+ * either be in terms of cells or in terms of CQL3 rows, depending on the
+ * parameters of the command we page.
+ *
+ * Please note that the pager might page within rows, so there is no guarantee
+ * that successive pages won't return the same row (though with different
+ * columns every time).
+ *
+ * Also, there is no guarantee that fetchPage() won't return an empty list,
+ * even if isExhausted() returns false (but it is guaranteed to return an empty
+ * list *if* isExhausted() returns true). Indeed, isExhausted() does *not*
+ * trigger a query so in some (fairly rare) cases we might not know the paging
+ * is done even though it is.
+ */
+public interface QueryPager
+{
+ /**
+ * Fetches the next page.
+ *
+ * @param pageSize the maximum number of elements to return in the next page.
+ * @return the page of result.
+ */
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+
+ /**
+ * Whether or not this pager is exhausted, i.e. whether or not a call to
+ * fetchPage may return more result.
+ *
+ * @return whether the pager is exhausted.
+ */
+ public boolean isExhausted();
+
+ /**
+ * The maximum number of cells/CQL3 row that we may still have to return.
+ * In other words, that's the initial user limit minus what we've already
+ * returned (note that it's not how many we *will* return, just the upper
+ * limit on it).
+ */
+ public int maxRemaining();
+
+ /**
+ * The timestamp used by the last page.
+ */
+ public long timestamp();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
new file mode 100644
index 0000000..9bc3afd
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Static utility methods to create query pagers.
+ */
+public class QueryPagers
+{
+ private QueryPagers() {};
+
+ private static int maxQueried(ReadCommand command)
+ {
+ if (command instanceof SliceByNamesReadCommand)
+ {
+ NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
+ return filter.countCQL3Rows() ? 1 : filter.columns.size();
+ }
+ else
+ {
+ SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
+ return filter.count;
+ }
+ }
+
+ public static boolean mayNeedPaging(Pageable command, int pageSize)
+ {
+ if (command instanceof Pageable.ReadCommands)
+ {
+ List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+
+ long maxQueried = 0; // long: summing several Integer.MAX_VALUE limits in an int would wrap negative
+ for (ReadCommand readCmd : commands)
+ maxQueried += maxQueried(readCmd);
+
+ return maxQueried > pageSize;
+ }
+ else if (command instanceof ReadCommand)
+ {
+ return maxQueried((ReadCommand)command) > pageSize;
+ }
+ else
+ {
+ assert command instanceof RangeSliceCommand;
+ // We can never be sure a range slice won't need paging
+ return true;
+ }
+ }
+
+ private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local)
+ {
+ if (command instanceof SliceByNamesReadCommand)
+ return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+ else
+ return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local);
+ }
+
+ private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local)
+ {
+ if (command instanceof Pageable.ReadCommands)
+ {
+ List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+ if (commands.size() == 1)
+ return pager(commands.get(0), consistencyLevel, local);
+
+ return new MultiPartitionPager(commands, consistencyLevel, local);
+ }
+ else if (command instanceof ReadCommand)
+ {
+ return pager((ReadCommand)command, consistencyLevel, local);
+ }
+ else
+ {
+ assert command instanceof RangeSliceCommand;
+ RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
+ if (rangeCommand.predicate instanceof NamesQueryFilter)
+ return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local);
+ else
+ return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local);
+ }
+ }
+
+ public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
+ {
+ return pager(command, consistencyLevel, false);
+ }
+
+ public static QueryPager localPager(Pageable command)
+ {
+ return pager(command, null, true);
+ }
+
+ /**
+ * Convenience method to (locally) page an internal row.
+ * Used to 2ndary index a wide row without dying.
+ */
+ public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
+ {
+ SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
+ final SliceQueryPager pager = new SliceQueryPager(command, null, true);
+
+ return new Iterator<ColumnFamily>()
+ {
+ // We don't use AbstractIterator because we don't want hasNext() to do an actual query
+ public boolean hasNext()
+ {
+ return !pager.isExhausted();
+ }
+
+ public ColumnFamily next()
+ {
+ try
+ {
+ List<Row> rows = pager.fetchPage(pageSize);
+ ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
+ return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
+ */
+ public static int countPaged(String keyspace,
+ String columnFamily,
+ ByteBuffer key,
+ SliceQueryFilter filter,
+ ConsistencyLevel consistencyLevel,
+ final int pageSize,
+ long now) throws RequestValidationException, RequestExecutionException
+ {
+ SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
+ final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
+
+ ColumnCounter counter = filter.columnCounter(Schema.instance.getComparator(keyspace, columnFamily), now);
+ while (!pager.isExhausted())
+ {
+ List<Row> next = pager.fetchPage(pageSize);
+ if (!next.isEmpty())
+ counter.countAll(next.get(0).cf);
+ }
+ return counter.live();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
new file mode 100644
index 0000000..e4d4295
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a name query.
+ *
+ * Note: this only works for NamesQueryFilter instances that have countCQL3Rows() set,
+ * because this assumes the pageSize is counted in number of internal rows
+ * returned. More precisely, this doesn't do in-row paging so this assumes
+ * that the counter returned by columnCounter() will count 1 for each internal
+ * row.
+ */
+public class RangeNamesQueryPager extends AbstractQueryPager
+{
+ private final RangeSliceCommand command;
+ private volatile RowPosition lastReturnedKey;
+
+ // Don't use directly, use QueryPagers method instead
+ RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+ this.command = command;
+ assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ throws RequestExecutionException
+ {
+ AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+ if (lastReturnedKey != null)
+ pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey));
+
+ return localQuery
+ ? pageCmd.executeLocally()
+ : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+ }
+
+ protected boolean containsPreviousLast(Row first)
+ {
+ // When querying the next page, we create a bound that exclude the lastReturnedKey
+ return false;
+ }
+
+ protected boolean recordLast(Row last)
+ {
+ lastReturnedKey = last.key;
+ // We return false as that means "can that last be in the next query?"
+ return false;
+ }
+
+ private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+ {
+ // We return a range that always exclude lastReturnedKey, since we've already
+ // returned it.
+ AbstractBounds<RowPosition> bounds = command.keyRange;
+ if (bounds instanceof Range || bounds instanceof Bounds)
+ {
+ return new Range<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ else
+ {
+ return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
new file mode 100644
index 0000000..578d5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a slice query (the constructor asserts a SliceQueryFilter).
+ *
+ * Note: this only works for CQL3 queries for now (because thrift queries expect
+ * a different limit on the rows than on the columns, which complicates it).
+ */
+public class RangeSliceQueryPager extends AbstractQueryPager
+{
+ private final RangeSliceCommand command; // the original command; each page query is built from its fields
+ private volatile RowPosition lastReturnedKey; // key stored by recordLast(); null until recordLast() is first called
+ private volatile ByteBuffer lastReturnedName; // lastName() of the row stored by recordLast(); null until then
+
+ // Don't use directly, use QueryPagers method instead
+ RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+ this.command = command;
+ assert columnFilter instanceof SliceQueryFilter; // queryNextPage casts columnFilter to SliceQueryFilter
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) // builds and runs a PagedRangeCommand for the next page
+ throws RequestExecutionException
+ {
+ SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
+ AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey); // no recorded key: use the original key range
+ ByteBuffer start = lastReturnedName == null ? sf.start() : lastReturnedName; // no recorded name: use the filter's own start
+ PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
+ command.columnFamily,
+ command.timestamp,
+ keyRange,
+ sf,
+ start,
+ sf.finish(),
+ command.rowFilter,
+ pageSize);
+
+ return localQuery
+ ? pageCmd.executeLocally() // localQuery set: the command is executed directly
+ : StorageProxy.getRangeSlice(pageCmd, consistencyLevel); // otherwise it goes through StorageProxy with the given consistency level
+ }
+
+ protected boolean containsPreviousLast(Row first) // true only if 'first' has the recorded key and its firstName equals the recorded name
+ {
+ return lastReturnedKey != null
+ && lastReturnedKey.equals(first.key)
+ && lastReturnedName.equals(firstName(first.cf));
+ }
+
+ protected boolean recordLast(Row last) // stores the key and lastName of 'last'; always returns true
+ {
+ lastReturnedKey = last.key;
+ lastReturnedName = lastName(last.cf);
+ return true;
+ }
+
+ private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey) // bounds from lastReturnedKey to the original command's right bound
+ {
+ // We always include lastReturnedKey since we may still be paging within a row,
+ // and PagedRangeCommand will move over if we're not anyway
+ AbstractBounds<RowPosition> bounds = command.keyRange;
+ if (bounds instanceof Range || bounds instanceof Bounds) // NOTE(review): presumably the types whose right bound is inclusive - confirm
+ {
+ return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ else // any other AbstractBounds implementation
+ {
+ return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
new file mode 100644
index 0000000..693a20e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import org.apache.cassandra.db.filter.ColumnCounter;
+
+/**
+ * Common interface to single partition queries (by slice and by name).
+ *
+ * For use by MultiPartitionPager.
+ */
+public interface SinglePartitionPager extends QueryPager
+{
+ public ColumnCounter columnCounter(); // NOTE(review): presumably the counter the pager uses to count what it returns - confirm against implementations
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
new file mode 100644
index 0000000..58ef3c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceFromReadCommand: each page re-issues the command with an updated slice filter.
+ */
+public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
+{
+ private final SliceFromReadCommand command; // the original command; page commands are derived from it
+
+ private volatile ByteBuffer lastReturned; // value stored by recordLast(); null until recordLast() is first called
+
+ // Don't use directly, use QueryPagers method instead
+ SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.filter.count, localQuery, command.table, command.cfName, command.filter, command.timestamp);
+ this.command = command;
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) // runs the read for the next page with pageSize as the filter count
+ throws RequestValidationException, RequestExecutionException
+ {
+ SliceQueryFilter filter = command.filter.withUpdatedCount(pageSize);
+ if (lastReturned != null) // a last name has been recorded: use it as the new slice start
+ filter = filter.withUpdatedStart(lastReturned, cfm.comparator);
+
+ ReadCommand pageCmd = command.withUpdatedFilter(filter);
+ return localQuery
+ ? Collections.singletonList(pageCmd.getRow(Table.open(command.table))) // localQuery set: read the row directly and wrap it in a one-element list
+ : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel); // otherwise it goes through StorageProxy with the given consistency level
+ }
+
+ protected boolean containsPreviousLast(Row first) // true only if a name was recorded and it equals firstName of the new page's row
+ {
+ return lastReturned != null && lastReturned.equals(firstName(first.cf));
+ }
+
+ protected boolean recordLast(Row last) // stores lastName of 'last'; always returns true
+ {
+ lastReturned = lastName(last.cf);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 55688a8..a15927e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.cql.CQLStatement;
import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -60,6 +61,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -368,31 +370,25 @@ public class CassandraServer implements Cassandra.Iface
}
}
- private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
- List<ByteBuffer> keys,
- ColumnParent column_parent,
- long timestamp,
- SlicePredicate predicate,
- ConsistencyLevel consistency_level)
- throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+ private SliceQueryFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
{
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
- ThriftValidation.validateColumnParent(metadata, column_parent);
- ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-
- org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
- consistencyLevel.validateForRead(keyspace);
+ SliceQueryFilter filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
+ if (metadata.isSuper())
+ filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, parent.bufferForSuper_column(), filter);
+ return filter;
+ }
- List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+ private IDiskAtomFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+ {
IDiskAtomFilter filter;
if (predicate.column_names != null)
{
if (metadata.isSuper())
{
CompositeType type = (CompositeType)metadata.comparator;
- SortedSet s = new TreeSet<ByteBuffer>(column_parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
+ SortedSet s = new TreeSet<ByteBuffer>(parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
s.addAll(predicate.column_names);
- filter = SuperColumns.fromSCNamesFilter(type, column_parent.bufferForSuper_column(), new NamesQueryFilter(s));
+ filter = SuperColumns.fromSCNamesFilter(type, parent.bufferForSuper_column(), new NamesQueryFilter(s));
}
else
{
@@ -403,11 +399,28 @@ public class CassandraServer implements Cassandra.Iface
}
else
{
- SliceRange range = predicate.slice_range;
- filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
- if (metadata.isSuper())
- filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, column_parent.bufferForSuper_column(), filter);
+ filter = toInternalFilter(metadata, parent, predicate.slice_range);
}
+ return filter;
+ }
+
+ private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+ List<ByteBuffer> keys,
+ ColumnParent column_parent,
+ long timestamp,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+ {
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+ ThriftValidation.validateColumnParent(metadata, column_parent);
+ ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
+
+ List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+ IDiskAtomFilter filter = toInternalFilter(metadata, column_parent, predicate);
for (ByteBuffer key: keys)
{
@@ -530,46 +543,22 @@ public class CassandraServer implements Cassandra.Iface
pageSize = COUNT_PAGE_SIZE;
}
- int totalCount = 0;
- List<ColumnOrSuperColumn> columns;
-
- if (predicate.slice_range == null)
- {
- predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- Integer.MAX_VALUE);
- }
-
- final int requestedCount = predicate.slice_range.count;
- int remaining = requestedCount;
- int pages = 0;
- while (true)
- {
- predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
- columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
- if (columns.isEmpty())
- break;
-
- ByteBuffer firstName = getName(columns.get(0));
- int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
-
- totalCount += newColumns;
- // if we over-counted, just return original limit
- if (totalCount > requestedCount)
- return requestedCount;
- remaining -= newColumns;
- pages++;
- // We're done if either:
- // - We've querying the number of columns requested by the user
- // - last fetched page only contains the column we already fetched
- if (remaining == 0 || ((columns.size() == 1) && (firstName.equals(predicate.slice_range.start))))
- break;
- else
- predicate.slice_range.start = getName(columns.get(columns.size() - 1));
- }
+ SliceRange sliceRange = predicate.slice_range == null
+ ? new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE)
+ : predicate.slice_range;
+ SliceQueryFilter filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
- return totalCount;
+ return QueryPagers.countPaged(keyspace,
+ column_parent.column_family,
+ key,
+ filter,
+ ThriftConversion.fromThrift(consistency_level),
+ pageSize,
+ timestamp);
+ }
+ catch (RequestExecutionException e)
+ {
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -774,8 +763,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null; // makes javac happy -- it can't tell that rethrow always throws
+ throw ThriftConversion.rethrow(e);
}
finally
{
@@ -1218,22 +1206,16 @@ public class CassandraServer implements Cassandra.Iface
bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
}
+ if (range.row_filter != null && !range.row_filter.isEmpty())
+ throw new InvalidRequestException("Cross-row paging is not supported along with index clauses");
+
List<Row> rows;
long now = System.currentTimeMillis();
schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
- column_family,
- now,
- filter,
- bounds,
- range.row_filter,
- range.count,
- true,
- true),
- consistencyLevel);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, now, filter, bounds, null, range.count, true, true), consistencyLevel);
}
finally
{
@@ -1878,8 +1860,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -1913,8 +1894,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -1994,8 +1974,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -2034,12 +2013,15 @@ public class CassandraServer implements Cassandra.Iface
itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
- return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
+ return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
+ ThriftConversion.fromThrift(cLevel),
+ cState.getQueryState(),
+ bindVariables,
+ -1).toThriftResult();
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 28725f0..8a9ab59 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -45,7 +45,9 @@ public class ThriftConversion
throw new AssertionError();
}
- public static void rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
+ // We never return, but declaring a RuntimeException return type lets callers write "throw rethrow(e)"
+ // in methods that have a return value without the compiler complaining about a missing return.
+ public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
{
if (e instanceof RequestTimeoutException)
throw toThrift((RequestTimeoutException)e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2250816..2ba21d5 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,5 @@ import org.jboss.netty.buffer.ChannelBuffer;
public interface CBCodec<T>
{
public T decode(ChannelBuffer body, int version);
- public ChannelBuffer encode(T t);
+ public ChannelBuffer encode(T t, int version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 8e2d765..2f3f3bd 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -35,14 +35,7 @@ import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.transport.messages.CredentialsMessage;
-import org.apache.cassandra.transport.messages.ExecuteMessage;
-import org.apache.cassandra.transport.messages.OptionsMessage;
-import org.apache.cassandra.transport.messages.PrepareMessage;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-import org.apache.cassandra.transport.messages.AuthResponse;
-import org.apache.cassandra.transport.messages.StartupMessage;
+import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.utils.Hex;
import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
@@ -116,8 +109,37 @@ public class Client extends SimpleClient
}
else if (msgType.equals("QUERY"))
{
- String query = line.substring(6);
- return new QueryMessage(query, ConsistencyLevel.ONE);
+ line = line.substring(6);
+ // Ugly hack to allow setting a page size, but that's playground code anyway
+ String query = line;
+ int pageSize = -1;
+ if (line.matches(".+ !\\d+$"))
+ {
+ int idx = line.lastIndexOf('!');
+ query = line.substring(0, idx-1);
+ try
+ {
+ pageSize = Integer.parseInt(line.substring(idx+1, line.length()));
+ }
+ catch (NumberFormatException e)
+ {
+ return null;
+ }
+ }
+ return new QueryMessage(query, Collections.<ByteBuffer>emptyList(), ConsistencyLevel.ONE, pageSize);
+ }
+ else if (msgType.equals("NEXT"))
+ {
+ line = line.substring(5);
+ try
+ {
+ int pageSize = Integer.parseInt(line);
+ return new NextMessage(pageSize);
+ }
+ catch (NumberFormatException e)
+ {
+ return null;
+ }
}
else if (msgType.equals("PREPARE"))
{
@@ -145,7 +167,7 @@ public class Client extends SimpleClient
}
values.add(bb);
}
- return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
+ return new ExecuteMessage(id, values, ConsistencyLevel.ONE, -1);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 54da6a2..eca3697 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -73,7 +73,8 @@ public abstract class Message
BATCH (13, Direction.REQUEST, BatchMessage.codec),
AUTH_CHALLENGE (14, Direction.RESPONSE, AuthChallenge.codec),
AUTH_RESPONSE (15, Direction.REQUEST, AuthResponse.codec),
- AUTH_SUCCESS (16, Direction.RESPONSE, AuthSuccess.codec);
+ AUTH_SUCCESS (16, Direction.RESPONSE, AuthSuccess.codec),
+ NEXT (17, Direction.REQUEST, NextMessage.codec);
public final int opcode;
public final Direction direction;
@@ -298,11 +299,11 @@ public abstract class Message
{
assert request.connection() instanceof ServerConnection;
ServerConnection connection = (ServerConnection)request.connection();
- connection.validateNewMessage(request.type, request.getVersion());
+ QueryState qstate = connection.validateNewMessage(request.type, request.getVersion(), request.getStreamId());
logger.debug("Received: {}, v={}", request, request.getVersion());
- Response response = request.execute(connection.getQueryState(request.getStreamId()));
+ Response response = request.execute(qstate);
response.setStreamId(request.getStreamId());
response.setVersion(request.getVersion());
response.attach(connection);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index ec99440..538258d 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -53,7 +53,7 @@ public class ServerConnection extends Connection
this.state = State.UNINITIALIZED;
}
- public QueryState getQueryState(int streamId)
+ private QueryState getQueryState(int streamId)
{
QueryState qState = queryStates.get(streamId);
if (qState == null)
@@ -66,7 +66,7 @@ public class ServerConnection extends Connection
return qState;
}
- public void validateNewMessage(Message.Type type, int version)
+ public QueryState validateNewMessage(Message.Type type, int version, int streamId)
{
switch (state)
{
@@ -86,6 +86,20 @@ public class ServerConnection extends Connection
default:
throw new AssertionError();
}
+
+ QueryState qstate = getQueryState(streamId);
+ if (qstate.hasPager())
+ {
+ if (type != Message.Type.NEXT)
+ qstate.dropPager();
+ }
+ else
+ {
+ if (type == Message.Type.NEXT)
+ throw new ProtocolException("Unexpected NEXT message, paging is not set (or is done)");
+ }
+
+ return qstate;
}
public void applyStateTransition(Message.Type requestType, Message.Type responseType)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 993a490..3a9c286 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -156,7 +156,7 @@ public class SimpleClient
public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
{
- Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel));
+ Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel, -1));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
@@ -170,7 +170,7 @@ public class SimpleClient
public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
+ Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency, -1));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index bc90dc5..63df7d0 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -40,7 +40,7 @@ public class AuthChallenge extends Message.Response
}
@Override
- public ChannelBuffer encode(AuthChallenge challenge)
+ public ChannelBuffer encode(AuthChallenge challenge, int version)
{
return CBUtil.valueToCB(challenge.token);
}
@@ -57,7 +57,7 @@ public class AuthChallenge extends Message.Response
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public byte[] getToken()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 1f8ed9f..8a33a72 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -52,7 +52,7 @@ public class AuthResponse extends Message.Request
}
@Override
- public ChannelBuffer encode(AuthResponse response)
+ public ChannelBuffer encode(AuthResponse response, int version)
{
return CBUtil.valueToCB(response.token);
}
@@ -69,7 +69,7 @@ public class AuthResponse extends Message.Request
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index ba520bc..13c750a 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -43,7 +43,7 @@ public class AuthSuccess extends Message.Response
}
@Override
- public ChannelBuffer encode(AuthSuccess success)
+ public ChannelBuffer encode(AuthSuccess success, int version)
{
return CBUtil.valueToCB(success.token);
}
@@ -60,7 +60,7 @@ public class AuthSuccess extends Message.Response
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public byte[] getToken()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index d781f68..292f748 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,7 +35,7 @@ public class AuthenticateMessage extends Message.Response
return new AuthenticateMessage(authenticator);
}
- public ChannelBuffer encode(AuthenticateMessage msg)
+ public ChannelBuffer encode(AuthenticateMessage msg, int version)
{
return CBUtil.stringToCB(msg.authenticator);
}
@@ -51,7 +51,7 @@ public class AuthenticateMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 299d8b8..9fb4482 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -73,7 +73,7 @@ public class BatchMessage extends Message.Request
return new BatchMessage(toType(type), queryOrIds, variables, consistency);
}
- public ChannelBuffer encode(BatchMessage msg)
+ public ChannelBuffer encode(BatchMessage msg, int version)
{
// We have:
// - type
@@ -160,7 +160,7 @@ public class BatchMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index ceff5ba..207907a 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -55,7 +55,7 @@ public class CredentialsMessage extends Message.Request
return msg;
}
- public ChannelBuffer encode(CredentialsMessage msg)
+ public ChannelBuffer encode(CredentialsMessage msg, int version)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
@@ -78,7 +78,7 @@ public class CredentialsMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 3243bce..3675f08 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,7 +123,7 @@ public class ErrorMessage extends Message.Response
return new ErrorMessage(te);
}
- public ChannelBuffer encode(ErrorMessage msg)
+ public ChannelBuffer encode(ErrorMessage msg, int version)
{
ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
@@ -213,7 +213,7 @@ public class ErrorMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 7d67de9..f7a93ae 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,7 +31,7 @@ public class EventMessage extends Message.Response
return new EventMessage(Event.deserialize(body));
}
- public ChannelBuffer encode(EventMessage msg)
+ public ChannelBuffer encode(EventMessage msg, int version)
{
return msg.event.serialize();
}
@@ -48,7 +48,7 @@ public class EventMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 8e2b761..7c35e42 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import com.google.common.collect.ImmutableMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.apache.cassandra.cql3.CQLStatement;
@@ -49,10 +50,11 @@ public class ExecuteMessage extends Message.Request
values.add(CBUtil.readValue(body));
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- return new ExecuteMessage(id, values, consistency);
+ int resultPageSize = body.readInt();
+ return new ExecuteMessage(id, values, consistency, resultPageSize);
}
- public ChannelBuffer encode(ExecuteMessage msg)
+ public ChannelBuffer encode(ExecuteMessage msg, int version)
{
// We have:
// - statementId
@@ -60,7 +62,7 @@ public class ExecuteMessage extends Message.Request
// - The values
// - options
int vs = msg.values.size();
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(4, 0, vs);
builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
builder.add(CBUtil.shortToCB(vs));
@@ -69,6 +71,7 @@ public class ExecuteMessage extends Message.Request
builder.addValue(value);
builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+ builder.add(CBUtil.intToCB(msg.resultPageSize));
return builder.build();
}
};
@@ -76,23 +79,25 @@ public class ExecuteMessage extends Message.Request
public final MD5Digest statementId;
public final List<ByteBuffer> values;
public final ConsistencyLevel consistency;
+ public final int resultPageSize;
- public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+ public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
{
- this(MD5Digest.wrap(statementId), values, consistency);
+ this(MD5Digest.wrap(statementId), values, consistency, resultPageSize);
}
- public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+ public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
{
super(Message.Type.EXECUTE);
this.statementId = statementId;
this.values = values;
this.consistency = consistency;
+ this.resultPageSize = resultPageSize;
}
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
@@ -104,6 +109,9 @@ public class ExecuteMessage extends Message.Request
if (statement == null)
throw new PreparedQueryNotFoundException(statementId);
+ if (resultPageSize == 0)
+ throw new ProtocolException("The page size cannot be 0");
+
UUID tracingId = null;
if (isTracingRequested())
{
@@ -114,11 +122,16 @@ public class ExecuteMessage extends Message.Request
if (state.traceNextQuery())
{
state.createTracingSession();
+
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (resultPageSize > 0)
+ builder.put("page_size", Integer.toString(resultPageSize));
+
// TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
- Tracing.instance.begin("Execute CQL3 prepared query", Collections.<String, String>emptyMap());
+ Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
}
- Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values);
+ Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize);
if (tracingId != null)
response.setTracingId(tracingId);
@@ -132,6 +145,9 @@ public class ExecuteMessage extends Message.Request
finally
{
Tracing.instance.stopSession();
+ // Trash the current session id if we won't need it anymore
+ if (!state.hasPager())
+ state.getAndResetCurrentTracingSession();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/NextMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/NextMessage.java b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
new file mode 100644
index 0000000..d68f603
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class NextMessage extends Message.Request
+{
+ public static final Message.Codec<NextMessage> codec = new Message.Codec<NextMessage>()
+ {
+ public NextMessage decode(ChannelBuffer body, int version)
+ {
+ int resultPageSize = body.readInt();
+ return new NextMessage(resultPageSize);
+ }
+
+ public ChannelBuffer encode(NextMessage msg, int version)
+ {
+ return CBUtil.intToCB(msg.resultPageSize);
+ }
+ };
+
+ public final int resultPageSize;
+
+ public NextMessage(int resultPageSize)
+ {
+ super(Type.NEXT);
+ this.resultPageSize = resultPageSize;
+ }
+
+ public ChannelBuffer encode()
+ {
+ return codec.encode(this, getVersion());
+ }
+
+ public Message.Response execute(QueryState state)
+ {
+ try
+ {
+ if (resultPageSize == 0)
+ throw new ProtocolException("The page size cannot be 0");
+
+ /*
+ * If we had traced the previous page and we are asked to trace this one,
+ * record the previous id to allow linking the trace together.
+ */
+ UUID previousTracingId = state.getAndResetCurrentTracingSession();
+
+ UUID tracingId = null;
+ if (isTracingRequested())
+ {
+ tracingId = UUIDGen.getTimeUUID();
+ state.prepareTracingSession(tracingId);
+ }
+
+ if (state.traceNextQuery())
+ {
+ state.createTracingSession();
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (resultPageSize > 0)
+ builder.put("page_size", Integer.toString(resultPageSize));
+ if (previousTracingId != null)
+ builder.put("previous_trace", previousTracingId.toString());
+ Tracing.instance.begin("Continue paged CQL3 query", builder.build());
+ }
+
+ Message.Response response = state.getNextPage(resultPageSize < 0 ? Integer.MAX_VALUE : resultPageSize);
+
+ if (tracingId != null)
+ response.setTracingId(tracingId);
+
+ return response;
+ }
+ catch (Exception e)
+ {
+ if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
+ logger.error("Unexpected error during query", e);
+ return ErrorMessage.fromException(e);
+ }
+ finally
+ {
+ Tracing.instance.stopSession();
+ // Trash the current session id if we won't need it anymore
+ if (!state.hasPager())
+ state.getAndResetCurrentTracingSession();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NEXT " + resultPageSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 6e753d3..5afefb5 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,7 +42,7 @@ public class OptionsMessage extends Message.Request
return new OptionsMessage();
}
- public ChannelBuffer encode(OptionsMessage msg)
+ public ChannelBuffer encode(OptionsMessage msg, int version)
{
return ChannelBuffers.EMPTY_BUFFER;
}
@@ -55,7 +55,7 @@ public class OptionsMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 3e7fe51..851f3f8 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,7 +38,7 @@ public class PrepareMessage extends Message.Request
return new PrepareMessage(query);
}
- public ChannelBuffer encode(PrepareMessage msg)
+ public ChannelBuffer encode(PrepareMessage msg, int version)
{
return CBUtil.longStringToCB(msg.query);
}
@@ -54,7 +54,7 @@ public class PrepareMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
[3/4] Add auto paging capability to the native protocol
Posted by sl...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RowIteratorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIteratorFactory.java b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
index c9f715c..c02da1d 100644
--- a/src/java/org/apache/cassandra/db/RowIteratorFactory.java
+++ b/src/java/org/apache/cassandra/db/RowIteratorFactory.java
@@ -45,18 +45,15 @@ public class RowIteratorFactory
* and filtered by the queryfilter.
* @param memtables Memtables pending flush.
* @param sstables SStables to scan through.
- * @param startWith Start at this key
- * @param stopAt Stop and this key
- * @param filter Used to decide which columns to pull out
+ * @param range The data range to fetch
* @param cfs
* @return A row iterator following all the given restrictions
*/
public static CloseableIterator<Row> getIterator(final Iterable<Memtable> memtables,
final Collection<SSTableReader> sstables,
- final RowPosition startWith,
- final RowPosition stopAt,
- final QueryFilter filter,
- final ColumnFamilyStore cfs)
+ final DataRange range,
+ final ColumnFamilyStore cfs,
+ final long now)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
final List<CloseableIterator<OnDiskAtomIterator>> iterators = new ArrayList<CloseableIterator<OnDiskAtomIterator>>();
@@ -64,19 +61,19 @@ public class RowIteratorFactory
// memtables
for (Memtable memtable : memtables)
{
- iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(filter, memtable.getEntryIterator(startWith, stopAt)));
+ iterators.add(new ConvertToColumnIterator<AtomicSortedColumns>(range, memtable.getEntryIterator(range.startKey(), range.stopKey())));
}
for (SSTableReader sstable : sstables)
{
- final SSTableScanner scanner = sstable.getScanner(filter, startWith);
+ final SSTableScanner scanner = sstable.getScanner(range);
iterators.add(scanner);
}
// reduce rows from all sources into a single row
return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<OnDiskAtomIterator, Row>()
{
- private final int gcBefore = cfs.gcBefore(filter.timestamp);
+ private final int gcBefore = cfs.gcBefore(now);
private final List<OnDiskAtomIterator> colIters = new ArrayList<OnDiskAtomIterator>();
private DecoratedKey key;
private ColumnFamily returnCF;
@@ -96,17 +93,16 @@ public class RowIteratorFactory
protected Row getReduced()
{
-
// First check if this row is in the rowCache. If it is we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
if (cached == null)
{
// not cached: collate
- filter.collateOnDiskAtom(returnCF, colIters, gcBefore);
+ QueryFilter.collateOnDiskAtom(returnCF, colIters, range.columnFilter(key.key), gcBefore, now);
}
else
{
- QueryFilter keyFilter = new QueryFilter(key, filter.cfName, filter.filter, filter.timestamp);
+ QueryFilter keyFilter = new QueryFilter(key, cfs.name, range.columnFilter(key.key), now);
returnCF = cfs.filterColumnFamily(cached, keyFilter);
}
@@ -123,12 +119,12 @@ public class RowIteratorFactory
*/
private static class ConvertToColumnIterator<T extends ColumnFamily> implements CloseableIterator<OnDiskAtomIterator>
{
- private final QueryFilter filter;
+ private final DataRange range;
private final Iterator<Map.Entry<DecoratedKey, T>> iter;
- public ConvertToColumnIterator(QueryFilter filter, Iterator<Map.Entry<DecoratedKey, T>> iter)
+ public ConvertToColumnIterator(DataRange range, Iterator<Map.Entry<DecoratedKey, T>> iter)
{
- this.filter = filter;
+ this.range = range;
this.iter = iter;
}
@@ -151,7 +147,7 @@ public class RowIteratorFactory
{
public OnDiskAtomIterator create()
{
- return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey());
+ return range.columnFilter(entry.getKey().key).getColumnFamilyIterator(entry.getKey(), entry.getValue());
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index 5c42de5..508d1d2 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -106,6 +106,11 @@ public class SliceFromReadCommand extends ReadCommand
return filter;
}
+ public SliceFromReadCommand withUpdatedFilter(SliceQueryFilter newFilter)
+ {
+ return new SliceFromReadCommand(table, key, cfName, timestamp, newFilter);
+ }
+
/**
* The original number of columns requested by the user.
* This can be different from count when the slice command is a retry (see
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java
deleted file mode 100644
index c1933ad..0000000
--- a/src/java/org/apache/cassandra/db/SliceQueryPager.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.filter.*;
-
-public class SliceQueryPager implements Iterator<ColumnFamily>
-{
- public static final int DEFAULT_PAGE_SIZE = 10000;
-
- public final ColumnFamilyStore cfs;
- public final DecoratedKey key;
-
- private ColumnSlice[] slices;
- private boolean exhausted;
-
- public SliceQueryPager(ColumnFamilyStore cfs, DecoratedKey key, ColumnSlice[] slices)
- {
- this.cfs = cfs;
- this.key = key;
- this.slices = slices;
- }
-
- // This will *not* do a query
- public boolean hasNext()
- {
- return !exhausted;
- }
-
- // This might return an empty column family (but never a null one)
- public ColumnFamily next()
- {
- if (exhausted)
- return null;
-
- long now = System.currentTimeMillis();
- SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE);
- QueryFilter filter = new QueryFilter(key, cfs.name, sliceFilter, now);
- ColumnFamily cf = cfs.getColumnFamily(filter);
- if (cf == null || sliceFilter.getLiveCount(cf, now) < DEFAULT_PAGE_SIZE)
- {
- exhausted = true;
- }
- else
- {
- Iterator<Column> iter = cf.getReverseSortedColumns().iterator();
- Column lastColumn = iter.next();
- while (lastColumn.isMarkedForDelete(now))
- lastColumn = iter.next();
-
- int i = 0;
- for (; i < slices.length; ++i)
- {
- ColumnSlice current = slices[i];
- if (cfs.getComparator().compare(lastColumn.name(), current.finish) <= 0)
- break;
- }
- if (i >= slices.length)
- exhausted = true;
- else
- slices = Arrays.copyOfRange(slices, i, slices.length);
- }
- return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 90bfd8d..8528515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -678,7 +678,8 @@ public class SystemTable
return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
null,
new IdentityQueryFilter(),
- Integer.MAX_VALUE);
+ Integer.MAX_VALUE,
+ System.currentTimeMillis());
}
public static Collection<RowMutation> serializeSchema()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 772f842..409076f 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.tracing.Tracing;
/**
@@ -50,6 +51,7 @@ import org.apache.cassandra.tracing.Tracing;
public class Table
{
public static final String SYSTEM_KS = "system";
+ private static final int DEFAULT_PAGE_SIZE = 10000;
private static final Logger logger = LoggerFactory.getLogger(Table.class);
@@ -398,7 +400,7 @@ public class Table
switchLock.readLock().lock();
try
{
- SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY);
+ Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
while (pager.hasNext())
{
ColumnFamily cf = pager.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
index a5d54ce..2fda715 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnCounter.java
@@ -61,6 +61,16 @@ public class ColumnCounter
return ignored;
}
+ public ColumnCounter countAll(ColumnFamily container)
+ {
+ if (container == null)
+ return this;
+
+ for (Column c : container)
+ count(c, container);
+ return this;
+ }
+
public static class GroupByPrefix extends ColumnCounter
{
private final CompositeType type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index af7431c..49503b5 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -89,6 +89,12 @@ public class ColumnSlice
return cmp.compare(start, name) <= 0 && (finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) || cmp.compare(finish, name) >= 0);
}
+ public boolean isBefore(Comparator<ByteBuffer> cmp, ByteBuffer name)
+ {
+ return !finish.equals(ByteBufferUtil.EMPTY_BYTE_BUFFER) && cmp.compare(finish, name) < 0;
+ }
+
+
@Override
public final int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index d061ac2..e0f3ec3 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -32,6 +33,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -45,47 +47,36 @@ public abstract class ExtendedFilter
public final ColumnFamilyStore cfs;
public final long timestamp;
- protected final IDiskAtomFilter originalFilter;
+ public final DataRange dataRange;
private final int maxResults;
private final boolean countCQL3Rows;
- private final boolean isPaging;
+ private volatile int currentLimit;
public static ExtendedFilter create(ColumnFamilyStore cfs,
+ DataRange dataRange,
List<IndexExpression> clause,
- IDiskAtomFilter filter,
int maxResults,
- long timestamp,
boolean countCQL3Rows,
- boolean isPaging)
+ long timestamp)
{
if (clause == null || clause.isEmpty())
- {
- return new EmptyClauseFilter(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
- }
- else
- {
- if (isPaging)
- throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses");
- return cfs.getComparator() instanceof CompositeType
- ? new FilterWithCompositeClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows)
- : new FilterWithClauses(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
- }
+ return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp);
+
+ return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp);
}
- protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, long timestamp, boolean countCQL3Rows, boolean isPaging)
+ protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp)
{
assert cfs != null;
- assert filter != null;
+ assert dataRange != null;
this.cfs = cfs;
- this.originalFilter = filter;
+ this.dataRange = dataRange;
this.maxResults = maxResults;
this.timestamp = timestamp;
this.countCQL3Rows = countCQL3Rows;
- this.isPaging = isPaging;
+ this.currentLimit = maxResults;
if (countCQL3Rows)
- originalFilter.updateColumnsLimit(maxResults);
- if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0))
- throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column");
+ dataRange.updateColumnsLimit(maxResults);
}
public int maxRows()
@@ -98,37 +89,30 @@ public abstract class ExtendedFilter
return countCQL3Rows ? maxResults : Integer.MAX_VALUE;
}
- /**
- * Update the filter if necessary given the number of column already
- * fetched.
- */
- public void updateFilter(int currentColumnsCount)
+ public int currentLimit()
{
- // As soon as we'd done our first call, we want to reset the start column if we're paging
- if (isPaging)
- ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-
- if (!countCQL3Rows)
- return;
+ return currentLimit;
+ }
- int remaining = maxResults - currentColumnsCount;
- initialFilter().updateColumnsLimit(remaining);
+ public IDiskAtomFilter columnFilter(ByteBuffer key)
+ {
+ return dataRange.columnFilter(key);
}
public int lastCounted(ColumnFamily data)
{
- if (initialFilter() instanceof SliceQueryFilter)
- return ((SliceQueryFilter)initialFilter()).lastCounted();
- else
- return initialFilter().getLiveCount(data, timestamp);
+ return dataRange.getLiveCount(data, timestamp);
}
- /** The initial filter we'll do our first slice with (either the original or a superset of it) */
- public abstract IDiskAtomFilter initialFilter();
-
- public IDiskAtomFilter originalFilter()
+ public void updateFilter(int currentColumnsCount)
{
- return originalFilter;
+ if (!countCQL3Rows)
+ return;
+
+ currentLimit = maxResults - currentColumnsCount;
+ // We propagate that limit to the underlying filter so each internal query doesn't
+ // fetch more than we need it to.
+ dataRange.updateColumnsLimit(currentLimit);
}
public abstract List<IndexExpression> getClause();
@@ -138,18 +122,18 @@ public abstract class ExtendedFilter
* @param data the data retrieve by the initial filter
* @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row)
*/
- public abstract IDiskAtomFilter getExtraFilter(ColumnFamily data);
+ public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data);
/**
* @return data pruned down to the columns originally asked for
*/
- public abstract ColumnFamily prune(ColumnFamily data);
+ public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data);
/**
* @return true if the provided data satisfies all the expressions from
* the clause of this filter.
*/
- public abstract boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder);
+ public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
public static boolean satisfies(int comparison, IndexOperator op)
{
@@ -170,44 +154,54 @@ public abstract class ExtendedFilter
}
}
- private static class FilterWithClauses extends ExtendedFilter
+ public static class WithClauses extends ExtendedFilter
{
- protected final List<IndexExpression> clause;
- protected final IDiskAtomFilter initialFilter;
-
- public FilterWithClauses(ColumnFamilyStore cfs,
- List<IndexExpression> clause,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows)
+ private final List<IndexExpression> clause;
+ private final IDiskAtomFilter optimizedFilter;
+
+ public WithClauses(ColumnFamilyStore cfs,
+ DataRange range,
+ List<IndexExpression> clause,
+ int maxResults,
+ boolean countCQL3Rows,
+ long timestamp)
{
- super(cfs, filter, maxResults, timestamp, countCQL3Rows, false);
+ super(cfs, range, maxResults, countCQL3Rows, timestamp);
assert clause != null;
this.clause = clause;
- this.initialFilter = computeInitialFilter();
+ this.optimizedFilter = computeOptimizedFilter();
}
- /** Sets up the initial filter. */
- protected IDiskAtomFilter computeInitialFilter()
+ /*
+ * Potentially optimize the column filter if we have a chance to make it catch all clauses
+ * right away.
+ */
+ private IDiskAtomFilter computeOptimizedFilter()
{
- if (originalFilter instanceof SliceQueryFilter)
+ /*
+ * We shouldn't do the "optimization" for composites as the index names are not valid column names
+ * (which the rest of the method assumes). Said optimization is not useful for composites anyway.
+ * We also don't want to do it for paging ranges as the actual filter depends on the row key (it would
+ * probably be possible to make it work but we won't really use it so we don't bother).
+ */
+ if (cfs.getComparator() instanceof CompositeType || dataRange instanceof DataRange.Paging)
+ return null;
+
+ IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
+ if (filter instanceof SliceQueryFilter)
{
// if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
// otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
if (cfs.getMaxRowSize() < DatabaseDescriptor.getColumnIndexSize())
{
logger.trace("Expanding slice filter to entire row to cover additional expressions");
- return new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ((SliceQueryFilter) originalFilter).reversed,
- Integer.MAX_VALUE);
+ return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
}
}
else
{
logger.trace("adding columns to original Filter to cover additional expressions");
- assert originalFilter instanceof NamesQueryFilter;
+ assert filter instanceof NamesQueryFilter;
if (!clause.isEmpty())
{
SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
@@ -215,16 +209,17 @@ public abstract class ExtendedFilter
{
columns.add(expr.column_name);
}
- columns.addAll(((NamesQueryFilter) originalFilter).columns);
- return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns);
+ columns.addAll(((NamesQueryFilter) filter).columns);
+ return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
}
}
- return originalFilter;
+ return null;
}
- public IDiskAtomFilter initialFilter()
+ @Override
+ public IDiskAtomFilter columnFilter(ByteBuffer key)
{
- return initialFilter;
+ return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
}
public List<IndexExpression> getClause()
@@ -233,21 +228,13 @@ public abstract class ExtendedFilter
}
/*
- * We may need an extra query only if the original was a slice query (and thus may have miss the expression for the clause).
- * Even then, there is no point in doing an extra query if the original filter grabbed the whole row.
- * Lastly, we only need the extra query if we haven't yet got all the expressions from the clause.
+ * We may need an extra query only if the original query wasn't selecting the row entirely.
+ * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
*/
- private boolean needsExtraQuery(ColumnFamily data)
+ private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
{
- if (!(originalFilter instanceof SliceQueryFilter))
- return false;
-
- SliceQueryFilter filter = (SliceQueryFilter)originalFilter;
- // Check if we've fetch the whole row
- if (filter.slices.length == 1
- && filter.start().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.finish().equals(ByteBufferUtil.EMPTY_BYTE_BUFFER)
- && filter.count == Integer.MAX_VALUE)
+ IDiskAtomFilter filter = columnFilter(rowKey);
+ if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
return false;
for (IndexExpression expr : clause)
@@ -261,9 +248,18 @@ public abstract class ExtendedFilter
return false;
}
- public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+ public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
{
- if (!needsExtraQuery(data))
+ /*
+ * This method assumes the IndexExpression names are valid column names, which is not the
+ * case with composites. This is ok for now however since:
+ * 1) CompositeSearcher doesn't use it.
+ * 2) We don't yet allow non-indexed range slice with filters in CQL3 (i.e. this will never be
+ * called by CFS.filter() for composites).
+ */
+ assert !(cfs.getComparator() instanceof CompositeType);
+
+ if (!needsExtraQuery(rowKey.key, data))
return null;
// Note: for counters we must be careful to not add a column that was already there (to avoid overcount). That is
@@ -278,17 +274,19 @@ public abstract class ExtendedFilter
return new NamesQueryFilter(columns);
}
- public ColumnFamily prune(ColumnFamily data)
+ public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
{
- if (initialFilter == originalFilter)
+ if (optimizedFilter == null)
return data;
+
ColumnFamily pruned = data.cloneMeShallow();
- OnDiskAtomIterator iter = originalFilter.getMemtableColumnIterator(data, null);
- originalFilter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
+ IDiskAtomFilter filter = dataRange.columnFilter(rowKey.key);
+ OnDiskAtomIterator iter = filter.getColumnFamilyIterator(rowKey, data);
+ filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), cfs.gcBefore(timestamp), timestamp);
return pruned;
}
- public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
{
// We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
// where the index returned a row which doesn't have the primary column when we actually read it
@@ -310,7 +308,7 @@ public abstract class ExtendedFilter
}
else
{
- dataValue = extractDataValue(def, rowKey, data, builder);
+ dataValue = extractDataValue(def, rowKey.key, data, builder);
validator = def.getValidator();
}
@@ -346,68 +344,29 @@ public abstract class ExtendedFilter
}
}
- private static class FilterWithCompositeClauses extends FilterWithClauses
- {
- public FilterWithCompositeClauses(ColumnFamilyStore cfs,
- List<IndexExpression> clause,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows)
- {
- super(cfs, clause, filter, maxResults, timestamp, countCQL3Rows);
- }
-
- /*
- * For composites, the index name is not a valid column name (it's only
- * one of the component), which means we should not do the
- * NamesQueryFilter part of FilterWithClauses in particular.
- * Besides, CompositesSearcher doesn't really use the initial filter
- * expect to know the limit set by the user, so create a fake filter
- * with only the count information.
- */
- protected IDiskAtomFilter computeInitialFilter()
- {
- int limit = originalFilter instanceof SliceQueryFilter
- ? ((SliceQueryFilter)originalFilter).count
- : Integer.MAX_VALUE;
- return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, limit);
- }
- }
-
private static class EmptyClauseFilter extends ExtendedFilter
{
- public EmptyClauseFilter(ColumnFamilyStore cfs,
- IDiskAtomFilter filter,
- int maxResults,
- long timestamp,
- boolean countCQL3Rows,
- boolean isPaging)
- {
- super(cfs, filter, maxResults, timestamp, countCQL3Rows, isPaging);
- }
-
- public IDiskAtomFilter initialFilter()
+ public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
{
- return originalFilter;
+ super(cfs, range, maxResults, countCQL3Rows, timestamp);
}
public List<IndexExpression> getClause()
{
- throw new UnsupportedOperationException();
+ return Collections.<IndexExpression>emptyList();
}
- public IDiskAtomFilter getExtraFilter(ColumnFamily data)
+ public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
{
return null;
}
- public ColumnFamily prune(ColumnFamily data)
+ public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
{
return data;
}
- public boolean isSatisfiedBy(ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
{
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
index 35f71e5..69a8950 100644
--- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
@@ -41,10 +41,10 @@ import org.apache.cassandra.io.util.FileDataInput;
public interface IDiskAtomFilter
{
/**
- * returns an iterator that returns columns from the given memtable
+ * returns an iterator that returns columns from the given columnFamily
* matching the Filter criteria in sorted order.
*/
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key);
+ public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf);
/**
* Get an iterator that returns columns from the given SSTable using the opened file
@@ -74,6 +74,7 @@ public interface IDiskAtomFilter
public void updateColumnsLimit(int newLimit);
public int getLiveCount(ColumnFamily cf, long now);
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now);
public IDiskAtomFilter cloneShallow();
public boolean maySelectPrefix(Comparator<ByteBuffer> cmp, ByteBuffer prefix);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
index 297f227..d3f057d 100644
--- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
@@ -27,6 +27,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
@@ -75,9 +76,10 @@ public class NamesQueryFilter implements IDiskAtomFilter
return new NamesQueryFilter(newColumns, countCQL3Rows);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(DecoratedKey key, ColumnFamily cf)
{
- return Memtable.getNamesIterator(key, cf, this);
+ assert cf != null;
+ return new ByNameColumnIterator(columns.iterator(), cf, key);
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -120,6 +122,8 @@ public class NamesQueryFilter implements IDiskAtomFilter
public int getLiveCount(ColumnFamily cf, long now)
{
+ // Note: we could use columnCounter() but we save the object allocation as it's simple enough
+
if (countCQL3Rows)
return cf.hasOnlyTombstones(now) ? 0 : 1;
@@ -147,6 +151,56 @@ public class NamesQueryFilter implements IDiskAtomFilter
return true;
}
+ public boolean countCQL3Rows()
+ {
+ return countCQL3Rows;
+ }
+
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
+ {
+ return countCQL3Rows
+ ? new ColumnCounter.GroupByPrefix(now, null, 0)
+ : new ColumnCounter(now);
+ }
+
+ private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
+ {
+ private final ColumnFamily cf;
+ private final DecoratedKey key;
+ private final Iterator<ByteBuffer> iter;
+
+ public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
+ {
+ this.iter = iter;
+ this.cf = cf;
+ this.key = key;
+ }
+
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ protected OnDiskAtom computeNext()
+ {
+ while (iter.hasNext())
+ {
+ ByteBuffer current = iter.next();
+ Column column = cf.getColumn(current);
+ if (column != null)
+ return column;
+ }
+ return endOfData();
+ }
+
+ public void close() throws IOException { }
+ }
+
public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
{
public void serialize(NamesQueryFilter f, DataOutput out, int version) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/QueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
index ac0c632..4f71f3a 100644
--- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
@@ -48,13 +48,13 @@ public class QueryFilter
ColumnFamily cf = memtable.getColumnFamily(key);
if (cf == null)
return null;
- return getMemtableColumnIterator(cf, key);
+ return getColumnFamilyIterator(cf);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(ColumnFamily cf)
{
assert cf != null;
- return filter.getMemtableColumnIterator(cf, key);
+ return filter.getColumnFamilyIterator(key, cf);
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
@@ -69,10 +69,15 @@ public class QueryFilter
public void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, final int gcBefore)
{
+ collateOnDiskAtom(returnCF, toCollate, filter, gcBefore, timestamp);
+ }
+
+ public static void collateOnDiskAtom(final ColumnFamily returnCF, List<? extends Iterator<? extends OnDiskAtom>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
+ {
List<Iterator<Column>> filteredIterators = new ArrayList<Iterator<Column>>(toCollate.size());
for (Iterator<? extends OnDiskAtom> iter : toCollate)
filteredIterators.add(gatherTombstones(returnCF, iter));
- collateColumns(returnCF, filteredIterators, gcBefore);
+ collateColumns(returnCF, filteredIterators, filter, gcBefore, timestamp);
}
/**
@@ -84,7 +89,12 @@ public class QueryFilter
filter.collectReducedColumns(returnCF, columns, gcBefore, timestamp);
}
- public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, final int gcBefore)
+ public void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, int gcBefore)
+ {
+ collateColumns(returnCF, toCollate, filter, gcBefore, timestamp);
+ }
+
+ public static void collateColumns(final ColumnFamily returnCF, List<? extends Iterator<Column>> toCollate, IDiskAtomFilter filter, int gcBefore, long timestamp)
{
final Comparator<Column> fcomp = filter.getColumnComparator(returnCF.getComparator());
// define a 'reduced' iterator that merges columns w/ the same name, which
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
index b76ce04..9cbc49e 100644
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
@@ -95,14 +96,74 @@ public class SliceQueryFilter implements IDiskAtomFilter
return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup);
}
+ public SliceQueryFilter withUpdatedStart(ByteBuffer newStart, AbstractType<?> comparator)
+ {
+ Comparator<ByteBuffer> cmp = reversed ? comparator.reverseComparator : comparator;
+
+ List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>();
+ boolean pastNewStart = false;
+ for (int i = 0; i < slices.length; i++)
+ {
+ ColumnSlice slice = slices[i];
+
+ if (pastNewStart)
+ {
+ newSlices.add(slice);
+ continue;
+ }
+
+ if (slices[i].isBefore(cmp, newStart))
+ continue;
+
+ if (slice.includes(cmp, newStart))
+ newSlices.add(new ColumnSlice(newStart, slice.finish));
+ else
+ newSlices.add(slice);
+
+ pastNewStart = true;
+ }
+ return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()]));
+ }
+
public SliceQueryFilter withUpdatedSlice(ByteBuffer start, ByteBuffer finish)
{
return new SliceQueryFilter(new ColumnSlice[]{ new ColumnSlice(start, finish) }, reversed, count, compositesToGroup);
}
- public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key)
+ public OnDiskAtomIterator getColumnFamilyIterator(final DecoratedKey key, final ColumnFamily cf)
{
- return Memtable.getSliceIterator(key, cf, this);
+ assert cf != null;
+ final Iterator<Column> filteredIter = reversed ? cf.reverseIterator(slices) : cf.iterator(slices);
+
+ return new OnDiskAtomIterator()
+ {
+ public ColumnFamily getColumnFamily()
+ {
+ return cf;
+ }
+
+ public DecoratedKey getKey()
+ {
+ return key;
+ }
+
+ public boolean hasNext()
+ {
+ return filteredIter.hasNext();
+ }
+
+ public OnDiskAtom next()
+ {
+ return filteredIter.next();
+ }
+
+ public void close() throws IOException { }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
@@ -122,7 +183,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
public void collectReducedColumns(ColumnFamily container, Iterator<Column> reducedColumns, int gcBefore, long now)
{
- columnCounter = getColumnCounter(container, now);
+ columnCounter = columnCounter(container.getComparator(), now);
while (reducedColumns.hasNext())
{
@@ -144,15 +205,11 @@ public class SliceQueryFilter implements IDiskAtomFilter
public int getLiveCount(ColumnFamily cf, long now)
{
- ColumnCounter counter = getColumnCounter(cf, now);
- for (Column column : cf)
- counter.count(column, cf);
- return counter.live();
+ return columnCounter(cf.getComparator(), now).countAll(cf).live();
}
- private ColumnCounter getColumnCounter(ColumnFamily container, long now)
+ public ColumnCounter columnCounter(AbstractType<?> comparator, long now)
{
- AbstractType<?> comparator = container.getComparator();
if (compositesToGroup < 0)
return new ColumnCounter(now);
else if (compositesToGroup == 0)
@@ -163,7 +220,7 @@ public class SliceQueryFilter implements IDiskAtomFilter
public void trim(ColumnFamily cf, int trimTo, long now)
{
- ColumnCounter counter = getColumnCounter(cf, now);
+ ColumnCounter counter = columnCounter(cf.getComparator(), now);
Collection<Column> columns = reversed
? cf.getReverseSortedColumns()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 17ac81f..a40f4bd 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -514,14 +515,9 @@ public class SecondaryIndexManager
* @param columnFilter the column range to restrict to
* @return found indexed rows
*/
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter columnFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+ List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
if (indexSearchers.isEmpty())
return Collections.emptyList();
@@ -530,7 +526,7 @@ public class SecondaryIndexManager
if (indexSearchers.size() > 1)
throw new RuntimeException("Unable to search across multiple secondary index types");
- return indexSearchers.get(0).search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+ return indexSearchers.get(0).search(filter);
}
public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index ddd79dd..d28afc0 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -21,8 +21,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -39,12 +38,7 @@ public abstract class SecondaryIndexSearcher
this.baseCfs = indexManager.baseCfs;
}
- public abstract List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows);
+ public abstract List<Row> search(ExtendedFilter filter);
/**
* @return true this index is able to handle given clauses.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index e8c0a09..f9b7b11 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -44,16 +44,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
- return baseCfs.filter(getIndexedIterator(range, filter), filter);
+ assert filter.getClause() != null && !filter.getClause().isEmpty();
+ return baseCfs.filter(getIndexedIterator(filter), filter);
}
private ByteBuffer makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart)
@@ -62,10 +56,11 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return ByteBufferUtil.EMPTY_BYTE_BUFFER;
ColumnNameBuilder builder;
- if (filter.originalFilter() instanceof SliceQueryFilter)
+ IDiskAtomFilter columnFilter = filter.columnFilter(key);
+ if (columnFilter instanceof SliceQueryFilter)
{
- SliceQueryFilter originalFilter = (SliceQueryFilter)filter.originalFilter();
- builder = index.makeIndexColumnNameBuilder(key, isStart ? originalFilter.start() : originalFilter.finish());
+ SliceQueryFilter sqf = (SliceQueryFilter)columnFilter;
+ builder = index.makeIndexColumnNameBuilder(key, isStart ? sqf.start() : sqf.finish());
}
else
{
@@ -74,7 +69,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
return isStart ? builder.build() : builder.buildAsEndOfRange();
}
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
{
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
@@ -93,6 +88,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
* possible key having a given token. A fix would be to actually store the token along the key in the
* indexed row.
*/
+ final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -140,12 +136,12 @@ public class CompositesSearcher extends SecondaryIndexSearcher
DecoratedKey currentKey = null;
ColumnFamily data = null;
int columnsCount = 0;
- int limit = ((SliceQueryFilter)filter.initialFilter()).count;
+ int limit = filter.currentLimit();
while (true)
{
// Did we got more columns that needed to respect the user limit?
- // (but we still need to return was fetch already)
+ // (but we still need to return what was fetched already)
if (columnsCount > limit)
return makeReturn(currentKey, data);
@@ -235,7 +231,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
// Check if this entry cannot be a hit due to the original column filter
ByteBuffer start = entry.indexedEntryStart();
- if (!filter.originalFilter().maySelectPrefix(baseComparator, start))
+ if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
continue;
logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
@@ -256,7 +252,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
assert newData != null : "An entry with not data should have been considered stale";
- if (!filter.isSatisfiedBy(dk.key, newData, entry.indexedEntryNameBuilder))
+ if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
continue;
if (data == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index e919d8a..205efb7 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -47,20 +47,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
@Override
- public List<Row> search(AbstractBounds<RowPosition> range,
- List<IndexExpression> clause,
- IDiskAtomFilter dataFilter,
- int maxResults,
- long now,
- boolean countCQL3Rows)
+ public List<Row> search(ExtendedFilter filter)
{
- assert clause != null && !clause.isEmpty();
- ExtendedFilter filter = ExtendedFilter.create(baseCfs, clause, dataFilter, maxResults, now, countCQL3Rows, false);
- return baseCfs.filter(getIndexedIterator(range, filter), filter);
+ assert filter.getClause() != null && !filter.getClause().isEmpty();
+ return baseCfs.filter(getIndexedIterator(filter), filter);
}
- private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final AbstractBounds<RowPosition> range, final ExtendedFilter filter)
+ private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final ExtendedFilter filter)
{
+
// Start with the most-restrictive indexed clause, then apply remaining clauses
// to each row matching that clause.
// TODO: allow merge join instead of just one index + loop
@@ -79,6 +74,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
* possible key having a given token. A fix would be to actually store the token along the key in the
* indexed row.
*/
+ final AbstractBounds<RowPosition> range = filter.dataRange.keyRange();
final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -165,14 +161,14 @@ public class KeysSearcher extends SecondaryIndexSearcher
}
logger.trace("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.initialFilter(), filter.timestamp));
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey), filter.timestamp));
// While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
if (data == null)
data = TreeMapBackedSortedColumns.factory.create(baseCfs.metadata);
// as in CFS.filter - extend the filter to ensure we include the columns
// from the index expressions, just in case they weren't included in the initialFilter
- IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+ IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data);
if (extraFilter != null)
{
ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 85f2677..6c10d95 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DataTracker;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
@@ -994,17 +995,12 @@ public class SSTableReader extends SSTable
/**
*
- * @param filter filter to use when reading the columns
+ * @param dataRange filter to use when reading the columns
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public SSTableScanner getScanner(QueryFilter filter)
+ public SSTableScanner getScanner(DataRange dataRange)
{
- return new SSTableScanner(this, filter, null);
- }
-
- public SSTableScanner getScanner(QueryFilter filter, RowPosition startWith)
- {
- return new SSTableScanner(this, filter, startWith, null);
+ return new SSTableScanner(this, dataRange, null);
}
/**
@@ -1018,7 +1014,7 @@ public class SSTableReader extends SSTable
public SSTableScanner getScanner(RateLimiter limiter)
{
- return new SSTableScanner(this, null, limiter);
+ return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
}
/**
@@ -1034,7 +1030,7 @@ public class SSTableReader extends SSTable
Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
if (rangeIterator.hasNext())
- return new SSTableScanner(this, null, range, limiter);
+ return new SSTableScanner(this, DataRange.forKeyRange(range), limiter);
else
return new EmptyCompactionScanner(getFilename());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index fb52a02..66e7189 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.util.Iterator;
import com.google.common.util.concurrent.RateLimiter;
-
import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.LazyColumnIterator;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.util.FileUtils;
@@ -43,43 +44,46 @@ public class SSTableScanner implements ICompactionScanner
protected final RandomAccessReader dfile;
protected final RandomAccessReader ifile;
public final SSTableReader sstable;
+ private final DataRange dataRange;
+ private final long stopAt;
+
protected Iterator<OnDiskAtomIterator> iterator;
- private final QueryFilter filter;
- private long stopAt;
/**
* @param sstable SSTable to scan; must not be null
- * @param filter filter to use when scanning the columns; may be null
+ * @param dataRange range of data to fetch; must not be null
* @param limiter background i/o RateLimiter; may be null
*/
- SSTableScanner(SSTableReader sstable, QueryFilter filter, RateLimiter limiter)
+ SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
{
assert sstable != null;
this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader();
this.sstable = sstable;
- this.filter = filter;
- stopAt = dfile.length();
+ this.dataRange = dataRange;
+ this.stopAt = computeStopAt();
+ seekToStart();
}
- public SSTableScanner(SSTableReader sstable, QueryFilter filter, RowPosition startWith, RateLimiter limiter)
+ private void seekToStart()
{
- this(sstable, filter, limiter);
+ if (dataRange.startKey().isMinimum(sstable.partitioner))
+ return;
- long indexPosition = sstable.getIndexScanPosition(startWith);
+ long indexPosition = sstable.getIndexScanPosition(dataRange.startKey());
// -1 means the key is before everything in the sstable. So just start from the beginning.
if (indexPosition == -1)
- indexPosition = 0;
- ifile.seek(indexPosition);
+ return;
+ ifile.seek(indexPosition);
try
{
while (!ifile.isEOF())
{
indexPosition = ifile.getFilePointer();
DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
- int comparison = indexDecoratedKey.compareTo(startWith);
+ int comparison = indexDecoratedKey.compareTo(dataRange.startKey());
if (comparison >= 0)
{
// Found, just read the dataPosition and seek into index and data files
@@ -102,19 +106,14 @@ public class SSTableScanner implements ICompactionScanner
}
- public SSTableScanner(SSTableReader sstable, QueryFilter filter, Range<Token> range, RateLimiter limiter)
+ private long computeStopAt()
{
- this(sstable, filter, range.toRowBounds().left, limiter);
+ AbstractBounds<RowPosition> keyRange = dataRange.keyRange();
+ if (dataRange.stopKey().isMinimum(sstable.partitioner) || (keyRange instanceof Range && ((Range)keyRange).isWrapAround()))
+ return dfile.length();
- if (range.isWrapAround())
- {
- stopAt = dfile.length();
- }
- else
- {
- RowIndexEntry position = sstable.getPosition(range.toRowBounds().right, SSTableReader.Operator.GT);
- stopAt = position == null ? dfile.length() : position.position;
- }
+ RowIndexEntry position = sstable.getPosition(keyRange.toRowBounds().right, SSTableReader.Operator.GT);
+ return position == null ? dfile.length() : position.position;
}
public void close() throws IOException
@@ -202,8 +201,7 @@ public class SSTableScanner implements ICompactionScanner
}
assert !dfile.isEOF();
-
- if (filter == null)
+ if (dataRange.selectsFullRowFor(currentKey.key))
{
dfile.seek(currentEntry.position);
ByteBufferUtil.readWithShortLength(dfile); // key
@@ -217,7 +215,7 @@ public class SSTableScanner implements ICompactionScanner
{
public OnDiskAtomIterator create()
{
- return filter.getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
+ return dataRange.columnFilter(currentKey.key).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 43e2cb2..4ac408e 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -116,6 +116,7 @@ public final class MessagingService implements MessagingServiceMBean
PAXOS_PREPARE,
PAXOS_PROPOSE,
PAXOS_COMMIT,
+ PAGED_RANGE,
// remember to add new verbs at the end, since we serialize by ordinal
UNUSED_1,
UNUSED_2,
@@ -136,6 +137,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ, Stage.READ);
put(Verb.RANGE_SLICE, Stage.READ);
put(Verb.INDEX_SCAN, Stage.READ);
+ put(Verb.PAGED_RANGE, Stage.READ);
put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
@@ -188,6 +190,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ_REPAIR, RowMutation.serializer);
put(Verb.READ, ReadCommand.serializer);
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
+ put(Verb.PAGED_RANGE, RangeSliceCommand.serializer);
put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
put(Verb.TREE_REQUEST, ActiveRepairService.TreeRequest.serializer);
put(Verb.TREE_RESPONSE, ActiveRepairService.Validator.serializer);
@@ -216,6 +219,7 @@ public final class MessagingService implements MessagingServiceMBean
put(Verb.READ_REPAIR, WriteResponse.serializer);
put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
+ put(Verb.PAGED_RANGE, RangeSliceReply.serializer);
put(Verb.READ, ReadResponse.serializer);
put(Verb.TRUNCATE, TruncateResponse.serializer);
put(Verb.SNAPSHOT, null);
@@ -293,6 +297,7 @@ public final class MessagingService implements MessagingServiceMBean
Verb.READ_REPAIR,
Verb.READ,
Verb.RANGE_SLICE,
+ Verb.PAGED_RANGE,
Verb.REQUEST_RESPONSE);
// total dropped message counts for server lifetime
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 6483d9b..38d103d 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -17,9 +17,17 @@
*/
package org.apache.cassandra.service;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.UUID;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -30,6 +38,7 @@ public class QueryState
private final ClientState clientState;
private volatile long clock;
private volatile UUID preparedTracingSession;
+ private volatile Pager pager;
public QueryState(ClientState clientState)
{
@@ -68,6 +77,13 @@ public class QueryState
this.preparedTracingSession = sessionId;
}
+ public UUID getAndResetCurrentTracingSession()
+ {
+ UUID previous = preparedTracingSession;
+ preparedTracingSession = null;
+ return previous;
+ }
+
public void createTracingSession()
{
if (this.preparedTracingSession == null)
@@ -77,9 +93,53 @@ public class QueryState
else
{
UUID session = this.preparedTracingSession;
- this.preparedTracingSession = null;
Tracing.instance.newSession(session);
}
}
-}
+ public void attachPager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+ {
+ pager = new Pager(queryPager, statement, variables);
+ }
+
+ public boolean hasPager()
+ {
+ return pager != null;
+ }
+
+ public void dropPager()
+ {
+ pager = null;
+ }
+
+ public ResultMessage.Rows getNextPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ assert pager != null; // We've already validated (in ServerConnection) that this should not be null
+
+ int currentLimit = pager.queryPager.maxRemaining();
+ List<Row> page = pager.queryPager.fetchPage(pageSize);
+ ResultMessage.Rows msg = pager.statement.processResults(page, pager.variables, currentLimit, pager.queryPager.timestamp());
+
+ if (pager.queryPager.isExhausted())
+ dropPager();
+ else
+ msg.result.metadata.setHasMorePages();
+
+ return msg;
+ }
+
+ // Groups the actual query pager with the Select Query
+ private static class Pager
+ {
+ private final QueryPager queryPager;
+ private final SelectStatement statement;
+ private final List<ByteBuffer> variables;
+
+ private Pager(QueryPager queryPager, SelectStatement statement, List<ByteBuffer> variables)
+ {
+ this.queryPager = queryPager;
+ this.statement = statement;
+ this.variables = variables;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
index f63fcb1..9a7d1d2 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
@@ -19,39 +19,16 @@ package org.apache.cassandra.service;
import java.util.List;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.RangeSliceCommand;
+import org.apache.cassandra.db.AbstractRangeCommand;
import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tracing.Tracing;
-public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
+public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
{
- public static List<Row> executeLocally(RangeSliceCommand command)
- {
- ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
- if (cfs.indexManager.hasIndexFor(command.row_filter))
- return cfs.search(command.range,
- command.row_filter,
- command.predicate,
- command.maxResults,
- command.timestamp,
- command.countCQL3Rows);
- else
- return cfs.getRangeSlice(command.range,
- command.row_filter,
- command.predicate,
- command.maxResults,
- command.timestamp,
- command.countCQL3Rows,
- command.isPaging);
- }
-
- public void doVerb(MessageIn<RangeSliceCommand> message, int id)
+ public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
{
try
{
@@ -60,7 +37,7 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand>
/* Don't service reads! */
throw new RuntimeException("Cannot service reads while bootstrapping!");
}
- RangeSliceReply reply = new RangeSliceReply(executeLocally(message.payload));
+ RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
Tracing.trace("Enqueuing response to {}", message.from);
MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 612f89b..0286bd3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1294,11 +1294,11 @@ public class StorageProxy implements StorageProxyMBean
static class LocalRangeSliceRunnable extends DroppableRunnable
{
- private final RangeSliceCommand command;
+ private final AbstractRangeCommand command;
private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
private final long start = System.nanoTime();
- LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+ LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
{
super(MessagingService.Verb.READ);
this.command = command;
@@ -1307,7 +1307,7 @@ public class StorageProxy implements StorageProxyMBean
protected void runMayThrow()
{
- RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+ RangeSliceReply result = new RangeSliceReply(command.executeLocally());
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
handler.response(result);
}
@@ -1337,7 +1337,7 @@ public class StorageProxy implements StorageProxyMBean
return inter;
}
- public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
+ public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
throws UnavailableException, ReadTimeoutException
{
Tracing.trace("Determining replicas to query");
@@ -1348,11 +1348,9 @@ public class StorageProxy implements StorageProxyMBean
// now scan until we have enough results
try
{
- IDiskAtomFilter commandPredicate = command.predicate;
-
int cql3RowCount = 0;
rows = new ArrayList<Row>();
- List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+ List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.keyRange);
int i = 0;
AbstractBounds<RowPosition> nextRange = null;
List<InetAddress> nextEndpoints = null;
@@ -1408,15 +1406,7 @@ public class StorageProxy implements StorageProxyMBean
++i;
}
- RangeSliceCommand nodeCmd = new RangeSliceCommand(command.keyspace,
- command.column_family,
- command.timestamp,
- commandPredicate,
- range,
- command.row_filter,
- command.maxResults,
- command.countCQL3Rows,
- command.isPaging);
+ AbstractRangeCommand nodeCmd = command.forSubRange(range);
// collect replies and resolve according to consistency level
RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
@@ -1431,7 +1421,7 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
- MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
+ MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : filteredEndpoints)
{
Tracing.trace("Enqueuing request to {}", endpoint);
@@ -1444,8 +1434,8 @@ public class StorageProxy implements StorageProxyMBean
for (Row row : handler.get())
{
rows.add(row);
- if (nodeCmd.countCQL3Rows)
- cql3RowCount += row.getLiveCount(commandPredicate, command.timestamp);
+ if (nodeCmd.countCQL3Rows())
+ cql3RowCount += row.getLiveCount(command.predicate, command.timestamp);
}
FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
}
@@ -1462,18 +1452,9 @@ public class StorageProxy implements StorageProxyMBean
}
// if we're done, great, otherwise, move to the next range
- int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size();
- if (count >= nodeCmd.maxResults)
+ int count = nodeCmd.countCQL3Rows() ? cql3RowCount : rows.size();
+ if (count >= nodeCmd.limit())
break;
-
- // if we are paging and already got some rows, reset the column filter predicate,
- // so we start iterating the next row from the first column
- if (!rows.isEmpty() && command.isPaging)
- {
- // We only allow paging with a slice filter (doesn't make sense otherwise anyway)
- assert commandPredicate instanceof SliceQueryFilter;
- commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY);
- }
}
}
finally
@@ -1483,13 +1464,13 @@ public class StorageProxy implements StorageProxyMBean
return trim(command, rows);
}
- private static List<Row> trim(RangeSliceCommand command, List<Row> rows)
+ private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
{
- // When countCQL3Rows, we let the caller trim the result.
- if (command.countCQL3Rows)
+ // When countCQL3Rows, we let the caller trim the result.
+ if (command.countCQL3Rows())
return rows;
else
- return rows.size() > command.maxResults ? rows.subList(0, command.maxResults) : rows;
+ return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
}
/**
[4/4] git commit: Add auto paging capability to the native protocol
Posted by sl...@apache.org.
Add auto paging capability to the native protocol
This also generalize the paging used internally, and pages CQL3 'select
count' operation to avoid OOM.
patch by slebresne; reviewed by iamaleksey for CASSANDRA-4415
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e48ff293
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e48ff293
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e48ff293
Branch: refs/heads/trunk
Commit: e48ff29387547c0837ab381f6e890f8417a1b65c
Parents: 40bc445
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue May 28 10:17:26 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 25 10:30:12 2013 +0200
----------------------------------------------------------------------
doc/native_protocol_v2.spec | 92 +++++-
src/java/org/apache/cassandra/auth/Auth.java | 3 +-
.../cassandra/auth/CassandraAuthorizer.java | 3 +-
.../cassandra/auth/PasswordAuthenticator.java | 3 +-
.../org/apache/cassandra/cql3/CQLStatement.java | 5 +-
.../apache/cassandra/cql3/QueryProcessor.java | 14 +-
.../org/apache/cassandra/cql3/ResultSet.java | 25 +-
.../statements/AuthenticationStatement.java | 2 +-
.../cql3/statements/AuthorizationStatement.java | 2 +-
.../cql3/statements/BatchStatement.java | 2 +-
.../cql3/statements/ModificationStatement.java | 2 +-
.../statements/SchemaAlteringStatement.java | 2 +-
.../cql3/statements/SelectStatement.java | 72 ++++-
.../cql3/statements/TruncateStatement.java | 2 +-
.../cassandra/cql3/statements/UseStatement.java | 2 +-
.../cassandra/db/AbstractRangeCommand.java | 67 +++++
.../apache/cassandra/db/ColumnFamilyStore.java | 123 ++++++--
src/java/org/apache/cassandra/db/DataRange.java | 218 ++++++++++++++
.../cassandra/db/HintedHandOffManager.java | 2 +-
src/java/org/apache/cassandra/db/Memtable.java | 85 ------
.../apache/cassandra/db/PagedRangeCommand.java | 196 +++++++++++++
.../apache/cassandra/db/RangeSliceCommand.java | 115 +++++---
.../org/apache/cassandra/db/ReadCommand.java | 3 +-
.../apache/cassandra/db/RowIteratorFactory.java | 30 +-
.../cassandra/db/SliceFromReadCommand.java | 5 +
.../apache/cassandra/db/SliceQueryPager.java | 88 ------
.../org/apache/cassandra/db/SystemTable.java | 3 +-
src/java/org/apache/cassandra/db/Table.java | 4 +-
.../cassandra/db/filter/ColumnCounter.java | 10 +
.../apache/cassandra/db/filter/ColumnSlice.java | 6 +
.../cassandra/db/filter/ExtendedFilter.java | 237 +++++++---------
.../cassandra/db/filter/IDiskAtomFilter.java | 5 +-
.../cassandra/db/filter/NamesQueryFilter.java | 58 +++-
.../apache/cassandra/db/filter/QueryFilter.java | 20 +-
.../cassandra/db/filter/SliceQueryFilter.java | 77 ++++-
.../db/index/SecondaryIndexManager.java | 12 +-
.../db/index/SecondaryIndexSearcher.java | 10 +-
.../db/index/composites/CompositesSearcher.java | 30 +-
.../cassandra/db/index/keys/KeysSearcher.java | 20 +-
.../cassandra/io/sstable/SSTableReader.java | 16 +-
.../cassandra/io/sstable/SSTableScanner.java | 52 ++--
.../apache/cassandra/net/MessagingService.java | 5 +
.../apache/cassandra/service/QueryState.java | 64 ++++-
.../service/RangeSliceVerbHandler.java | 31 +-
.../apache/cassandra/service/StorageProxy.java | 49 +---
.../service/pager/AbstractQueryPager.java | 245 ++++++++++++++++
.../service/pager/MultiPartitionPager.java | 114 ++++++++
.../service/pager/NamesQueryPager.java | 94 ++++++
.../cassandra/service/pager/Pageable.java | 38 +++
.../cassandra/service/pager/QueryPager.java | 75 +++++
.../cassandra/service/pager/QueryPagers.java | 184 ++++++++++++
.../service/pager/RangeNamesQueryPager.java | 90 ++++++
.../service/pager/RangeSliceQueryPager.java | 99 +++++++
.../service/pager/SinglePartitionPager.java | 30 ++
.../service/pager/SliceQueryPager.java | 72 +++++
.../cassandra/thrift/CassandraServer.java | 142 ++++------
.../cassandra/thrift/ThriftConversion.java | 4 +-
.../org/apache/cassandra/transport/CBCodec.java | 2 +-
.../org/apache/cassandra/transport/Client.java | 44 ++-
.../org/apache/cassandra/transport/Message.java | 7 +-
.../cassandra/transport/ServerConnection.java | 18 +-
.../cassandra/transport/SimpleClient.java | 4 +-
.../transport/messages/AuthChallenge.java | 4 +-
.../transport/messages/AuthResponse.java | 4 +-
.../transport/messages/AuthSuccess.java | 4 +-
.../transport/messages/AuthenticateMessage.java | 4 +-
.../transport/messages/BatchMessage.java | 4 +-
.../transport/messages/CredentialsMessage.java | 4 +-
.../transport/messages/ErrorMessage.java | 4 +-
.../transport/messages/EventMessage.java | 4 +-
.../transport/messages/ExecuteMessage.java | 34 ++-
.../transport/messages/NextMessage.java | 118 ++++++++
.../transport/messages/OptionsMessage.java | 4 +-
.../transport/messages/PrepareMessage.java | 4 +-
.../transport/messages/QueryMessage.java | 36 ++-
.../transport/messages/ReadyMessage.java | 4 +-
.../transport/messages/RegisterMessage.java | 4 +-
.../transport/messages/ResultMessage.java | 28 +-
.../transport/messages/StartupMessage.java | 4 +-
.../transport/messages/SupportedMessage.java | 4 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 113 +++++---
.../db/compaction/CompactionsTest.java | 4 +-
.../cassandra/db/compaction/TTLExpiryTest.java | 4 +-
.../cassandra/service/QueryPagerTest.java | 283 +++++++++++++++++++
84 files changed, 2998 insertions(+), 812 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 3959a15..2cc771d 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -22,6 +22,7 @@ Table of Contents
4.1.6. EXECUTE
4.1.7. BATCH
4.1.8. REGISTER
+ 4.1.9. NEXT
4.2. Responses
4.2.1. ERROR
4.2.2. READY
@@ -38,8 +39,9 @@ Table of Contents
4.2.8. AUTH_SUCCESS
5. Compression
6. Collection types
- 7. Error codes
- 8. Changes from v1
+ 7. Result paging
+ 8. Error codes
+ 9. Changes from v1
1. Overview
@@ -98,7 +100,7 @@ Table of Contents
0x82 Response frame for this protocol version
This document describe the version 2 of the protocol. For the changes made since
- version 1, see Section 8.
+ version 1, see Section 9.
2.2. flags
@@ -165,6 +167,7 @@ Table of Contents
0x0E AUTH_CHALLENGE
0x0F AUTH_RESPONSE
0x10 AUTH_SUCCESS
+ 0x11 NEXT
Messages are described in Section 4.
@@ -276,10 +279,13 @@ Table of Contents
4.1.4. QUERY
Performs a CQL query. The body of the message must be:
- <query><consistency>[<n><value_1>...<value_n>]
+ <query><consistency><result_page_size>[<n><value_1>...<value_n>]
where:
- <query> the query, [long string].
- <consistency> is the [consistency] level for the operation.
+ - <result_page_size> is an [int] controlling the desired page size of the
+ result (in CQL3 rows). A negative value disables paging of the result. See the
+ section on paging (Section 7) for more details.
- optional: <n> [short], the number of following values.
- optional: <value_1>...<value_n> are [bytes] to use for bound variables in the query.
@@ -302,7 +308,7 @@ Table of Contents
4.1.6. EXECUTE
Executes a prepared query. The body of the message must be:
- <id><n><value_1>....<value_n><consistency>
+ <id><n><value_1>....<value_n><consistency><result_page_size>
where:
- <id> is the prepared query ID. It's the [short bytes] returned as a
response to a PREPARE message.
@@ -310,6 +316,9 @@ Table of Contents
- <value_1>...<value_n> are the [bytes] to use for bound variables in the
prepared query.
- <consistency> is the [consistency] level for the operation.
+ - <result_page_size> is an [int] controlling the desired page size of the
+ result (in CQL3 rows). A negative value disables paging of the result. See the
+ section on paging (Section 7) for more details.
Note that the consistency is ignored by some (prepared) queries (USE, CREATE,
ALTER, TRUNCATE, ...).
@@ -360,6 +369,17 @@ Table of Contents
multiple times the same event messages, wasting bandwidth.
+4.1.9. NEXT
+
+ Requests the next page of results if paging was requested by a QUERY or EXECUTE
+ statement and there are more results to fetch (see Section 7 for more details).
+ The body of a NEXT message is a single [int] indicating the maximum number of
+ rows to return with the next page of results (it is equivalent to the
+ <result_page_size> in a QUERY or EXECUTE message).
+
+ The result to a NEXT message will be a RESULT message.
+
+
4.2. Responses
This section describes the content of the frame body for the different
@@ -372,7 +392,7 @@ Table of Contents
Indicates an error processing a request. The body of the message will be an
error code ([int]) followed by a [string] error message. Then, depending on
the exception, more content may follow. The error codes are defined in
- Section 7, along with their additional content if any.
+ Section 8, along with their additional content if any.
4.2.2. READY
@@ -450,6 +470,12 @@ Table of Contents
0x0001 Global_tables_spec: if set, only one table spec (keyspace
and table name) is provided as <global_table_spec>. If not
set, <global_table_spec> is not present.
+ 0x0002 Has_more_pages: indicates whether this is not the last
+ page of results and more should be retrieved using a NEXT
+ message. If not set, this is the last "page" of results
+ and NEXT cannot and should not be used. If no result
+ paging has been requested in the QUERY/EXECUTE/BATCH
+ message, this will never be set.
- <columns_count> is an [int] representing the number of columns selected
by the query this result is of. It defines the number of <col_spec_i>
elements in and the number of element for each row in <rows_content>.
@@ -623,7 +649,52 @@ Table of Contents
value.
-7. Error codes
+7. Result paging
+
+ The protocol allows for paging the result of queries. For that, the QUERY and
+ EXECUTE messages have a <result_page_size> value that indicates the desired
+ page size in CQL3 rows.
+
+ If a positive value is provided for <result_page_size>, the result set of the
+ RESULT message returned for the query will contain at most the
+ <result_page_size> first rows of the query result. If that first page of result
+ contains the full result set for the query, the RESULT message (of kind `Rows`)
+ will have the Has_more_pages flag *not* set. However, if some results are not
+ part of the first response, the Has_more_pages flag will be set. In that latter
+ case, more rows of the result can be retrieved by sending a NEXT message *with the
+ same stream id as the initial query*. The NEXT message also contains its own
+ <result_page_size> that controls how many of the remaining result rows will be
+ sent in response. If the response to this NEXT message still does not contain
+ the full remainder of the query result set (the Has_more_pages flag is set once more),
+ another NEXT message can be sent for more results, etc.
+
+ If a RESULT message has the Has_more_pages flag set and any message other than
+ a NEXT message is sent on the same stream id, the query is cancelled and no more
+ of its results can be retrieved.
+
+ Only CQL3 queries that return a result set (RESULT message with a Rows `kind`)
+ support paging. For other types of queries, the <result_page_size> value is
+ ignored.
+
+ The <result_page_size> can be set to a negative value to disable paging (in
+ which case the whole result set will be returned in the first RESULT message,
+ which will not have the Has_more_pages flag set). The
+ <result_page_size> value cannot be 0.
+
+ Note to client implementors:
+ - While <result_page_size> can be as low as 1, it will likely be detrimental
+ to performance to pick a value too low. A value below 100 is probably too
+ low for most use cases.
+ - Clients should not rely on the actual size of the result set returned to
+ decide if a NEXT message should be issued. Instead, they should always
+ check the Has_more_pages flag (unless they did not enable paging for the query,
+ obviously). Clients should also not assert that no result will have more than
+ <result_page_size> rows. While the current implementation always respects
+ the exact value of <result_page_size>, we reserve the right to return
+ slightly smaller or bigger pages in the future for performance reasons.
+
+
+8. Error codes
The supported error codes are described below:
0x0000 Server error: something unexpected happened. This indicates a
@@ -715,7 +786,7 @@ Table of Contents
this host. The rest of the ERROR message body will be [short
bytes] representing the unknown ID.
-8. Changes from v1
+9. Changes from v1
* Protocol is versioned to allow old client connects to a newer server, if a
newer client connects to an older server, it needs to check if it gets a
ProtocolException on connection and try connecting with a lower version.
@@ -727,3 +798,8 @@ Table of Contents
removed and replaced by a server/client challenges/responses exchanges (done
through the new AUTH_RESPONSE/AUTH_CHALLENGE messages). See Section 4.2.3 for
details.
+ * Query paging has been added (Section 7): QUERY and EXECUTE messages have an
+ additional <result_page_size> [int], a new NEXT message has been added and
+ the Rows kind of RESULT message has an additional flag. Note that paging is
+ optional, and a client that doesn't want to handle it can always pass -1 for
+ the <result_page_size> in QUERY and EXECUTE.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index c561aab..559e10c 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -232,7 +232,8 @@ public class Auth
{
ResultMessage.Rows rows = selectUserStatement.execute(consistencyForUser(username),
new QueryState(new ClientState(true)),
- Lists.newArrayList(ByteBufferUtil.bytes(username)));
+ Lists.newArrayList(ByteBufferUtil.bytes(username)),
+ -1);
return new UntypedResultSet(rows.result);
}
catch (RequestValidationException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index 396be71..6f490f8 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -74,7 +74,8 @@ public class CassandraAuthorizer implements IAuthorizer
ResultMessage.Rows rows = authorizeStatement.execute(ConsistencyLevel.ONE,
new QueryState(new ClientState(true)),
Lists.newArrayList(ByteBufferUtil.bytes(user.getName()),
- ByteBufferUtil.bytes(resource.getName())));
+ ByteBufferUtil.bytes(resource.getName())),
+ -1);
result = new UntypedResultSet(rows.result);
}
catch (RequestValidationException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 12dbdee..4d37b7e 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -108,7 +108,8 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
{
ResultMessage.Rows rows = authenticateStatement.execute(consistencyForUser(username),
new QueryState(new ClientState(true)),
- Lists.newArrayList(ByteBufferUtil.bytes(username)));
+ Lists.newArrayList(ByteBufferUtil.bytes(username)),
+ -1);
result = new UntypedResultSet(rows.result);
}
catch (RequestValidationException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 63f9cc6..a4abaf1 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -51,11 +51,14 @@ public interface CQLStatement
/**
* Execute the statement and return the resulting result or null if there is no result.
*
+ * @param cl the consistency level for the query
* @param state the current query state
* @param variables the values for bound variables. The implementation
* can assume that each bound term has a corresponding value.
+ * @param pageSize the initial page size for the result set potentially returned. A negative value
+ * means no paging needs to be done. Statements that do not return result sets can ignore this value.
*/
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException, RequestExecutionException;
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException;
/**
* Variant of execute used for internal queries against the system tables, and thus only queries the local node.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 1b89fe3..1de985b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -106,30 +106,30 @@ public class QueryProcessor
}
}
- private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+ private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
throws RequestExecutionException, RequestValidationException
{
logger.trace("Process {} @CL.{}", statement, cl);
ClientState clientState = queryState.getClientState();
statement.checkAccess(clientState);
statement.validate(clientState);
- ResultMessage result = statement.execute(cl, queryState, variables);
+ ResultMessage result = statement.execute(cl, queryState, variables, pageSize);
return result == null ? new ResultMessage.Void() : result;
}
public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
throws RequestExecutionException, RequestValidationException
{
- return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState);
+ return process(queryString, Collections.<ByteBuffer>emptyList(), cl, queryState, -1);
}
- public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState)
+ public static ResultMessage process(String queryString, List<ByteBuffer> variables, ConsistencyLevel cl, QueryState queryState, int pageSize)
throws RequestExecutionException, RequestValidationException
{
CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
if (prepared.getBoundsTerms() != variables.size())
throw new InvalidRequestException("Invalid amount of bind variables");
- return processStatement(prepared, cl, queryState, variables);
+ return processStatement(prepared, cl, queryState, variables, pageSize);
}
public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
@@ -228,7 +228,7 @@ public class QueryProcessor
}
}
- public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+ public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
throws RequestExecutionException, RequestValidationException
{
// Check to see if there are any bound variables to verify
@@ -246,7 +246,7 @@ public class QueryProcessor
logger.trace("[{}] '{}'", i+1, variables.get(i));
}
- return processStatement(statement, cl, queryState, variables);
+ return processStatement(statement, cl, queryState, variables, pageSize);
}
public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/ResultSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ResultSet.java b/src/java/org/apache/cassandra/cql3/ResultSet.java
index 451efb2..df892b7 100644
--- a/src/java/org/apache/cassandra/cql3/ResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/ResultSet.java
@@ -97,7 +97,11 @@ public class ResultSet
String ksName = metadata.names.get(0).ksName;
String cfName = metadata.names.get(0).cfName;
long count = rows.size();
+ return makeCountResult(ksName, cfName, count, alias);
+ }
+ public static ResultSet makeCountResult(String ksName, String cfName, long count, ColumnIdentifier alias)
+ {
ColumnSpecification spec = new ColumnSpecification(ksName, cfName, alias == null ? COUNT_COLUMN : alias, LongType.instance);
Metadata newMetadata = new Metadata(Collections.singletonList(spec));
List<List<ByteBuffer>> newRows = Collections.singletonList(Collections.singletonList(ByteBufferUtil.bytes(count)));
@@ -190,10 +194,10 @@ public class ResultSet
return rs;
}
- public ChannelBuffer encode(ResultSet rs)
+ public ChannelBuffer encode(ResultSet rs, int version)
{
CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, rs.metadata.names.size() * rs.rows.size());
- builder.add(Metadata.codec.encode(rs.metadata));
+ builder.add(Metadata.codec.encode(rs.metadata, version));
builder.add(CBUtil.intToCB(rs.rows.size()));
for (List<ByteBuffer> row : rs.rows)
@@ -241,6 +245,11 @@ public class ResultSet
return true;
}
+ public void setHasMorePages()
+ {
+ flags.add(Flag.HAS_MORE_PAGES);
+ }
+
@Override
public String toString()
{
@@ -251,6 +260,8 @@ public class ResultSet
sb.append("(").append(name.ksName).append(", ").append(name.cfName).append(")");
sb.append(", ").append(name.type).append("]");
}
+ if (flags.contains(Flag.HAS_MORE_PAGES))
+ sb.append(" (to be continued)");
return sb.toString();
}
@@ -286,7 +297,7 @@ public class ResultSet
return new Metadata(flags, names);
}
- public ChannelBuffer encode(Metadata m)
+ public ChannelBuffer encode(Metadata m, int version)
{
boolean globalTablesSpec = m.flags.contains(Flag.GLOBAL_TABLES_SPEC);
@@ -294,6 +305,9 @@ public class ResultSet
CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(1 + m.names.size(), stringCount, 0);
ChannelBuffer header = ChannelBuffers.buffer(8);
+
+ assert version > 1 || !m.flags.contains(Flag.HAS_MORE_PAGES);
+
header.writeInt(Flag.serialize(m.flags));
header.writeInt(m.names.size());
builder.add(header);
@@ -322,13 +336,14 @@ public class ResultSet
public static enum Flag
{
// The order of that enum matters!!
- GLOBAL_TABLES_SPEC;
+ GLOBAL_TABLES_SPEC,
+ HAS_MORE_PAGES;
public static EnumSet<Flag> deserialize(int flags)
{
EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
Flag[] values = Flag.values();
- for (int n = 0; n < 32; n++)
+ for (int n = 0; n < values.length; n++)
{
if ((flags & (1 << n)) != 0)
set.add(values[n]);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 64468af..97d7be5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -40,7 +40,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
return 0;
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
throws RequestExecutionException, RequestValidationException
{
return execute(state.getClientState());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index af1bd17..5e317aa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -41,7 +41,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
return 0;
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables)
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize)
throws RequestValidationException, RequestExecutionException
{
return execute(state.getClientState());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 1436811..6fbab72 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -133,7 +133,7 @@ public class BatchStatement implements CQLStatement
}
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
{
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 62bd976..85728bc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -332,7 +332,7 @@ public abstract class ModificationStatement implements CQLStatement
return ifNotExists || (columnConditions != null && !columnConditions.isEmpty());
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+ public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize)
throws RequestExecutionException, RequestValidationException
{
if (cl == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 4d40e99..1c5f051 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -68,7 +68,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
public void validate(ClientState state) throws RequestValidationException
{}
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestValidationException
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestValidationException
{
announceMigration();
String tableName = cfName == null || columnFamily() == null ? "" : columnFamily();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2be85c9..fac9c27 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -39,6 +39,9 @@ import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.RangeSliceVerbHandler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.Pageable;
+import org.apache.cassandra.service.pager.QueryPager;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
@@ -54,6 +57,8 @@ import org.apache.cassandra.utils.Pair;
*/
public class SelectStatement implements CQLStatement
{
+ private static final int DEFAULT_COUNT_PAGE_SIZE = 10000;
+
private final int boundTerms;
public final CFDefinition cfDef;
public final Parameters parameters;
@@ -130,23 +135,78 @@ public class SelectStatement implements CQLStatement
// Nothing to do, all validation has been done by RawStatement.prepare()
}
- public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+ public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws RequestExecutionException, RequestValidationException
{
if (cl == null)
throw new InvalidRequestException("Invalid empty consistency level");
cl.validateForRead(keyspace());
+
int limit = getLimit(variables);
long now = System.currentTimeMillis();
- List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit, now), cl)
- : StorageProxy.read(getSliceCommands(variables, limit, now), cl);
+ Pageable command = isKeyRange || usesSecondaryIndexing
+ ? getRangeCommand(variables, limit, now)
+ : new Pageable.ReadCommands(getSliceCommands(variables, limit, now));
+
+ // A count query will never be paged for the user, but we always page it internally to avoid OOM.
+ // If the user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
+ if (parameters.isCount && pageSize < 0)
+ pageSize = DEFAULT_COUNT_PAGE_SIZE;
+
+ if (pageSize < 0 || !QueryPagers.mayNeedPaging(command, pageSize))
+ {
+ return execute(command, cl, variables, limit, now);
+ }
+ else
+ {
+ QueryPager pager = QueryPagers.pager(command, cl);
+ return parameters.isCount
+ ? pageCountQuery(pager, variables, pageSize)
+ : setupPaging(pager, state, variables, limit, pageSize);
+ }
+ }
+
+ private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
+ {
+ List<Row> rows = command instanceof Pageable.ReadCommands
+ ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
+ : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
return processResults(rows, variables, limit, now);
}
- private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
+ // TODO: we could probably refactor processResults so it doesn't need the variables, so we don't have to keep them around. But that can wait.
+ private ResultMessage.Rows setupPaging(QueryPager pager, QueryState state, List<ByteBuffer> variables, int limit, int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ List<Row> page = pager.fetchPage(pageSize);
+
+ ResultMessage.Rows msg = processResults(page, variables, limit, pager.timestamp());
+
+ // Don't bother setting up the pager if we actually don't need to.
+ if (pager.isExhausted())
+ return msg;
+
+ state.attachPager(pager, this, variables);
+ msg.result.metadata.setHasMorePages();
+ return msg;
+ }
+
+ private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ int count = 0;
+ while (!pager.isExhausted())
+ {
+ int maxLimit = pager.maxRemaining();
+ ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, pager.timestamp());
+ count += rset.rows.size();
+ }
+
+ ResultSet result = ResultSet.makeCountResult(keyspace(), columnFamily(), count, parameters.countAlias);
+ return new ResultMessage.Rows(result);
+ }
+
+ public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some column together in sparse column families
ResultSet rset = process(rows, variables, limit, now);
@@ -169,7 +229,7 @@ public class SelectStatement implements CQLStatement
int limit = getLimit(variables);
long now = System.currentTimeMillis();
List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit, now))
+ ? getRangeCommand(variables, limit, now).executeLocally()
: readLocally(keyspace(), getSliceCommands(variables, limit, now));
return processResults(rows, variables, limit, now);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 16445f5..a10415a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -54,7 +54,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException, TruncateException
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException, TruncateException
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index 0db80bf..4806314 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -51,7 +51,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
{
}
- public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables) throws InvalidRequestException
+ public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize) throws InvalidRequestException
{
state.getClientState().setKeyspace(keyspace);
return new ResultMessage.SetKeyspace(keyspace);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
new file mode 100644
index 0000000..1258344
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.thrift.IndexExpression;
+
+public abstract class AbstractRangeCommand implements IReadCommand
+{
+ public final String keyspace;
+ public final String columnFamily;
+ public final long timestamp;
+
+ public final AbstractBounds<RowPosition> keyRange;
+ public final IDiskAtomFilter predicate;
+ public final List<IndexExpression> rowFilter;
+
+ public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter)
+ {
+ this.keyspace = keyspace;
+ this.columnFamily = columnFamily;
+ this.timestamp = timestamp;
+ this.keyRange = keyRange;
+ this.predicate = predicate;
+ this.rowFilter = rowFilter;
+ }
+
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
+
+ public abstract MessageOut<? extends AbstractRangeCommand> createMessage();
+ public abstract AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> range);
+ public abstract AbstractRangeCommand withUpdatedLimit(int newLimit);
+
+ public abstract int limit();
+ public abstract boolean countCQL3Rows();
+ public abstract List<Row> executeLocally();
+
+ public long getTimeout()
+ {
+ return DatabaseDescriptor.getRangeRpcTimeout();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 735f627..62a1fdb 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -49,9 +49,11 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -1304,9 +1306,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
{
ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
- OnDiskAtomIterator ci = filter.getMemtableColumnIterator(cached, null);
- filter.collateOnDiskAtom(cf, ci, gcBefore(filter.timestamp));
- return removeDeletedCF(cf, gcBefore(filter.timestamp));
+ OnDiskAtomIterator ci = filter.getColumnFamilyIterator(cached);
+
+ int gcBefore = gcBefore(filter.timestamp);
+ filter.collateOnDiskAtom(cf, ci, gcBefore);
+ return removeDeletedCF(cf, gcBefore);
}
/**
@@ -1466,26 +1470,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
/**
* Iterate over a range of rows and columns from memtables/sstables.
*
- * @param range Either a Bounds, which includes start key, or a Range, which does not.
- * @param columnFilter description of the columns we're interested in for each row
+ * @param range The range of keys and columns within those keys to fetch
*/
- private AbstractScanIterator getSequentialIterator(final AbstractBounds<RowPosition> range,
- IDiskAtomFilter columnFilter,
- long timestamp)
+ private AbstractScanIterator getSequentialIterator(final DataRange range, long now)
{
- assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
-
- final RowPosition startWith = range.left;
- final RowPosition stopAt = range.right;
-
- QueryFilter filter = new QueryFilter(null, name, columnFilter, timestamp);
+ assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum() : range.keyRange();
- final ViewFragment view = markReferenced(range);
- Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.getString(metadata.getKeyValidator()));
+ final ViewFragment view = markReferenced(range.keyRange());
+ Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
try
{
- final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, startWith, stopAt, filter, this);
+ final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
// todo this could be pushed into SSTableScanner
return new AbstractScanIterator()
@@ -1499,7 +1495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Row current = iterator.next();
DecoratedKey key = current.key;
- if (!stopAt.isMinimum() && stopAt.compareTo(key) < 0)
+ if (!range.stopKey().isMinimum() && range.stopKey().compareTo(key) < 0)
return endOfData();
// skipping outside of assigned range
@@ -1527,43 +1523,108 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ @VisibleForTesting
public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
List<IndexExpression> rowFilter,
IDiskAtomFilter columnFilter,
int maxResults)
{
- return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis(), false, false);
+ return getRangeSlice(range, rowFilter, columnFilter, maxResults, System.currentTimeMillis());
}
public List<Row> getRangeSlice(final AbstractBounds<RowPosition> range,
List<IndexExpression> rowFilter,
IDiskAtomFilter columnFilter,
int maxResults,
+ long now)
+ {
+ return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, false, false, now));
+ }
+
+ /**
+ * Allows generic range paging with the slice column filter.
+ * Typically, suppose we have rows A, B, C ... Z having each some columns in [1, 100].
+ * And suppose we want to page through the query that for all rows returns the columns
+ * within [25, 75]. For that, we need to be able to do a range slice starting at (row r, column c)
+ * and ending at (row Z, column 75), *but* that only returns columns in [25, 75].
+ * That is what this method allows. The columnRange is the "window" of columns we are interested
+ * in for each row, and columnStart (resp. columnStop) is the start (resp. end) for the first
+ * (resp. last) requested row.
+ */
+ public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> keyRange,
+ SliceQueryFilter columnRange,
+ ByteBuffer columnStart,
+ ByteBuffer columnStop,
+ List<IndexExpression> rowFilter,
+ int maxResults,
+ long now)
+ {
+ DataRange dataRange = new DataRange.Paging(keyRange, columnRange, columnStart, columnStop, metadata.comparator);
+ return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, true, now);
+ }
+
+ public List<Row> getRangeSlice(AbstractBounds<RowPosition> range,
+ List<IndexExpression> rowFilter,
+ IDiskAtomFilter columnFilter,
+ int maxResults,
long now,
boolean countCQL3Rows,
boolean isPaging)
{
- return filter(getSequentialIterator(range, columnFilter, now),
- ExtendedFilter.create(this, rowFilter, columnFilter, maxResults, now, countCQL3Rows, isPaging));
+ return getRangeSlice(makeExtendedFilter(range, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging, now));
}
+ public ExtendedFilter makeExtendedFilter(AbstractBounds<RowPosition> range,
+ IDiskAtomFilter columnFilter,
+ List<IndexExpression> rowFilter,
+ int maxResults,
+ boolean countCQL3Rows,
+ boolean isPaging,
+ long timestamp)
+ {
+ DataRange dataRange;
+ if (isPaging)
+ {
+ assert columnFilter instanceof SliceQueryFilter;
+ SliceQueryFilter sfilter = (SliceQueryFilter)columnFilter;
+ assert sfilter.slices.length == 1;
+ SliceQueryFilter newFilter = new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, sfilter.isReversed(), sfilter.count);
+ dataRange = new DataRange.Paging(range, newFilter, sfilter.start(), sfilter.finish(), metadata.comparator);
+ }
+ else
+ {
+ dataRange = new DataRange(range, columnFilter);
+ }
+ return ExtendedFilter.create(this, dataRange, rowFilter, maxResults, countCQL3Rows, timestamp);
+ }
+
+ public List<Row> getRangeSlice(ExtendedFilter filter)
+ {
+ return filter(getSequentialIterator(filter.dataRange, filter.timestamp), filter);
+ }
+
+ @VisibleForTesting
public List<Row> search(AbstractBounds<RowPosition> range,
List<IndexExpression> clause,
- IDiskAtomFilter columnFilter,
+ IDiskAtomFilter dataFilter,
int maxResults)
{
- return search(range, clause, columnFilter, maxResults, System.currentTimeMillis(), false);
+ return search(range, clause, dataFilter, maxResults, System.currentTimeMillis());
}
public List<Row> search(AbstractBounds<RowPosition> range,
List<IndexExpression> clause,
- IDiskAtomFilter columnFilter,
+ IDiskAtomFilter dataFilter,
int maxResults,
- long now,
- boolean countCQL3Rows)
+ long now)
+ {
+ return search(makeExtendedFilter(range, dataFilter, clause, maxResults, false, false, now));
+ }
+
+ public List<Row> search(ExtendedFilter filter)
{
- Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator()));
- return indexManager.search(range, clause, columnFilter, maxResults, now, countCQL3Rows);
+ Tracing.trace("Executing indexed scan for {}", filter.dataRange.keyRange().getString(metadata.getKeyValidator()));
+ return indexManager.search(filter);
}
public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter)
@@ -1584,7 +1645,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (rowIterator.needsFiltering())
{
- IDiskAtomFilter extraFilter = filter.getExtraFilter(data);
+ IDiskAtomFilter extraFilter = filter.getExtraFilter(rawRow.key, data);
if (extraFilter != null)
{
ColumnFamily cf = filter.cfs.getColumnFamily(new QueryFilter(rawRow.key, name, extraFilter, filter.timestamp));
@@ -1594,12 +1655,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
removeDroppedColumns(data);
- if (!filter.isSatisfiedBy(rawRow.key.key, data, null))
+ if (!filter.isSatisfiedBy(rawRow.key, data, null))
continue;
logger.trace("{} satisfies all filter expressions", data);
// cut the resultset back to what was requested, if necessary
- data = filter.prune(data);
+ data = filter.prune(rawRow.key, data);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
new file mode 100644
index 0000000..d764d60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.*;
+
+/**
+ * Groups key range and column filter for range queries.
+ *
+ * The main "trick" of this class is that the column filter can only
+ * be obtained by providing the row key on which the column filter will
+ * be applied (which we always know before actually querying the columns).
+ *
+ * This allows the paging DataRange to return one filter for most rows but
+ * potentially different ones for the starting and stopping keys. Could
+ * allow more fancy stuff in the future too, like column filters that
+ * depend on the actual key value :)
+ */
+public class DataRange
+{
+ private final AbstractBounds<RowPosition> keyRange;
+ protected IDiskAtomFilter columnFilter;
+ protected final boolean selectFullRow;
+
+ public DataRange(AbstractBounds<RowPosition> range, IDiskAtomFilter columnFilter)
+ {
+ assert !(range instanceof Range) || !((Range)range).isWrapAround() || range.right.isMinimum() : range;
+
+ this.keyRange = range;
+ this.columnFilter = columnFilter;
+ this.selectFullRow = columnFilter instanceof SliceQueryFilter
+ ? isFullRowSlice((SliceQueryFilter)columnFilter)
+ : false;
+ }
+
+ public static boolean isFullRowSlice(SliceQueryFilter filter)
+ {
+ return filter.slices.length == 1
+ && filter.start().remaining() == 0
+ && filter.finish().remaining() == 0
+ && filter.count == Integer.MAX_VALUE;
+ }
+
+ public static DataRange allData(IPartitioner partitioner)
+ {
+ return forKeyRange(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+ }
+
+ public static DataRange forKeyRange(Range<Token> keyRange)
+ {
+ return new DataRange(keyRange.toRowBounds(), new IdentityQueryFilter());
+ }
+
+ public AbstractBounds<RowPosition> keyRange()
+ {
+ return keyRange;
+ }
+
+ public RowPosition startKey()
+ {
+ return keyRange.left;
+ }
+
+ public RowPosition stopKey()
+ {
+ return keyRange.right;
+ }
+
+ public boolean contains(RowPosition pos)
+ {
+ return keyRange.contains(pos);
+ }
+
+ public int getLiveCount(ColumnFamily data, long now)
+ {
+ return columnFilter instanceof SliceQueryFilter
+ ? ((SliceQueryFilter)columnFilter).lastCounted()
+ : columnFilter.getLiveCount(data, now);
+ }
+
+ public boolean selectsFullRowFor(ByteBuffer rowKey)
+ {
+ return selectFullRow;
+ }
+
+ public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+ {
+ return columnFilter;
+ }
+
+ public void updateColumnsLimit(int count)
+ {
+ columnFilter.updateColumnsLimit(count);
+ }
+
+ public static class Paging extends DataRange
+ {
+ private final SliceQueryFilter sliceFilter;
+ private final Comparator<ByteBuffer> comparator;
+ private final ByteBuffer columnStart;
+ private final ByteBuffer columnFinish;
+
+ private Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, Comparator<ByteBuffer> comparator)
+ {
+ super(range, filter);
+
+ this.sliceFilter = filter;
+ this.comparator = comparator;
+ this.columnStart = columnStart;
+ this.columnFinish = columnFinish;
+ }
+
+ public Paging(AbstractBounds<RowPosition> range, SliceQueryFilter filter, ByteBuffer columnStart, ByteBuffer columnFinish, AbstractType<?> comparator)
+ {
+ this(range, filter, columnStart, columnFinish, filter.isReversed() ? comparator.reverseComparator : comparator);
+ }
+
+ @Override
+ public boolean selectsFullRowFor(ByteBuffer rowKey)
+ {
+ // If the initial filter is not the full-row filter, don't bother
+ if (!selectFullRow)
+ return false;
+
+ if (!equals(startKey(), rowKey) && !equals(stopKey(), rowKey))
+ return selectFullRow;
+
+ return isFullRowSlice((SliceQueryFilter)columnFilter(rowKey));
+ }
+
+ private boolean equals(RowPosition pos, ByteBuffer rowKey)
+ {
+ return pos instanceof DecoratedKey && ((DecoratedKey)pos).key.equals(rowKey);
+ }
+
+ @Override
+ public IDiskAtomFilter columnFilter(ByteBuffer rowKey)
+ {
+ /*
+ * We have this ugly hack for slice queries: when we ask for
+ * the live count, we reach into the query filter to get the last
+ * counted number of columns to avoid recounting.
+ * Maybe we should just remove that hack, but in the meantime, we
+ * need to keep a reference to the last returned filter.
+ */
+ columnFilter = equals(startKey(), rowKey) || equals(stopKey(), rowKey)
+ ? sliceFilter.withUpdatedSlices(slicesForKey(rowKey))
+ : sliceFilter;
+ return columnFilter;
+ }
+
+ private ColumnSlice[] slicesForKey(ByteBuffer key)
+ {
+ // We don't call that until it's necessary, so assume we have to do some hard work
+ ByteBuffer newStart = equals(startKey(), key) ? columnStart : null;
+ ByteBuffer newFinish = equals(stopKey(), key) ? columnFinish : null;
+
+ List<ColumnSlice> newSlices = new ArrayList<ColumnSlice>(sliceFilter.slices.length); // in the common case, we'll have the same number of slices
+
+ for (ColumnSlice slice : sliceFilter.slices)
+ {
+ if (newStart != null)
+ {
+ if (slice.isBefore(comparator, newStart))
+ continue; // we skip that slice
+
+ if (slice.includes(comparator, newStart))
+ slice = new ColumnSlice(newStart, slice.finish);
+
+ // Whether we've updated the slice or not, we don't have to bother about newStart anymore
+ newStart = null;
+ }
+
+ assert newStart == null;
+ if (newFinish != null && !slice.isBefore(comparator, newFinish))
+ {
+ if (slice.includes(comparator, newFinish))
+ newSlices.add(new ColumnSlice(slice.start, newFinish));
+ // In any case, we're done
+ break;
+ }
+ newSlices.add(slice);
+ }
+
+ return newSlices.toArray(new ColumnSlice[newSlices.size()]);
+ }
+
+ @Override
+ public void updateColumnsLimit(int count)
+ {
+ columnFilter.updateColumnsLimit(count);
+ sliceFilter.updateColumnsLimit(count);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index e89c769..3a30701 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -480,7 +480,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
RowPosition minPos = p.getMinimumToken().minKeyBound();
Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
- List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE);
+ List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
for (Row row : rows)
{
UUID hostId = UUIDGen.getUUID(row.key.key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ad6258a..c31c882 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.StageManager;
@@ -336,52 +335,6 @@ public class Memtable
return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
}
- /**
- * obtain an iterator of columns in this memtable in the specified order starting from a given column.
- */
- public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)
- {
- assert cf != null;
- final Iterator<Column> filteredIter = filter.reversed ? cf.reverseIterator(filter.slices) : cf.iterator(filter.slices);
-
- return new OnDiskAtomIterator()
- {
- public ColumnFamily getColumnFamily()
- {
- return cf;
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- public boolean hasNext()
- {
- return filteredIter.hasNext();
- }
-
- public OnDiskAtom next()
- {
- return filteredIter.next();
- }
-
- public void close() throws IOException { }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- public static OnDiskAtomIterator getNamesIterator(final DecoratedKey key, final ColumnFamily cf, final NamesQueryFilter filter)
- {
- assert cf != null;
-
- return new ByNameColumnIterator(filter.columns.iterator(), cf, key);
- }
-
public ColumnFamily getColumnFamily(DecoratedKey key)
{
return rows.get(key);
@@ -392,44 +345,6 @@ public class Memtable
return creationTime;
}
- private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
- {
- private final ColumnFamily cf;
- private final DecoratedKey key;
- private final Iterator<ByteBuffer> iter;
-
- public ByNameColumnIterator(Iterator<ByteBuffer> iter, ColumnFamily cf, DecoratedKey key)
- {
- this.iter = iter;
- this.cf = cf;
- this.key = key;
- }
-
- public ColumnFamily getColumnFamily()
- {
- return cf;
- }
-
- public DecoratedKey getKey()
- {
- return key;
- }
-
- protected OnDiskAtom computeNext()
- {
- while (iter.hasNext())
- {
- ByteBuffer current = iter.next();
- Column column = cf.getColumn(current);
- if (column != null)
- return column;
- }
- return endOfData();
- }
-
- public void close() throws IOException { }
- }
-
class FlushRunnable extends DiskAwareRunnable
{
private final CountDownLatch latch;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
new file mode 100644
index 0000000..0e1fa4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class PagedRangeCommand extends AbstractRangeCommand
+{
+ public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
+
+ public final ByteBuffer start;
+ public final ByteBuffer stop;
+ public final int limit;
+
+ public PagedRangeCommand(String keyspace,
+ String columnFamily,
+ long timestamp,
+ AbstractBounds<RowPosition> keyRange,
+ SliceQueryFilter predicate,
+ ByteBuffer start,
+ ByteBuffer stop,
+ List<IndexExpression> rowFilter,
+ int limit)
+ {
+ super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
+ this.start = start;
+ this.stop = stop;
+ this.limit = limit;
+ }
+
+ public MessageOut<PagedRangeCommand> createMessage()
+ {
+ return new MessageOut<PagedRangeCommand>(MessagingService.Verb.PAGED_RANGE, this, serializer);
+ }
+
+ public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+ {
+ ByteBuffer newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
+ ByteBuffer newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
+ return new PagedRangeCommand(keyspace,
+ columnFamily,
+ timestamp,
+ subRange,
+ (SliceQueryFilter)predicate,
+ newStart,
+ newStop,
+ rowFilter,
+ limit);
+ }
+
+ public AbstractRangeCommand withUpdatedLimit(int newLimit)
+ {
+ return new PagedRangeCommand(keyspace,
+ columnFamily,
+ timestamp,
+ keyRange,
+ (SliceQueryFilter)predicate,
+ start,
+ stop,
+ rowFilter,
+ newLimit);
+ }
+
+ public int limit()
+ {
+ return limit;
+ }
+
+ public boolean countCQL3Rows()
+ {
+ return true;
+ }
+
+ public List<Row> executeLocally()
+ {
+ ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+ ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, timestamp);
+ if (cfs.indexManager.hasIndexFor(rowFilter))
+ return cfs.search(exFilter);
+ else
+ return cfs.getRangeSlice(exFilter);
+ }
+
+ private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
+ {
+ public void serialize(PagedRangeCommand cmd, DataOutput out, int version) throws IOException
+ {
+ out.writeUTF(cmd.keyspace);
+ out.writeUTF(cmd.columnFamily);
+ out.writeLong(cmd.timestamp);
+
+ AbstractBounds.serializer.serialize(cmd.keyRange, out, version);
+
+ // SliceQueryFilter (the count is not used)
+ SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
+ SliceQueryFilter.serializer.serialize(filter, out, version);
+
+ // The start and stop of the page
+ ByteBufferUtil.writeWithShortLength(cmd.start, out);
+ ByteBufferUtil.writeWithShortLength(cmd.stop, out);
+
+ out.writeInt(cmd.rowFilter.size());
+ for (IndexExpression expr : cmd.rowFilter)
+ {
+ ByteBufferUtil.writeWithShortLength(expr.column_name, out);
+ out.writeInt(expr.op.getValue());
+ ByteBufferUtil.writeWithLength(expr.value, out);
+ }
+
+ out.writeInt(cmd.limit);
+ }
+
+ public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
+ {
+ // Reads the fields back in exactly the order serialize() writes them.
+ String keyspace = in.readUTF();
+ String columnFamily = in.readUTF();
+ long timestamp = in.readLong();
+ AbstractBounds<RowPosition> keyRange = AbstractBounds.serializer.deserialize(in, version).toRowBounds();
+ SliceQueryFilter predicate = SliceQueryFilter.serializer.deserialize(in, version);
+ ByteBuffer start = ByteBufferUtil.readWithShortLength(in);
+ ByteBuffer stop = ByteBufferUtil.readWithShortLength(in);
+
+ int filterCount = in.readInt();
+ List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
+ for (int i = 0; i < filterCount; i++)
+ {
+ // serialize() writes the value with writeWithLength (int length prefix), so it must be read
+ // back with readWithLength; only the column name carries a short length prefix.
+ IndexExpression expr = new IndexExpression(ByteBufferUtil.readWithShortLength(in),
+ IndexOperator.findByValue(in.readInt()),
+ ByteBufferUtil.readWithLength(in));
+ rowFilter.add(expr);
+ }
+
+ int limit = in.readInt();
+ return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
+ }
+
+ public long serializedSize(PagedRangeCommand cmd, int version)
+ {
+ // Must add up exactly what serialize() writes, field by field.
+ long size = TypeSizes.NATIVE.sizeof(cmd.keyspace);
+ size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
+ size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
+ size += AbstractBounds.serializer.serializedSize(cmd.keyRange, version);
+ size += SliceQueryFilter.serializer.serializedSize((SliceQueryFilter)cmd.predicate, version);
+
+ // The start and stop of the page, which serialize() writes with a short length prefix
+ size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.start);
+ size += TypeSizes.NATIVE.sizeofWithShortLength(cmd.stop);
+
+ size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
+ for (IndexExpression expr : cmd.rowFilter)
+ {
+ size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
+ size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
+ size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
+ }
+ size += TypeSizes.NATIVE.sizeof(cmd.limit);
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/RangeSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
index c7e71f3..c037518 100644
--- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java
+++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java
@@ -23,10 +23,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
@@ -34,25 +35,15 @@ import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.IReadCommand;
+import org.apache.cassandra.service.pager.Pageable;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.utils.ByteBufferUtil;
-public class RangeSliceCommand implements IReadCommand
+public class RangeSliceCommand extends AbstractRangeCommand implements Pageable
{
public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
- public final String keyspace;
-
- public final String column_family;
-
- public final long timestamp;
-
- public final IDiskAtomFilter predicate;
- public final List<IndexExpression> row_filter;
-
- public final AbstractBounds<RowPosition> range;
public final int maxResults;
public final boolean countCQL3Rows;
public final boolean isPaging;
@@ -88,12 +79,7 @@ public class RangeSliceCommand implements IReadCommand
boolean countCQL3Rows,
boolean isPaging)
{
- this.keyspace = keyspace;
- this.column_family = column_family;
- this.timestamp = timestamp;
- this.predicate = predicate;
- this.range = range;
- this.row_filter = row_filter;
+ super(keyspace, column_family, timestamp, range, predicate, row_filter);
this.maxResults = maxResults;
this.countCQL3Rows = countCQL3Rows;
this.isPaging = isPaging;
@@ -104,29 +90,66 @@ public class RangeSliceCommand implements IReadCommand
return new MessageOut<RangeSliceCommand>(MessagingService.Verb.RANGE_SLICE, this, serializer);
}
+ public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
+ {
+ return new RangeSliceCommand(keyspace,
+ columnFamily,
+ timestamp,
+ predicate,
+ subRange,
+ rowFilter,
+ maxResults,
+ countCQL3Rows,
+ isPaging);
+ }
+
+ public AbstractRangeCommand withUpdatedLimit(int newLimit)
+ {
+ return new RangeSliceCommand(keyspace,
+ columnFamily,
+ timestamp,
+ predicate,
+ keyRange,
+ rowFilter,
+ newLimit,
+ countCQL3Rows,
+ isPaging);
+ }
+
+ public int limit()
+ {
+ return maxResults;
+ }
+
+ public boolean countCQL3Rows()
+ {
+ return countCQL3Rows;
+ }
+
+ public List<Row> executeLocally()
+ {
+ ColumnFamilyStore cfs = Table.open(keyspace).getColumnFamilyStore(columnFamily);
+
+ ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp);
+ if (cfs.indexManager.hasIndexFor(rowFilter))
+ return cfs.search(exFilter);
+ else
+ return cfs.getRangeSlice(exFilter);
+ }
+
@Override
public String toString()
{
return "RangeSliceCommand{" +
"keyspace='" + keyspace + '\'' +
- ", column_family='" + column_family + '\'' +
- ", timestamp='" + timestamp + '\'' +
+ ", columnFamily='" + columnFamily + '\'' +
+ ", timestamp=" + timestamp +
", predicate=" + predicate +
- ", range=" + range +
- ", row_filter =" + row_filter +
+ ", range=" + keyRange +
+ ", rowFilter =" + rowFilter +
", maxResults=" + maxResults +
", countCQL3Rows=" + countCQL3Rows +
- '}';
- }
-
- public String getKeyspace()
- {
- return keyspace;
- }
-
- public long getTimeout()
- {
- return DatabaseDescriptor.getRangeRpcTimeout();
+ "}";
}
}
@@ -135,7 +158,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
public void serialize(RangeSliceCommand sliceCommand, DataOutput out, int version) throws IOException
{
out.writeUTF(sliceCommand.keyspace);
- out.writeUTF(sliceCommand.column_family);
+ out.writeUTF(sliceCommand.columnFamily);
if (version >= MessagingService.VERSION_20)
out.writeLong(sliceCommand.timestamp);
@@ -147,7 +170,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
// must extract the super column name from the predicate (and
// modify the predicate accordingly)
ByteBuffer sc = null;
- CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.column_family);
+ CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.getKeyspace(), sliceCommand.columnFamily);
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -162,21 +185,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
IDiskAtomFilter.Serializer.instance.serialize(filter, out, version);
- if (sliceCommand.row_filter == null)
+ if (sliceCommand.rowFilter == null)
{
out.writeInt(0);
}
else
{
- out.writeInt(sliceCommand.row_filter.size());
- for (IndexExpression expr : sliceCommand.row_filter)
+ out.writeInt(sliceCommand.rowFilter.size());
+ for (IndexExpression expr : sliceCommand.rowFilter)
{
ByteBufferUtil.writeWithShortLength(expr.column_name, out);
out.writeInt(expr.op.getValue());
ByteBufferUtil.writeWithShortLength(expr.value, out);
}
}
- AbstractBounds.serializer.serialize(sliceCommand.range, out, version);
+ AbstractBounds.serializer.serialize(sliceCommand.keyRange, out, version);
out.writeInt(sliceCommand.maxResults);
out.writeBoolean(sliceCommand.countCQL3Rows);
out.writeBoolean(sliceCommand.isPaging);
@@ -246,7 +269,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
public long serializedSize(RangeSliceCommand rsc, int version)
{
long size = TypeSizes.NATIVE.sizeof(rsc.keyspace);
- size += TypeSizes.NATIVE.sizeof(rsc.column_family);
+ size += TypeSizes.NATIVE.sizeof(rsc.columnFamily);
if (version >= MessagingService.VERSION_20)
size += TypeSizes.NATIVE.sizeof(rsc.timestamp);
@@ -255,7 +278,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
if (version < MessagingService.VERSION_20)
{
ByteBuffer sc = null;
- CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.column_family);
+ CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily);
if (metadata.cfType == ColumnFamilyType.Super)
{
SuperColumns.SCFilter scFilter = SuperColumns.filterToSC((CompositeType)metadata.comparator, filter);
@@ -276,21 +299,21 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm
size += IDiskAtomFilter.Serializer.instance.serializedSize(filter, version);
- if (rsc.row_filter == null)
+ if (rsc.rowFilter == null)
{
size += TypeSizes.NATIVE.sizeof(0);
}
else
{
- size += TypeSizes.NATIVE.sizeof(rsc.row_filter.size());
- for (IndexExpression expr : rsc.row_filter)
+ size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size());
+ for (IndexExpression expr : rsc.rowFilter)
{
size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column_name);
size += TypeSizes.NATIVE.sizeof(expr.op.getValue());
size += TypeSizes.NATIVE.sizeofWithLength(expr.value);
}
}
- size += AbstractBounds.serializer.serializedSize(rsc.range, version);
+ size += AbstractBounds.serializer.serializedSize(rsc.keyRange, version);
size += TypeSizes.NATIVE.sizeof(rsc.maxResults);
size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows);
size += TypeSizes.NATIVE.sizeof(rsc.isPaging);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 61a2478..3031da8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -34,8 +34,9 @@ import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.RowDataResolver;
+import org.apache.cassandra.service.pager.Pageable;
-public abstract class ReadCommand implements IReadCommand
+public abstract class ReadCommand implements IReadCommand, Pageable
{
public enum Type
{