You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/06/25 10:32:40 UTC
[2/4] Add auto paging capability to the native protocol
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
new file mode 100644
index 0000000..460bc44
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+abstract class AbstractQueryPager implements QueryPager
+{
+ private final ConsistencyLevel consistencyLevel;
+ private final boolean localQuery;
+
+ protected final CFMetaData cfm;
+ protected final IDiskAtomFilter columnFilter;
+ private final long timestamp;
+
+ private volatile int remaining;
+ private volatile boolean exhausted;
+ private volatile boolean lastWasRecorded;
+
+ protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
+ int toFetch,
+ boolean localQuery,
+ String keyspace,
+ String columnFamily,
+ IDiskAtomFilter columnFilter,
+ long timestamp)
+ {
+ this.consistencyLevel = consistencyLevel;
+ this.localQuery = localQuery;
+
+ this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+ this.columnFilter = columnFilter;
+ this.timestamp = timestamp;
+
+ this.remaining = toFetch;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ if (isExhausted())
+ return Collections.emptyList();
+
+ int currentPageSize = nextPageSize(pageSize);
+ List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+
+ if (rows.isEmpty())
+ {
+ exhausted = true;
+ return Collections.emptyList();
+ }
+
+ int liveCount = getPageLiveCount(rows);
+ remaining -= liveCount;
+
+ // If we've got less than requested, there is no more query to do (but
+ // we still need to return the current page)
+ if (liveCount < currentPageSize)
+ exhausted = true;
+
+ // If it's not the first query and the first column is the last one returned (likely
+ // but not certain since paging can race with deletes/expiration), then remove the
+ // first column.
+ if (containsPreviousLast(rows.get(0)))
+ {
+ rows = discardFirst(rows);
+ remaining++;
+ }
+ // Otherwise, if 'lastWasRecorded', we queried for one more than the page size,
+ // so if the page was is full, trim the last entry
+ else if (lastWasRecorded && !exhausted)
+ {
+ // We've asked for one more than necessary
+ rows = discardLast(rows);
+ remaining++;
+ }
+
+ if (!isExhausted())
+ lastWasRecorded = recordLast(rows.get(rows.size() - 1));
+
+ return rows;
+ }
+
+ private List<Row> filterEmpty(List<Row> result)
+ {
+ boolean doCopy = false;
+ for (Row row : result)
+ {
+ if (row.cf == null || row.cf.getColumnCount() == 0)
+ {
+ List<Row> newResult = new ArrayList<Row>(result.size() - 1);
+ for (Row row2 : result)
+ {
+ if (row.cf == null || row.cf.getColumnCount() == 0)
+ continue;
+
+ newResult.add(row2);
+ }
+ return newResult;
+ }
+ }
+ return result;
+ }
+
+ public boolean isExhausted()
+ {
+ return exhausted || remaining == 0;
+ }
+
+ public int maxRemaining()
+ {
+ return remaining;
+ }
+
+ public long timestamp()
+ {
+ return timestamp;
+ }
+
+ private int nextPageSize(int pageSize)
+ {
+ return Math.min(remaining, pageSize) + (lastWasRecorded ? 1 : 0);
+ }
+
+ public ColumnCounter columnCounter()
+ {
+ return columnFilter.columnCounter(cfm.comparator, timestamp);
+ }
+
+ protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
+ protected abstract boolean containsPreviousLast(Row first);
+ protected abstract boolean recordLast(Row last);
+
+ private List<Row> discardFirst(List<Row> rows)
+ {
+ Row first = rows.get(0);
+ ColumnFamily newCf = discardFirst(first.cf);
+
+ int count = newCf.getColumnCount();
+ List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+ if (count != 0)
+ newRows.add(new Row(first.key, newCf));
+ newRows.addAll(rows.subList(1, rows.size()));
+
+ return newRows;
+ }
+
+ private List<Row> discardLast(List<Row> rows)
+ {
+ Row last = rows.get(rows.size() - 1);
+ ColumnFamily newCf = discardLast(last.cf);
+
+ int count = newCf.getColumnCount();
+ List<Row> newRows = new ArrayList<Row>(count == 0 ? rows.size() - 1 : rows.size());
+ newRows.addAll(rows.subList(0, rows.size() - 1));
+ if (count != 0)
+ newRows.add(new Row(last.key, newCf));
+
+ return newRows;
+ }
+
+ private int getPageLiveCount(List<Row> page)
+ {
+ int count = 0;
+ for (Row row : page)
+ count += columnCounter().countAll(row.cf).live();
+ return count;
+ }
+
+ private ColumnFamily discardFirst(ColumnFamily cf)
+ {
+ ColumnFamily copy = cf.cloneMeShallow();
+ ColumnCounter counter = columnCounter();
+
+ Iterator<Column> iter = cf.iterator();
+ // Discard the first live
+ while (iter.hasNext())
+ {
+ Column c = iter.next();
+ counter.count(c, cf);
+ if (counter.live() > 1)
+ {
+ copy.addColumn(c);
+ while (iter.hasNext())
+ copy.addColumn(iter.next());
+ }
+ }
+ return copy;
+ }
+
+ private ColumnFamily discardLast(ColumnFamily cf)
+ {
+ ColumnFamily copy = cf.cloneMeShallow();
+ // Redoing the counting like that is not extremely efficient, but
+ // discardLast is only called in case of a race between paging and
+ // a deletion, which is pretty unlikely, so probably not a big deal
+ int liveCount = columnCounter().countAll(cf).live();
+
+ ColumnCounter counter = columnCounter();
+ // Discard the first live
+ for (Column c : cf)
+ {
+ counter.count(c, cf);
+ if (counter.live() < liveCount)
+ copy.addColumn(c);
+ }
+ return copy;
+ }
+
+ protected static ByteBuffer firstName(ColumnFamily cf)
+ {
+ return cf.iterator().next().name();
+ }
+
+ protected static ByteBuffer lastName(ColumnFamily cf)
+ {
+ return cf.getReverseSortedColumns().iterator().next().name();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
new file mode 100644
index 0000000..ef82535
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
+/**
+ * Pager over a list of ReadCommand.
+ *
+ * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before
+ * returning results from the next one, but if the result returned by each command is small (compared to pageSize),
+ * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize
+ * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which
+ * defeats the purpose of paging.
+ *
+ * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
+ * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
+ * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
+ * blow out memory.
+ */
+class MultiPartitionPager implements QueryPager
+{
+ private final SinglePartitionPager[] pagers;
+ private final long timestamp;
+
+ private volatile int current;
+
+ MultiPartitionPager(List<ReadCommand> commands, final ConsistencyLevel consistencyLevel, final boolean localQuery)
+ {
+ this.pagers = new SinglePartitionPager[commands.size()];
+
+ long tstamp = -1;
+ for (int i = 0; i < commands.size(); i++)
+ {
+ ReadCommand command = commands.get(i);
+ if (tstamp == -1)
+ tstamp = command.timestamp;
+ else if (tstamp != command.timestamp)
+ throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
+
+ pagers[i] = command instanceof SliceFromReadCommand
+ ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery)
+ : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+ }
+ timestamp = tstamp;
+ }
+
+ public boolean isExhausted()
+ {
+ while (current < pagers.length)
+ {
+ if (!pagers[current].isExhausted())
+ return false;
+
+ current++;
+ }
+ return true;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ int remaining = pageSize;
+ List<Row> result = new ArrayList<Row>();
+
+ while (!isExhausted() && remaining > 0)
+ {
+ // Exhausted also sets us on the first non-exhausted pager
+ List<Row> page = pagers[current].fetchPage(remaining);
+ if (page.isEmpty())
+ continue;
+
+ Row row = page.get(0);
+ remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+ result.add(row);
+ }
+
+ return result;
+ }
+
+ public int maxRemaining()
+ {
+ int max = 0;
+ for (int i = current; i < pagers.length; i++)
+ max += pagers[i].maxRemaining();
+ return max;
+ }
+
+ public long timestamp()
+ {
+ return timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
new file mode 100644
index 0000000..82e7376
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceByNamesReadCommand.
+ */
+public class NamesQueryPager implements SinglePartitionPager
+{
+ private final SliceByNamesReadCommand command;
+ private final ConsistencyLevel consistencyLevel;
+ private final boolean localQuery;
+
+ private volatile boolean queried;
+
+ /**
+ * For now, we'll only use this in CQL3. In there, as name query can never
+ * yield more than one CQL3 row, there is no need for paging and so this is straight-forward.
+ *
+ * For thrift, we could imagine needing to page, though even then it's very
+ * unlikely unless the pageSize is very small.
+ *
+ * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that
+ * count every cell individually) and the names filter asks for more than pageSize columns.
+ */
+ // Don't use directly, use QueryPagers method instead
+ NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ this.command = command;
+ this.consistencyLevel = consistencyLevel;
+ this.localQuery = localQuery;
+ }
+
+ public ColumnCounter columnCounter()
+ {
+ // We know NamesQueryFilter.columnCounter don't care about his argument
+ return command.filter.columnCounter(null, command.timestamp);
+ }
+
+ public boolean isExhausted()
+ {
+ return queried;
+ }
+
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ {
+ assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
+
+ if (isExhausted())
+ return Collections.<Row>emptyList();
+
+ queried = true;
+ return localQuery
+ ? Collections.singletonList(command.getRow(Table.open(command.table)))
+ : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel);
+ }
+
+ public int maxRemaining()
+ {
+ if (queried)
+ return 0;
+
+ return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size();
+ }
+
+ public long timestamp()
+ {
+ return command.timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
new file mode 100644
index 0000000..3a69bf4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/Pageable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.ReadCommand;
+
+/**
+ * Marker interface for commands that can be paged.
+ */
+public interface Pageable
+{
+ public static class ReadCommands implements Pageable
+ {
+ public final List<ReadCommand> commands;
+
+ public ReadCommands(List<ReadCommand> commands)
+ {
+ this.commands = commands;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
new file mode 100644
index 0000000..a390859
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Perform a query, paging it by page of a given size.
+ *
+ * This is essentially an iterator of pages. Each call to fetchPage() will
+ * return the next page (i.e. the next list of rows) and isExhausted()
+ * indicates whether there is more page to fetch. The pageSize will
+ * either be in term of cells or in term of CQL3 row, depending on the
+ * parameters of the command we page.
+ *
+ * Please note that the pager might page within rows, so there is no guarantee
+ * that successive pages won't return the same row (though with different
+ * columns every time).
+ *
+ * Also, there is no guarantee that fetchPage() won't return an empty list,
+ * even if isExhausted() return false (but it is guaranteed to return an empty
+ * list *if* isExhausted() return true). Indeed, isExhausted() does *not*
+ * trigger a query so in some (failry rare) case we might not know the paging
+ * is done even though it is.
+ */
+public interface QueryPager
+{
+ /**
+ * Fetches the next page.
+ *
+ * @param pageSize the maximum number of elements to return in the next page.
+ * @return the page of result.
+ */
+ public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+
+ /**
+ * Whether or not this pager is exhausted, i.e. whether or not a call to
+ * fetchPage may return more result.
+ *
+ * @return whether the pager is exhausted.
+ */
+ public boolean isExhausted();
+
+ /**
+ * The maximum number of cells/CQL3 row that we may still have to return.
+ * In other words, that's the initial user limit minus what we've already
+ * returned (note that it's not how many we *will* return, just the upper
+ * limit on it).
+ */
+ public int maxRemaining();
+
+ /**
+ * The timestamp used by the last page.
+ */
+ public long timestamp();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
new file mode 100644
index 0000000..9bc3afd
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+
+/**
+ * Static utility methods to create query pagers.
+ */
+public class QueryPagers
+{
+ private QueryPagers() {};
+
+ private static int maxQueried(ReadCommand command)
+ {
+ if (command instanceof SliceByNamesReadCommand)
+ {
+ NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
+ return filter.countCQL3Rows() ? 1 : filter.columns.size();
+ }
+ else
+ {
+ SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
+ return filter.count;
+ }
+ }
+
+ public static boolean mayNeedPaging(Pageable command, int pageSize)
+ {
+ if (command instanceof Pageable.ReadCommands)
+ {
+ List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+
+ int maxQueried = 0;
+ for (ReadCommand readCmd : commands)
+ maxQueried += maxQueried(readCmd);
+
+ return maxQueried > pageSize;
+ }
+ else if (command instanceof ReadCommand)
+ {
+ return maxQueried((ReadCommand)command) > pageSize;
+ }
+ else
+ {
+ assert command instanceof RangeSliceCommand;
+ // We can never be sure a range slice won't need paging
+ return true;
+ }
+ }
+
+ private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local)
+ {
+ if (command instanceof SliceByNamesReadCommand)
+ return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
+ else
+ return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local);
+ }
+
+ private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, boolean local)
+ {
+ if (command instanceof Pageable.ReadCommands)
+ {
+ List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
+ if (commands.size() == 1)
+ return pager(commands.get(0), consistencyLevel, local);
+
+ return new MultiPartitionPager(commands, consistencyLevel, local);
+ }
+ else if (command instanceof ReadCommand)
+ {
+ return pager((ReadCommand)command, consistencyLevel, local);
+ }
+ else
+ {
+ assert command instanceof RangeSliceCommand;
+ RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
+ if (rangeCommand.predicate instanceof NamesQueryFilter)
+ return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local);
+ else
+ return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local);
+ }
+ }
+
+ public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel)
+ {
+ return pager(command, consistencyLevel, false);
+ }
+
+ public static QueryPager localPager(Pageable command)
+ {
+ return pager(command, null, true);
+ }
+
+ /**
+ * Convenience method to (locally) page an internal row.
+ * Used to 2ndary index a wide row without dying.
+ */
+ public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
+ {
+ SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
+ final SliceQueryPager pager = new SliceQueryPager(command, null, true);
+
+ return new Iterator<ColumnFamily>()
+ {
+ // We don't use AbstractIterator because we don't want hasNext() to do an actual query
+ public boolean hasNext()
+ {
+ return !pager.isExhausted();
+ }
+
+ public ColumnFamily next()
+ {
+ try
+ {
+ List<Row> rows = pager.fetchPage(pageSize);
+ ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
+ return cf == null ? EmptyColumns.factory.create(cfs.metadata) : cf;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
+ */
+ public static int countPaged(String keyspace,
+ String columnFamily,
+ ByteBuffer key,
+ SliceQueryFilter filter,
+ ConsistencyLevel consistencyLevel,
+ final int pageSize,
+ long now) throws RequestValidationException, RequestExecutionException
+ {
+ SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
+ final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, false);
+
+ ColumnCounter counter = filter.columnCounter(Schema.instance.getComparator(keyspace, columnFamily), now);
+ while (!pager.isExhausted())
+ {
+ List<Row> next = pager.fetchPage(pageSize);
+ if (!next.isEmpty())
+ counter.countAll(next.get(0).cf);
+ }
+ return counter.live();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
new file mode 100644
index 0000000..e4d4295
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a name query.
+ *
+ * Note: this only work for NamesQueryFilter that have countCQL3Rows() set,
+ * because this assumes the pageSize is counted in number of internal rows
+ * returned. More precisely, this doesn't do in-row paging so this assumes
+ * that the counter returned by columnCounter() will count 1 for each internal
+ * row.
+ */
+public class RangeNamesQueryPager extends AbstractQueryPager
+{
+ private final RangeSliceCommand command;
+ private volatile RowPosition lastReturnedKey;
+
+ // Don't use directly, use QueryPagers method instead
+ RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+ this.command = command;
+ assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ throws RequestExecutionException
+ {
+ AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+ if (lastReturnedKey != null)
+ pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey));
+
+ return localQuery
+ ? pageCmd.executeLocally()
+ : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+ }
+
+ protected boolean containsPreviousLast(Row first)
+ {
+ // When querying the next page, we create a bound that exclude the lastReturnedKey
+ return false;
+ }
+
+ protected boolean recordLast(Row last)
+ {
+ lastReturnedKey = last.key;
+ // We return false as that means "can that last be in the next query?"
+ return false;
+ }
+
+ private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+ {
+ // We return a range that always exclude lastReturnedKey, since we've already
+ // returned it.
+ AbstractBounds<RowPosition> bounds = command.keyRange;
+ if (bounds instanceof Range || bounds instanceof Bounds)
+ {
+ return new Range<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ else
+ {
+ return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
new file mode 100644
index 0000000..578d5c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pages a RangeSliceCommand whose predicate is a slice query.
+ *
+ * Note: this only work for CQL3 queries for now (because thrift queries expect
+ * a different limit on the rows than on the columns, which complicates it).
+ */
+public class RangeSliceQueryPager extends AbstractQueryPager
+{
+ private final RangeSliceCommand command;
+ private volatile RowPosition lastReturnedKey;
+ private volatile ByteBuffer lastReturnedName;
+
+ // Don't use directly, use QueryPagers method instead
+ RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
+ this.command = command;
+ assert columnFilter instanceof SliceQueryFilter;
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ throws RequestExecutionException
+ {
+ SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
+ AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey);
+ ByteBuffer start = lastReturnedName == null ? sf.start() : lastReturnedName;
+ PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
+ command.columnFamily,
+ command.timestamp,
+ keyRange,
+ sf,
+ start,
+ sf.finish(),
+ command.rowFilter,
+ pageSize);
+
+ return localQuery
+ ? pageCmd.executeLocally()
+ : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
+ }
+
+ protected boolean containsPreviousLast(Row first)
+ {
+ return lastReturnedKey != null
+ && lastReturnedKey.equals(first.key)
+ && lastReturnedName.equals(firstName(first.cf));
+ }
+
+ protected boolean recordLast(Row last)
+ {
+ lastReturnedKey = last.key;
+ lastReturnedName = lastName(last.cf);
+ return true;
+ }
+
+ private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+ {
+ // We always include lastReturnedKey since we may still be paging within a row,
+ // and PagedRangeCommand will move over if we're not anyway
+ AbstractBounds<RowPosition> bounds = command.keyRange;
+ if (bounds instanceof Range || bounds instanceof Bounds)
+ {
+ return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ else
+ {
+ return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
new file mode 100644
index 0000000..693a20e
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import org.apache.cassandra.db.filter.ColumnCounter;
+
+/**
+ * Common interface to single partition queries (by slice and by name).
+ *
+ * For use by MultiPartitionPager.
+ */
+public interface SinglePartitionPager extends QueryPager
+{
+ public ColumnCounter columnCounter();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
new file mode 100644
index 0000000..58ef3c4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Pager over a SliceFromReadCommand.
+ */
+public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
+{
+ private final SliceFromReadCommand command;
+
+ private volatile ByteBuffer lastReturned;
+
+ // Don't use directly, use QueryPagers method instead
+ SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ {
+ super(consistencyLevel, command.filter.count, localQuery, command.table, command.cfName, command.filter, command.timestamp);
+ this.command = command;
+ }
+
+ protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ throws RequestValidationException, RequestExecutionException
+ {
+ SliceQueryFilter filter = command.filter.withUpdatedCount(pageSize);
+ if (lastReturned != null)
+ filter = filter.withUpdatedStart(lastReturned, cfm.comparator);
+
+ ReadCommand pageCmd = command.withUpdatedFilter(filter);
+ return localQuery
+ ? Collections.singletonList(pageCmd.getRow(Table.open(command.table)))
+ : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel);
+ }
+
+ protected boolean containsPreviousLast(Row first)
+ {
+ return lastReturned != null && lastReturned.equals(firstName(first.cf));
+ }
+
+ protected boolean recordLast(Row last)
+ {
+ lastReturned = lastName(last.cf);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 55688a8..a15927e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.cql.CQLStatement;
import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -60,6 +61,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.pager.QueryPagers;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -368,31 +370,25 @@ public class CassandraServer implements Cassandra.Iface
}
}
- private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
- List<ByteBuffer> keys,
- ColumnParent column_parent,
- long timestamp,
- SlicePredicate predicate,
- ConsistencyLevel consistency_level)
- throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+ private SliceQueryFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
{
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
- ThriftValidation.validateColumnParent(metadata, column_parent);
- ThriftValidation.validatePredicate(metadata, column_parent, predicate);
-
- org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
- consistencyLevel.validateForRead(keyspace);
+ SliceQueryFilter filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
+ if (metadata.isSuper())
+ filter = SuperColumns.fromSCSliceFilter((CompositeType)metadata.comparator, parent.bufferForSuper_column(), filter);
+ return filter;
+ }
- List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+ private IDiskAtomFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SlicePredicate predicate)
+ {
IDiskAtomFilter filter;
if (predicate.column_names != null)
{
if (metadata.isSuper())
{
CompositeType type = (CompositeType)metadata.comparator;
- SortedSet s = new TreeSet<ByteBuffer>(column_parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
+ SortedSet s = new TreeSet<ByteBuffer>(parent.isSetSuper_column() ? type.types.get(1) : type.types.get(0));
s.addAll(predicate.column_names);
- filter = SuperColumns.fromSCNamesFilter(type, column_parent.bufferForSuper_column(), new NamesQueryFilter(s));
+ filter = SuperColumns.fromSCNamesFilter(type, parent.bufferForSuper_column(), new NamesQueryFilter(s));
}
else
{
@@ -403,11 +399,28 @@ public class CassandraServer implements Cassandra.Iface
}
else
{
- SliceRange range = predicate.slice_range;
- filter = new SliceQueryFilter(range.start, range.finish, range.reversed, range.count);
- if (metadata.isSuper())
- filter = SuperColumns.fromSCFilter((CompositeType)metadata.comparator, column_parent.bufferForSuper_column(), filter);
+ filter = toInternalFilter(metadata, parent, predicate.slice_range);
}
+ return filter;
+ }
+
+ private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace,
+ List<ByteBuffer> keys,
+ ColumnParent column_parent,
+ long timestamp,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
+ throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
+ {
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+ ThriftValidation.validateColumnParent(metadata, column_parent);
+ ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+
+ org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
+ consistencyLevel.validateForRead(keyspace);
+
+ List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+ IDiskAtomFilter filter = toInternalFilter(metadata, column_parent, predicate);
for (ByteBuffer key: keys)
{
@@ -530,46 +543,22 @@ public class CassandraServer implements Cassandra.Iface
pageSize = COUNT_PAGE_SIZE;
}
- int totalCount = 0;
- List<ColumnOrSuperColumn> columns;
-
- if (predicate.slice_range == null)
- {
- predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- Integer.MAX_VALUE);
- }
-
- final int requestedCount = predicate.slice_range.count;
- int remaining = requestedCount;
- int pages = 0;
- while (true)
- {
- predicate.slice_range.count = Math.min(pageSize, Math.max(2, remaining)); // fetch at least two columns
- columns = getSliceInternal(keyspace, key, column_parent, timestamp, predicate, consistency_level);
- if (columns.isEmpty())
- break;
-
- ByteBuffer firstName = getName(columns.get(0));
- int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
-
- totalCount += newColumns;
- // if we over-counted, just return original limit
- if (totalCount > requestedCount)
- return requestedCount;
- remaining -= newColumns;
- pages++;
- // We're done if either:
- // - We've querying the number of columns requested by the user
- // - last fetched page only contains the column we already fetched
- if (remaining == 0 || ((columns.size() == 1) && (firstName.equals(predicate.slice_range.start))))
- break;
- else
- predicate.slice_range.start = getName(columns.get(columns.size() - 1));
- }
+ SliceRange sliceRange = predicate.slice_range == null
+ ? new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE)
+ : predicate.slice_range;
+ SliceQueryFilter filter = toInternalFilter(cfs.metadata, column_parent, sliceRange);
- return totalCount;
+ return QueryPagers.countPaged(keyspace,
+ column_parent.column_family,
+ key,
+ filter,
+ ThriftConversion.fromThrift(consistency_level),
+ pageSize,
+ timestamp);
+ }
+ catch (RequestExecutionException e)
+ {
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -774,8 +763,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null; // makes javac happy -- it can't tell that rethrow always throws
+ throw ThriftConversion.rethrow(e);
}
finally
{
@@ -1218,22 +1206,16 @@ public class CassandraServer implements Cassandra.Iface
bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
}
+ if (range.row_filter != null && !range.row_filter.isEmpty())
+ throw new InvalidRequestException("Cross-row paging is not supported along with index clauses");
+
List<Row> rows;
long now = System.currentTimeMillis();
schedule(DatabaseDescriptor.getRangeRpcTimeout());
try
{
IDiskAtomFilter filter = ThriftValidation.asIFilter(predicate, metadata, null);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace,
- column_family,
- now,
- filter,
- bounds,
- range.row_filter,
- range.count,
- true,
- true),
- consistencyLevel);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, now, filter, bounds, null, range.count, true, true), consistencyLevel);
}
finally
{
@@ -1878,8 +1860,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -1913,8 +1894,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -1994,8 +1974,7 @@ public class CassandraServer implements Cassandra.Iface
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
@@ -2034,12 +2013,15 @@ public class CassandraServer implements Cassandra.Iface
itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId));
logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
- return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
+ return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
+ ThriftConversion.fromThrift(cLevel),
+ cState.getQueryState(),
+ bindVariables,
+ -1).toThriftResult();
}
catch (RequestExecutionException e)
{
- ThriftConversion.rethrow(e);
- return null;
+ throw ThriftConversion.rethrow(e);
}
catch (RequestValidationException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index 28725f0..8a9ab59 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -45,7 +45,9 @@ public class ThriftConversion
throw new AssertionError();
}
- public static void rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
+ // We never return, but returning a RuntimeException allows to write "throw rethrow(e)" without java complaining
+ // for methods that have a return value.
+ public static RuntimeException rethrow(RequestExecutionException e) throws UnavailableException, TimedOutException
{
if (e instanceof RequestTimeoutException)
throw toThrift((RequestTimeoutException)e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/CBCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBCodec.java b/src/java/org/apache/cassandra/transport/CBCodec.java
index 2250816..2ba21d5 100644
--- a/src/java/org/apache/cassandra/transport/CBCodec.java
+++ b/src/java/org/apache/cassandra/transport/CBCodec.java
@@ -22,5 +22,5 @@ import org.jboss.netty.buffer.ChannelBuffer;
public interface CBCodec<T>
{
public T decode(ChannelBuffer body, int version);
- public ChannelBuffer encode(T t);
+ public ChannelBuffer encode(T t, int version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 8e2d765..2f3f3bd 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -35,14 +35,7 @@ import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.transport.messages.CredentialsMessage;
-import org.apache.cassandra.transport.messages.ExecuteMessage;
-import org.apache.cassandra.transport.messages.OptionsMessage;
-import org.apache.cassandra.transport.messages.PrepareMessage;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-import org.apache.cassandra.transport.messages.AuthResponse;
-import org.apache.cassandra.transport.messages.StartupMessage;
+import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.utils.Hex;
import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
@@ -116,8 +109,37 @@ public class Client extends SimpleClient
}
else if (msgType.equals("QUERY"))
{
- String query = line.substring(6);
- return new QueryMessage(query, ConsistencyLevel.ONE);
+ line = line.substring(6);
+ // Ugly hack to allow setting a page size, but that's playground code anyway
+ String query = line;
+ int pageSize = -1;
+ if (line.matches(".+ !\\d+$"))
+ {
+ int idx = line.lastIndexOf('!');
+ query = line.substring(0, idx-1);
+ try
+ {
+ pageSize = Integer.parseInt(line.substring(idx+1, line.length()));
+ }
+ catch (NumberFormatException e)
+ {
+ return null;
+ }
+ }
+ return new QueryMessage(query, Collections.<ByteBuffer>emptyList(), ConsistencyLevel.ONE, pageSize);
+ }
+ else if (msgType.equals("NEXT"))
+ {
+ line = line.substring(5);
+ try
+ {
+ int pageSize = Integer.parseInt(line);
+ return new NextMessage(pageSize);
+ }
+ catch (NumberFormatException e)
+ {
+ return null;
+ }
}
else if (msgType.equals("PREPARE"))
{
@@ -145,7 +167,7 @@ public class Client extends SimpleClient
}
values.add(bb);
}
- return new ExecuteMessage(id, values, ConsistencyLevel.ONE);
+ return new ExecuteMessage(id, values, ConsistencyLevel.ONE, -1);
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 54da6a2..eca3697 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -73,7 +73,8 @@ public abstract class Message
BATCH (13, Direction.REQUEST, BatchMessage.codec),
AUTH_CHALLENGE (14, Direction.RESPONSE, AuthChallenge.codec),
AUTH_RESPONSE (15, Direction.REQUEST, AuthResponse.codec),
- AUTH_SUCCESS (16, Direction.RESPONSE, AuthSuccess.codec);
+ AUTH_SUCCESS (16, Direction.RESPONSE, AuthSuccess.codec),
+ NEXT (17, Direction.REQUEST, NextMessage.codec);
public final int opcode;
public final Direction direction;
@@ -298,11 +299,11 @@ public abstract class Message
{
assert request.connection() instanceof ServerConnection;
ServerConnection connection = (ServerConnection)request.connection();
- connection.validateNewMessage(request.type, request.getVersion());
+ QueryState qstate = connection.validateNewMessage(request.type, request.getVersion(), request.getStreamId());
logger.debug("Received: {}, v={}", request, request.getVersion());
- Response response = request.execute(connection.getQueryState(request.getStreamId()));
+ Response response = request.execute(qstate);
response.setStreamId(request.getStreamId());
response.setVersion(request.getVersion());
response.attach(connection);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java
index ec99440..538258d 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -53,7 +53,7 @@ public class ServerConnection extends Connection
this.state = State.UNINITIALIZED;
}
- public QueryState getQueryState(int streamId)
+ private QueryState getQueryState(int streamId)
{
QueryState qState = queryStates.get(streamId);
if (qState == null)
@@ -66,7 +66,7 @@ public class ServerConnection extends Connection
return qState;
}
- public void validateNewMessage(Message.Type type, int version)
+ public QueryState validateNewMessage(Message.Type type, int version, int streamId)
{
switch (state)
{
@@ -86,6 +86,20 @@ public class ServerConnection extends Connection
default:
throw new AssertionError();
}
+
+ QueryState qstate = getQueryState(streamId);
+ if (qstate.hasPager())
+ {
+ if (type != Message.Type.NEXT)
+ qstate.dropPager();
+ }
+ else
+ {
+ if (type == Message.Type.NEXT)
+ throw new ProtocolException("Unexpected NEXT message, paging is not set (or is done)");
+ }
+
+ return qstate;
}
public void applyStateTransition(Message.Type requestType, Message.Type responseType)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 993a490..3a9c286 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -156,7 +156,7 @@ public class SimpleClient
public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel)
{
- Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel));
+ Message.Response msg = execute(new QueryMessage(query, values, consistencyLevel, -1));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
@@ -170,7 +170,7 @@ public class SimpleClient
public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
{
- Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency));
+ Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency, -1));
assert msg instanceof ResultMessage;
return (ResultMessage)msg;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index bc90dc5..63df7d0 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -40,7 +40,7 @@ public class AuthChallenge extends Message.Response
}
@Override
- public ChannelBuffer encode(AuthChallenge challenge)
+ public ChannelBuffer encode(AuthChallenge challenge, int version)
{
return CBUtil.valueToCB(challenge.token);
}
@@ -57,7 +57,7 @@ public class AuthChallenge extends Message.Response
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public byte[] getToken()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 1f8ed9f..8a33a72 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -52,7 +52,7 @@ public class AuthResponse extends Message.Request
}
@Override
- public ChannelBuffer encode(AuthResponse response)
+ public ChannelBuffer encode(AuthResponse response, int version)
{
return CBUtil.valueToCB(response.token);
}
@@ -69,7 +69,7 @@ public class AuthResponse extends Message.Request
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index ba520bc..13c750a 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -43,7 +43,7 @@ public class AuthSuccess extends Message.Response
}
@Override
- public ChannelBuffer encode(AuthSuccess success)
+ public ChannelBuffer encode(AuthSuccess success, int version)
{
return CBUtil.valueToCB(success.token);
}
@@ -60,7 +60,7 @@ public class AuthSuccess extends Message.Response
@Override
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public byte[] getToken()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index d781f68..292f748 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -35,7 +35,7 @@ public class AuthenticateMessage extends Message.Response
return new AuthenticateMessage(authenticator);
}
- public ChannelBuffer encode(AuthenticateMessage msg)
+ public ChannelBuffer encode(AuthenticateMessage msg, int version)
{
return CBUtil.stringToCB(msg.authenticator);
}
@@ -51,7 +51,7 @@ public class AuthenticateMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 299d8b8..9fb4482 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -73,7 +73,7 @@ public class BatchMessage extends Message.Request
return new BatchMessage(toType(type), queryOrIds, variables, consistency);
}
- public ChannelBuffer encode(BatchMessage msg)
+ public ChannelBuffer encode(BatchMessage msg, int version)
{
// We have:
// - type
@@ -160,7 +160,7 @@ public class BatchMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index ceff5ba..207907a 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -55,7 +55,7 @@ public class CredentialsMessage extends Message.Request
return msg;
}
- public ChannelBuffer encode(CredentialsMessage msg)
+ public ChannelBuffer encode(CredentialsMessage msg, int version)
{
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
@@ -78,7 +78,7 @@ public class CredentialsMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 3243bce..3675f08 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -123,7 +123,7 @@ public class ErrorMessage extends Message.Response
return new ErrorMessage(te);
}
- public ChannelBuffer encode(ErrorMessage msg)
+ public ChannelBuffer encode(ErrorMessage msg, int version)
{
ChannelBuffer ccb = CBUtil.intToCB(msg.error.code().value);
ChannelBuffer mcb = CBUtil.stringToCB(msg.error.getMessage());
@@ -213,7 +213,7 @@ public class ErrorMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index 7d67de9..f7a93ae 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -31,7 +31,7 @@ public class EventMessage extends Message.Response
return new EventMessage(Event.deserialize(body));
}
- public ChannelBuffer encode(EventMessage msg)
+ public ChannelBuffer encode(EventMessage msg, int version)
{
return msg.event.serialize();
}
@@ -48,7 +48,7 @@ public class EventMessage extends Message.Response
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 8e2b761..7c35e42 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import com.google.common.collect.ImmutableMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.apache.cassandra.cql3.CQLStatement;
@@ -49,10 +50,11 @@ public class ExecuteMessage extends Message.Request
values.add(CBUtil.readValue(body));
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- return new ExecuteMessage(id, values, consistency);
+ int resultPageSize = body.readInt();
+ return new ExecuteMessage(id, values, consistency, resultPageSize);
}
- public ChannelBuffer encode(ExecuteMessage msg)
+ public ChannelBuffer encode(ExecuteMessage msg, int version)
{
// We have:
// - statementId
@@ -60,7 +62,7 @@ public class ExecuteMessage extends Message.Request
// - The values
// - options
int vs = msg.values.size();
- CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(3, 0, vs);
+ CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(4, 0, vs);
builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
builder.add(CBUtil.shortToCB(vs));
@@ -69,6 +71,7 @@ public class ExecuteMessage extends Message.Request
builder.addValue(value);
builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
+ builder.add(CBUtil.intToCB(msg.resultPageSize));
return builder.build();
}
};
@@ -76,23 +79,25 @@ public class ExecuteMessage extends Message.Request
public final MD5Digest statementId;
public final List<ByteBuffer> values;
public final ConsistencyLevel consistency;
+ public final int resultPageSize;
- public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+ public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
{
- this(MD5Digest.wrap(statementId), values, consistency);
+ this(MD5Digest.wrap(statementId), values, consistency, resultPageSize);
}
- public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency)
+ public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize)
{
super(Message.Type.EXECUTE);
this.statementId = statementId;
this.values = values;
this.consistency = consistency;
+ this.resultPageSize = resultPageSize;
}
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
@@ -104,6 +109,9 @@ public class ExecuteMessage extends Message.Request
if (statement == null)
throw new PreparedQueryNotFoundException(statementId);
+ if (resultPageSize == 0)
+ throw new ProtocolException("The page size cannot be 0");
+
UUID tracingId = null;
if (isTracingRequested())
{
@@ -114,11 +122,16 @@ public class ExecuteMessage extends Message.Request
if (state.traceNextQuery())
{
state.createTracingSession();
+
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (resultPageSize > 0)
+ builder.put("page_size", Integer.toString(resultPageSize));
+
// TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
- Tracing.instance.begin("Execute CQL3 prepared query", Collections.<String, String>emptyMap());
+ Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
}
- Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values);
+ Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize);
if (tracingId != null)
response.setTracingId(tracingId);
@@ -132,6 +145,9 @@ public class ExecuteMessage extends Message.Request
finally
{
Tracing.instance.stopSession();
+ // Trash the current session id if we won't need it anymore
+ if (!state.hasPager())
+ state.getAndResetCurrentTracingSession();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/NextMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/NextMessage.java b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
new file mode 100644
index 0000000..d68f603
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/NextMessage.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableMap;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.utils.UUIDGen;
+
+public class NextMessage extends Message.Request
+{
+ public static final Message.Codec<NextMessage> codec = new Message.Codec<NextMessage>()
+ {
+ public NextMessage decode(ChannelBuffer body, int version)
+ {
+ int resultPageSize = body.readInt();
+ return new NextMessage(resultPageSize);
+ }
+
+ public ChannelBuffer encode(NextMessage msg, int version)
+ {
+ return CBUtil.intToCB(msg.resultPageSize);
+ }
+ };
+
+ public final int resultPageSize;
+
+ public NextMessage(int resultPageSize)
+ {
+ super(Type.NEXT);
+ this.resultPageSize = resultPageSize;
+ }
+
+ public ChannelBuffer encode()
+ {
+ return codec.encode(this, getVersion());
+ }
+
+ public Message.Response execute(QueryState state)
+ {
+ try
+ {
+ if (resultPageSize == 0)
+ throw new ProtocolException("The page size cannot be 0");
+
+ /*
+ * If we had traced the previous page and we are asked to trace this one,
+ * record the previous id to allow linking the trace together.
+ */
+ UUID previousTracingId = state.getAndResetCurrentTracingSession();
+
+ UUID tracingId = null;
+ if (isTracingRequested())
+ {
+ tracingId = UUIDGen.getTimeUUID();
+ state.prepareTracingSession(tracingId);
+ }
+
+ if (state.traceNextQuery())
+ {
+ state.createTracingSession();
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ if (resultPageSize > 0)
+ builder.put("page_size", Integer.toString(resultPageSize));
+ if (previousTracingId != null)
+ builder.put("previous_trace", previousTracingId.toString());
+ Tracing.instance.begin("Continue paged CQL3 query", builder.build());
+ }
+
+ Message.Response response = state.getNextPage(resultPageSize < 0 ? Integer.MAX_VALUE : resultPageSize);
+
+ if (tracingId != null)
+ response.setTracingId(tracingId);
+
+ return response;
+ }
+ catch (Exception e)
+ {
+ if (!((e instanceof RequestValidationException) || (e instanceof RequestExecutionException)))
+ logger.error("Unexpected error during query", e);
+ return ErrorMessage.fromException(e);
+ }
+ finally
+ {
+ Tracing.instance.stopSession();
+ // Trash the current session id if we won't need it anymore
+ if (!state.hasPager())
+ state.getAndResetCurrentTracingSession();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "NEXT " + resultPageSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 6e753d3..5afefb5 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -42,7 +42,7 @@ public class OptionsMessage extends Message.Request
return new OptionsMessage();
}
- public ChannelBuffer encode(OptionsMessage msg)
+ public ChannelBuffer encode(OptionsMessage msg, int version)
{
return ChannelBuffers.EMPTY_BUFFER;
}
@@ -55,7 +55,7 @@ public class OptionsMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e48ff293/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 3e7fe51..851f3f8 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -38,7 +38,7 @@ public class PrepareMessage extends Message.Request
return new PrepareMessage(query);
}
- public ChannelBuffer encode(PrepareMessage msg)
+ public ChannelBuffer encode(PrepareMessage msg, int version)
{
return CBUtil.longStringToCB(msg.query);
}
@@ -54,7 +54,7 @@ public class PrepareMessage extends Message.Request
public ChannelBuffer encode()
{
- return codec.encode(this);
+ return codec.encode(this, getVersion());
}
public Message.Response execute(QueryState state)