You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/18 11:24:04 UTC
[3/5] git commit: Fix infinite loop when paging queries with IN
Fix infinite loop when paging queries with IN
patch by slebresne; reviewed by iamaleksey for CASSANDRA-6464
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7c32ffbb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7c32ffbb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7c32ffbb
Branch: refs/heads/trunk
Commit: 7c32ffbbfae9959edc89ec5fcf9fced1b75c495b
Parents: f7255b5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Dec 18 11:18:30 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Dec 18 11:18:30 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../service/pager/AbstractQueryPager.java | 6 +-
.../service/pager/MultiPartitionPager.java | 89 +++++++++++++-------
.../service/pager/NamesQueryPager.java | 5 +-
.../cassandra/service/pager/QueryPagers.java | 5 +-
.../service/pager/SinglePartitionPager.java | 3 +
.../service/pager/SliceQueryPager.java | 5 ++
7 files changed, 76 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80ed481..5a124ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
* Expose a total memtable size metric for a CF (CASSANDRA-6391)
* cqlsh: handle symlinks properly (CASSANDRA-6425)
* Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
+ * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
Merged from 1.2:
* Improved error message on bad properties in DDL queries (CASSANDRA-6453)
* Randomize batchlog candidates selection (CASSANDRA-6481)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 9372665..6f6772c 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -40,9 +40,9 @@ abstract class AbstractQueryPager implements QueryPager
protected final IDiskAtomFilter columnFilter;
private final long timestamp;
- private volatile int remaining;
- private volatile boolean exhausted;
- private volatile boolean lastWasRecorded;
+ private int remaining;
+ private boolean exhausted;
+ private boolean lastWasRecorded;
protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
int toFetch,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 2615e9b..35d6752 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -43,44 +43,72 @@ class MultiPartitionPager implements QueryPager
private final SinglePartitionPager[] pagers;
private final long timestamp;
- private volatile int current;
-
- MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery)
- {
- this(commands, consistencyLevel, localQuery, null);
- }
+ private int remaining;
+ private int current;
MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
{
- this.pagers = new SinglePartitionPager[commands.size()];
+ int i = 0;
+ // If it's not the beginning (state != null), we need to find where we were and skip previous commands
+ // since they are done.
+ if (state != null)
+ for (; i < commands.size(); i++)
+ if (commands.get(i).key.equals(state.partitionKey))
+ break;
+
+ if (i >= commands.size())
+ {
+ pagers = null;
+ timestamp = -1;
+ return;
+ }
+
+ pagers = new SinglePartitionPager[commands.size() - i];
+ // 'i' is on the first non exhausted pager for the previous page (or the first one)
+ pagers[0] = makePager(commands.get(i), consistencyLevel, localQuery, state);
+ timestamp = commands.get(i).timestamp;
- long tstamp = -1;
- for (int i = 0; i < commands.size(); i++)
+ // Following ones haven't been started yet
+ for (int j = i + 1; j < commands.size(); j++)
{
- ReadCommand command = commands.get(i);
- if (tstamp == -1)
- tstamp = command.timestamp;
- else if (tstamp != command.timestamp)
+ ReadCommand command = commands.get(j);
+ if (command.timestamp != timestamp)
throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
-
- PagingState tmpState = state != null && command.key.equals(state.partitionKey) ? state : null;
- pagers[i] = command instanceof SliceFromReadCommand
- ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, tmpState)
- : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery, tmpState);
+ pagers[j - i] = makePager(command, consistencyLevel, localQuery, null);
}
- timestamp = tstamp;
+ remaining = state == null ? computeRemaining(pagers) : state.remaining;
+ }
+
+ private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+ {
+ return command instanceof SliceFromReadCommand
+ ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, state)
+ : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+ }
+
+ private static int computeRemaining(SinglePartitionPager[] pagers)
+ {
+ long remaining = 0;
+ for (SinglePartitionPager pager : pagers)
+ remaining += pager.maxRemaining();
+ return remaining > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)remaining;
}
public PagingState state()
{
+ // Sets current to the first non-exhausted pager
+ if (isExhausted())
+ return null;
+
PagingState state = pagers[current].state();
- return state == null
- ? null
- : new PagingState(state.partitionKey, state.cellName, maxRemaining());
+ return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining);
}
public boolean isExhausted()
{
+ if (remaining <= 0 || pagers == null)
+ return true;
+
while (current < pagers.length)
{
if (!pagers[current].isExhausted())
@@ -93,18 +121,20 @@ class MultiPartitionPager implements QueryPager
public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
{
- int remaining = pageSize;
List<Row> result = new ArrayList<Row>();
- while (!isExhausted() && remaining > 0)
+ int remainingThisQuery = pageSize;
+ while (remainingThisQuery > 0 && !isExhausted())
{
- // Exhausted also sets us on the first non-exhausted pager
- List<Row> page = pagers[current].fetchPage(remaining);
+ // isExhausted has set us on the first non-exhausted pager
+ List<Row> page = pagers[current].fetchPage(remainingThisQuery);
if (page.isEmpty())
continue;
Row row = page.get(0);
- remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+ int fetched = pagers[current].columnCounter().countAll(row.cf).live();
+ remaining -= fetched;
+ remainingThisQuery -= fetched;
result.add(row);
}
@@ -113,10 +143,7 @@ class MultiPartitionPager implements QueryPager
public int maxRemaining()
{
- int max = 0;
- for (int i = current; i < pagers.length; i++)
- max += pagers[i].maxRemaining();
- return max;
+ return remaining;
}
public long timestamp()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
index ede1e91..663db22 100644
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service.pager;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -55,9 +56,9 @@ public class NamesQueryPager implements SinglePartitionPager
this.localQuery = localQuery;
}
- NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+ public ByteBuffer key()
{
- this(command, consistencyLevel, localQuery);
+ return command.key;
}
public ColumnCounter columnCounter()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 1601ff6..c353536 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -57,7 +57,8 @@ public class QueryPagers
{
List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
- int maxQueried = 0;
+ // Using long on purpose, as we could overflow otherwise
+ long maxQueried = 0;
for (ReadCommand readCmd : commands)
maxQueried += maxQueried(readCmd);
@@ -78,7 +79,7 @@ public class QueryPagers
private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
{
if (command instanceof SliceByNamesReadCommand)
- return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local, state);
+ return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
else
return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local, state);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 693a20e..51bbf90 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.service.pager;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.db.filter.ColumnCounter;
/**
@@ -26,5 +28,6 @@ import org.apache.cassandra.db.filter.ColumnCounter;
*/
public interface SinglePartitionPager extends QueryPager
{
+ public ByteBuffer key();
public ColumnCounter columnCounter();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index e3825a9..cd0c069 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -54,6 +54,11 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
}
}
+ public ByteBuffer key()
+ {
+ return command.key;
+ }
+
public PagingState state()
{
return lastReturned == null