You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:35 UTC
[11/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index c330eea..2c16ace 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,156 +17,133 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.*;
-
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.service.ClientState;
abstract class AbstractQueryPager implements QueryPager
{
- private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);
+ protected final ReadCommand command;
+ protected final DataLimits limits;
- private final ConsistencyLevel consistencyLevel;
- private final boolean localQuery;
+ private int remaining;
- protected final CFMetaData cfm;
- protected final IDiskAtomFilter columnFilter;
- private final long timestamp;
+ // This is the last key we've been reading from (or can still be reading within). This is the key for
+ // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition
+ // (and this is done in PagerIterator). This can be null (when we start).
+ private DecoratedKey lastKey;
+ private int remainingInPartition;
- private int remaining;
private boolean exhausted;
- private boolean shouldFetchExtraRow;
- protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
- int toFetch,
- boolean localQuery,
- String keyspace,
- String columnFamily,
- IDiskAtomFilter columnFilter,
- long timestamp)
+ protected AbstractQueryPager(ReadCommand command)
{
- this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
+ this.command = command;
+ this.limits = command.limits();
+
+ this.remaining = limits.count();
+ this.remainingInPartition = limits.perPartitionCount();
}
- protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
- int toFetch,
- boolean localQuery,
- CFMetaData cfm,
- IDiskAtomFilter columnFilter,
- long timestamp)
+ public ReadOrderGroup startOrderGroup()
{
- this.consistencyLevel = consistencyLevel;
- this.localQuery = localQuery;
+ return command.startOrderGroup();
+ }
- this.cfm = cfm;
- this.columnFilter = columnFilter;
- this.timestamp = timestamp;
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ {
+ if (isExhausted())
+ return PartitionIterators.EMPTY;
- this.remaining = toFetch;
+ pageSize = Math.min(pageSize, remaining);
+ return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec());
}
-
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
{
if (isExhausted())
- return Collections.emptyList();
+ return PartitionIterators.EMPTY;
- int currentPageSize = nextPageSize(pageSize);
- List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
+ pageSize = Math.min(pageSize, remaining);
+ return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec());
+ }
- if (rows.isEmpty())
- {
- logger.debug("Got empty set of rows, considering pager exhausted");
- exhausted = true;
- return Collections.emptyList();
- }
+ private class PagerIterator extends CountingPartitionIterator
+ {
+ private final DataLimits pageLimits;
- int liveCount = getPageLiveCount(rows);
- logger.debug("Fetched {} live rows", liveCount);
+ private Row lastRow;
- // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked
- // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient
- // but most of the time there should be nothing or very little to trim.
- if (liveCount > currentPageSize)
+ private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec)
{
- rows = discardLast(rows, liveCount - currentPageSize);
- liveCount = currentPageSize;
+ super(iter, pageLimits, nowInSec);
+ this.pageLimits = pageLimits;
}
- remaining -= liveCount;
-
- // If we've got less than requested, there is no more query to do (but
- // we still need to return the current page)
- if (liveCount < currentPageSize)
+ @Override
+ @SuppressWarnings("resource") // iter is closed by closing the result
+ public RowIterator next()
{
- logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize);
- exhausted = true;
- }
+ RowIterator iter = super.next();
+ try
+ {
+ DecoratedKey key = iter.partitionKey();
+ if (lastKey == null || !lastKey.equals(key))
+ remainingInPartition = limits.perPartitionCount();
- // If it's not the first query and the first column is the last one returned (likely
- // but not certain since paging can race with deletes/expiration), then remove the
- // first column.
- if (containsPreviousLast(rows.get(0)))
- {
- rows = discardFirst(rows);
- remaining++;
- }
- // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size,
- // so if the page is full, trim the last entry
- else if (shouldFetchExtraRow && !exhausted)
- {
- // We've asked for one more than necessary
- rows = discardLast(rows);
- remaining++;
+ lastKey = key;
+ return new RowPagerIterator(iter);
+ }
+ catch (RuntimeException e)
+ {
+ iter.close();
+ throw e;
+ }
}
- logger.debug("Remaining rows to page: {}", remaining);
-
- if (!isExhausted())
- shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
+ @Override
+ public void close()
+ {
+ super.close();
+ recordLast(lastKey, lastRow);
- return rows;
- }
+ int counted = counter.counted();
+ remaining -= counted;
+ remainingInPartition -= counter.countedInCurrentPartition();
+ exhausted = counted < pageLimits.count();
+ }
- private List<Row> filterEmpty(List<Row> result)
- {
- for (Row row : result)
+ private class RowPagerIterator extends WrappingRowIterator
{
- if (row.cf == null || !row.cf.hasColumns())
+ RowPagerIterator(RowIterator iter)
{
- List<Row> newResult = new ArrayList<Row>(result.size() - 1);
- for (Row row2 : result)
- {
- if (row2.cf == null || !row2.cf.hasColumns())
- continue;
+ super(iter);
+ }
- newResult.add(row2);
- }
- return newResult;
+ @Override
+ public Row next()
+ {
+ lastRow = super.next();
+ return lastRow;
}
}
- return result;
}
- protected void restoreState(int remaining, boolean shouldFetchExtraRow)
+ protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition)
{
+ this.lastKey = lastKey;
this.remaining = remaining;
- this.shouldFetchExtraRow = shouldFetchExtraRow;
+ this.remainingInPartition = remainingInPartition;
}
public boolean isExhausted()
{
- return exhausted || remaining == 0;
+ return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0);
}
public int maxRemaining()
@@ -174,220 +151,11 @@ abstract class AbstractQueryPager implements QueryPager
return remaining;
}
- public long timestamp()
+ protected int remainingInPartition()
{
- return timestamp;
- }
-
- private int nextPageSize(int pageSize)
- {
- return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
- }
-
- public ColumnCounter columnCounter()
- {
- return columnFilter.columnCounter(cfm.comparator, timestamp);
- }
-
- protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
-
- /**
- * Checks to see if the first row of a new page contains the last row from the previous page.
- * @param first the first row of the new page
- * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise
- */
- protected abstract boolean containsPreviousLast(Row first);
-
- /**
- * Saves the paging state by recording the last seen partition key and cell name (where applicable).
- * @param last the last row in the current page
- * @return true if an extra row should be fetched in the next page,false otherwise
- */
- protected abstract boolean recordLast(Row last);
-
- protected abstract boolean isReversed();
-
- private List<Row> discardFirst(List<Row> rows)
- {
- return discardFirst(rows, 1);
- }
-
- @VisibleForTesting
- List<Row> discardFirst(List<Row> rows, int toDiscard)
- {
- if (toDiscard == 0 || rows.isEmpty())
- return rows;
-
- int i = 0;
- DecoratedKey firstKey = null;
- ColumnFamily firstCf = null;
- while (toDiscard > 0 && i < rows.size())
- {
- Row first = rows.get(i++);
- firstKey = first.key;
- firstCf = first.cf.cloneMeShallow(isReversed());
- toDiscard -= isReversed()
- ? discardLast(first.cf, toDiscard, firstCf)
- : discardFirst(first.cf, toDiscard, firstCf);
- }
-
- // If there is less live data than to discard, all is discarded
- if (toDiscard > 0)
- return Collections.<Row>emptyList();
-
- // i is the index of the first row that we are sure to keep. On top of that,
- // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
- int count = firstCf.getColumnCount();
- int newSize = rows.size() - (count == 0 ? i : i - 1);
- List<Row> newRows = new ArrayList<Row>(newSize);
- if (count != 0)
- newRows.add(new Row(firstKey, firstCf));
- newRows.addAll(rows.subList(i, rows.size()));
-
- return newRows;
+ return remainingInPartition;
}
- private List<Row> discardLast(List<Row> rows)
- {
- return discardLast(rows, 1);
- }
-
- @VisibleForTesting
- List<Row> discardLast(List<Row> rows, int toDiscard)
- {
- if (toDiscard == 0 || rows.isEmpty())
- return rows;
-
- int i = rows.size()-1;
- DecoratedKey lastKey = null;
- ColumnFamily lastCf = null;
- while (toDiscard > 0 && i >= 0)
- {
- Row last = rows.get(i--);
- lastKey = last.key;
- lastCf = last.cf.cloneMeShallow(isReversed());
- toDiscard -= isReversed()
- ? discardFirst(last.cf, toDiscard, lastCf)
- : discardLast(last.cf, toDiscard, lastCf);
- }
-
- // If there is less live data than to discard, all is discarded
- if (toDiscard > 0)
- return Collections.<Row>emptyList();
-
- // i is the index of the last row that we are sure to keep. On top of that,
- // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
- int count = lastCf.getColumnCount();
- int newSize = count == 0 ? i+1 : i+2;
- List<Row> newRows = new ArrayList<Row>(newSize);
- newRows.addAll(rows.subList(0, i+1));
- if (count != 0)
- newRows.add(new Row(lastKey, lastCf));
-
- return newRows;
- }
-
- private int getPageLiveCount(List<Row> page)
- {
- int count = 0;
- for (Row row : page)
- count += columnCounter().countAll(row.cf).live();
- return count;
- }
-
- private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
- {
- boolean isReversed = isReversed();
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
- return isReversed
- ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
- : discardHead(toDiscard, newCf, cf.iterator(), tester);
- }
-
- private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
- {
- boolean isReversed = isReversed();
- DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
- return isReversed
- ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
- : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
- }
-
- private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
- {
- ColumnCounter counter = columnCounter();
-
- List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
-
- // Discard the first 'toDiscard' live, non-static cells
- while (iter.hasNext())
- {
- Cell c = iter.next();
-
- // if it's a static column, don't count it and save it to add to the trimmed results
- ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
- if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
- {
- staticCells.add(c);
- continue;
- }
-
- counter.count(c, tester);
-
- // once we've discarded the required amount, add the rest
- if (counter.live() > toDiscard)
- {
- for (Cell staticCell : staticCells)
- copy.addColumn(staticCell);
-
- copy.addColumn(c);
- while (iter.hasNext())
- copy.addColumn(iter.next());
- }
- }
- return Math.min(counter.live(), toDiscard);
- }
-
- private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
- {
- // Redoing the counting like that is not extremely efficient.
- // This is called only for reversed slices or in the case of a race between
- // paging and a deletion (pretty unlikely), so this is probably acceptable.
- int liveCount = columnCounter().countAll(cf).live();
-
- ColumnCounter counter = columnCounter();
- // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard')
- while (iter.hasNext())
- {
- Cell c = iter.next();
- counter.count(c, tester);
- if (counter.live() > liveCount - toDiscard)
- break;
-
- copy.addColumn(c);
- }
- return Math.min(liveCount, toDiscard);
- }
-
- /**
- * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column
- * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal
- * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we
- * need to start from the last non-static cell.
- */
- protected Cell firstNonStaticCell(ColumnFamily cf)
- {
- for (Cell cell : cf)
- {
- ColumnDefinition def = cfm.getColumnDefinition(cell.name());
- if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
- return cell;
- }
- return null;
- }
-
- protected static Cell lastCell(ColumnFamily cf)
- {
- return cf.getReverseSortedColumns().iterator().next();
- }
+ protected abstract ReadCommand nextPageReadCommand(int pageSize);
+ protected abstract void recordLast(DecoratedKey key, Row row);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 35d0971..4fb1429 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -17,10 +17,14 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.service.ClientState;
@@ -39,53 +43,44 @@ import org.apache.cassandra.service.ClientState;
* cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
* blow out memory.
*/
-class MultiPartitionPager implements QueryPager
+public class MultiPartitionPager implements QueryPager
{
private final SinglePartitionPager[] pagers;
- private final long timestamp;
+ private final DataLimits limit;
+
+ private final int nowInSec;
private int remaining;
private int current;
- MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state, int limitForQuery)
+ public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state)
{
+ this.limit = group.limits();
+ this.nowInSec = group.nowInSec();
+
int i = 0;
// If it's not the beginning (state != null), we need to find where we were and skip previous commands
// since they are done.
if (state != null)
- for (; i < commands.size(); i++)
- if (commands.get(i).key.equals(state.partitionKey))
+ for (; i < group.commands.size(); i++)
+ if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey))
break;
- if (i >= commands.size())
+ if (i >= group.commands.size())
{
pagers = null;
- timestamp = -1;
return;
}
- pagers = new SinglePartitionPager[commands.size() - i];
+ pagers = new SinglePartitionPager[group.commands.size() - i];
// 'i' is on the first non exhausted pager for the previous page (or the first one)
- pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state);
- timestamp = commands.get(i).timestamp;
+ pagers[0] = group.commands.get(i).getPager(state);
// Following ones haven't been started yet
- for (int j = i + 1; j < commands.size(); j++)
- {
- ReadCommand command = commands.get(j);
- if (command.timestamp != timestamp)
- throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
- pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null);
- }
-
- remaining = state == null ? limitForQuery : state.remaining;
- }
+ for (int j = i + 1; j < group.commands.size(); j++)
+ pagers[j - i] = group.commands.get(j).getPager(null);
- private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
- {
- return command instanceof SliceFromReadCommand
- ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state)
- : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery);
+ remaining = state == null ? limit.count() : state.remaining;
}
public PagingState state()
@@ -95,7 +90,7 @@ class MultiPartitionPager implements QueryPager
return null;
PagingState state = pagers[current].state();
- return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining);
+ return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining, Integer.MAX_VALUE);
}
public boolean isExhausted()
@@ -113,35 +108,92 @@ class MultiPartitionPager implements QueryPager
return true;
}
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
+ public ReadOrderGroup startOrderGroup()
{
- List<Row> result = new ArrayList<Row>();
-
- int remainingThisQuery = Math.min(remaining, pageSize);
- while (remainingThisQuery > 0 && !isExhausted())
+ // Note that for all pagers, the only difference is the partition key to which it applies, so in practice we
+ // can use the ReadOrderGroup of any of the sub-pagers to protect the whole pager
+ for (int i = current; i < pagers.length; i++)
{
- // isExhausted has set us on the first non-exhausted pager
- List<Row> page = pagers[current].fetchPage(remainingThisQuery);
- if (page.isEmpty())
- continue;
-
- Row row = page.get(0);
- int fetched = pagers[current].columnCounter().countAll(row.cf).live();
- remaining -= fetched;
- remainingThisQuery -= fetched;
- result.add(row);
+ if (pagers[i] != null)
+ return pagers[i].startOrderGroup();
}
+ throw new AssertionError("Shouldn't be called on an exhausted pager");
+ }
- return result;
+ @SuppressWarnings("resource")
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ {
+ int toQuery = Math.min(remaining, pageSize);
+ PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
+ CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
+ iter.setCounter(countingIter.counter());
+ return countingIter;
}
- public int maxRemaining()
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
{
- return remaining;
+ int toQuery = Math.min(remaining, pageSize);
+ PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup);
+ CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
+ iter.setCounter(countingIter.counter());
+ return countingIter;
+ }
+
+ private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+ {
+ private final int pageSize;
+ private PartitionIterator result;
+ private DataLimits.Counter counter;
+
+ // For "normal" queries
+ private final ConsistencyLevel consistency;
+ private final ClientState clientState;
+
+ // For internal queries
+ private final ReadOrderGroup orderGroup;
+
+ public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadOrderGroup orderGroup)
+ {
+ this.pageSize = pageSize;
+ this.consistency = consistency;
+ this.clientState = clientState;
+ this.orderGroup = orderGroup;
+ }
+
+ public void setCounter(DataLimits.Counter counter)
+ {
+ this.counter = counter;
+ }
+
+ protected RowIterator computeNext()
+ {
+ while (result == null || !result.hasNext())
+ {
+ // This sets us on the first non-exhausted pager
+ if (isExhausted())
+ return endOfData();
+
+ if (result != null)
+ result.close();
+
+ int toQuery = pageSize - counter.counted();
+ result = consistency == null
+ ? pagers[current].fetchPageInternal(toQuery, orderGroup)
+ : pagers[current].fetchPage(toQuery, consistency, clientState);
+ }
+ return result.next();
+ }
+
+ public void close()
+ {
+ remaining -= counter.counted();
+ if (result != null)
+ result.close();
+ }
}
- public long timestamp()
+ public int maxRemaining()
{
- return timestamp;
+ return remaining;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
deleted file mode 100644
index d03e582..0000000
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
-
-/**
- * Pager over a SliceByNamesReadCommand.
- */
-public class NamesQueryPager implements SinglePartitionPager
-{
- private final SliceByNamesReadCommand command;
- private final ConsistencyLevel consistencyLevel;
- private final ClientState state;
- private final boolean localQuery;
-
- private volatile boolean queried;
-
- /**
- * For now, we'll only use this in CQL3. In there, as name query can never
- * yield more than one CQL3 row, there is no need for paging and so this is straight-forward.
- *
- * For thrift, we could imagine needing to page, though even then it's very
- * unlikely unless the pageSize is very small.
- *
- * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that
- * count every cell individually) and the names filter asks for more than pageSize columns.
- */
- // Don't use directly, use QueryPagers method instead
- NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, boolean localQuery)
- {
- this.command = command;
- this.consistencyLevel = consistencyLevel;
- this.state = state;
- this.localQuery = localQuery;
- }
-
- public ByteBuffer key()
- {
- return command.key;
- }
-
- public ColumnCounter columnCounter()
- {
- // We know NamesQueryFilter.columnCounter don't care about his argument
- return command.filter.columnCounter(null, command.timestamp);
- }
-
- public PagingState state()
- {
- return null;
- }
-
- public boolean isExhausted()
- {
- return queried;
- }
-
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
- {
- assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize;
-
- if (isExhausted())
- return Collections.<Row>emptyList();
-
- queried = true;
- return localQuery
- ? Collections.singletonList(command.getRow(Keyspace.open(command.ksName)))
- : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel, state);
- }
-
- public int maxRemaining()
- {
- if (queried)
- return 0;
-
- return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size();
- }
-
- public long timestamp()
- {
- return command.timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/Pageable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java
deleted file mode 100644
index d4986f7..0000000
--- a/src/java/org/apache/cassandra/service/pager/Pageable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.util.List;
-
-import org.apache.cassandra.db.ReadCommand;
-
-/**
- * Marker interface for commands that can be paged.
- */
-public interface Pageable
-{
- public static class ReadCommands implements Pageable
- {
- public final List<ReadCommand> commands;
-
- public final int limitForQuery;
-
- public ReadCommands(List<ReadCommand> commands, int limitForQuery)
- {
- this.commands = commands;
- this.limitForQuery = limitForQuery;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/PagingState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java
index f168880..685dc3f 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -31,12 +31,14 @@ public class PagingState
public final ByteBuffer partitionKey;
public final ByteBuffer cellName;
public final int remaining;
+ public final int remainingInPartition;
- public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining)
+ public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining, int remainingInPartition)
{
this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey;
this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName;
this.remaining = remaining;
+ this.remainingInPartition = remainingInPartition;
}
public static PagingState deserialize(ByteBuffer bytes)
@@ -50,7 +52,12 @@ public class PagingState
ByteBuffer pk = ByteBufferUtil.readWithShortLength(in);
ByteBuffer cn = ByteBufferUtil.readWithShortLength(in);
int remaining = in.readInt();
- return new PagingState(pk, cn, remaining);
+ // Note that while 'in.available()' is theoretically an estimate of how many bytes are available
+ // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many
+ // bytes remain to be read. And the reason we want to condition this is for backward compatibility
+ // as we used to not set this.
+ int remainingInPartition = in.available() > 0 ? in.readInt() : Integer.MAX_VALUE;
+ return new PagingState(pk, cn, remaining, remainingInPartition);
}
catch (IOException e)
{
@@ -65,6 +72,7 @@ public class PagingState
ByteBufferUtil.writeWithShortLength(partitionKey, out);
ByteBufferUtil.writeWithShortLength(cellName, out);
out.writeInt(remaining);
+ out.writeInt(remainingInPartition);
return out.buffer();
}
catch (IOException e)
@@ -77,12 +85,16 @@ public class PagingState
{
return 2 + partitionKey.remaining()
+ 2 + cellName.remaining()
- + 4;
+ + 8; // remaining & remainingInPartition
}
@Override
public String toString()
{
- return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining);
+ return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d",
+ ByteBufferUtil.bytesToHex(partitionKey),
+ ByteBufferUtil.bytesToHex(cellName),
+ remaining,
+ remainingInPartition);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index ab2dad7..a69335d 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -17,11 +17,13 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.List;
-
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
/**
* Perform a query, paging it by page of a given size.
@@ -44,13 +46,69 @@ import org.apache.cassandra.exceptions.RequestValidationException;
*/
public interface QueryPager
{
+ public static final QueryPager EMPTY = new QueryPager()
+ {
+ public ReadOrderGroup startOrderGroup()
+ {
+ return ReadOrderGroup.emptyGroup();
+ }
+
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ {
+ return PartitionIterators.EMPTY;
+ }
+
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
+ {
+ return PartitionIterators.EMPTY;
+ }
+
+ public boolean isExhausted()
+ {
+ return true;
+ }
+
+ public int maxRemaining()
+ {
+ return 0;
+ }
+
+ public PagingState state()
+ {
+ return null;
+ }
+ };
+
/**
* Fetches the next page.
*
* @param pageSize the maximum number of elements to return in the next page.
+ * @param consistency the consistency level to achieve for the query.
+ * @param clientState the {@code ClientState} for the query. In practice, this can be null unless
+ * {@code consistency} is a serial consistency.
+ * @return the page of result.
+ */
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException;
+
+ /**
+ * Starts a new read operation.
+ * <p>
+ * This must be called before {@link fetchPageInternal} and passed to it to protect the read.
+ * The returned object <b>must</b> be closed on all path and it is thus strongly advised to
+ * use it in a try-with-ressource construction.
+ *
+ * @return a newly started order group for this {@code QueryPager}.
+ */
+ public ReadOrderGroup startOrderGroup();
+
+ /**
+ * Fetches the next page internally (in other, this does a local query).
+ *
+ * @param pageSize the maximum number of elements to return in the next page.
+ * @param orderGroup the {@code ReadOrderGroup} protecting the read.
* @return the page of result.
*/
- public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException;
+ public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException;
/**
* Whether or not this pager is exhausted, i.e. whether or not a call to
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index f933ccb..618ca32 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -17,180 +17,47 @@
*/
package org.apache.cassandra.service.pager;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnCounter;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
/**
- * Static utility methods to create query pagers.
+ * Static utility methods for paging.
*/
public class QueryPagers
{
private QueryPagers() {};
- private static int maxQueried(ReadCommand command)
- {
- if (command instanceof SliceByNamesReadCommand)
- {
- NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter;
- return filter.countCQL3Rows() ? 1 : filter.columns.size();
- }
- else
- {
- SliceQueryFilter filter = ((SliceFromReadCommand)command).filter;
- return filter.count;
- }
- }
-
- public static boolean mayNeedPaging(Pageable command, int pageSize)
- {
- if (command instanceof Pageable.ReadCommands)
- {
- List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
-
- // Using long on purpose, as we could overflow otherwise
- long maxQueried = 0;
- for (ReadCommand readCmd : commands)
- maxQueried += maxQueried(readCmd);
-
- return maxQueried > pageSize;
- }
- else if (command instanceof ReadCommand)
- {
- return maxQueried((ReadCommand)command) > pageSize;
- }
- else
- {
- assert command instanceof RangeSliceCommand;
- RangeSliceCommand rsc = (RangeSliceCommand)command;
- // We don't support paging for thrift in general because the way thrift RangeSliceCommand count rows
- // independently of cells makes things harder (see RangeSliceQueryPager). The one case where we do
- // get a RangeSliceCommand from CQL3 without the countCQL3Rows flag set is for DISTINCT. In that case
- // however, the underlying sliceQueryFilter count is 1, so that the RSC limit is still a limit on the
- // number of CQL3 rows returned.
- assert rsc.countCQL3Rows || (rsc.predicate instanceof SliceQueryFilter && ((SliceQueryFilter)rsc.predicate).count == 1);
- return rsc.maxResults > pageSize;
- }
- }
-
- private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
- {
- if (command instanceof SliceByNamesReadCommand)
- return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, local);
- else
- return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, local, state);
- }
-
- private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state)
- {
- if (command instanceof Pageable.ReadCommands)
- {
- List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
- if (commands.size() == 1)
- return pager(commands.get(0), consistencyLevel, cState, local, state);
-
- return new MultiPartitionPager(commands, consistencyLevel, cState, local, state, ((Pageable.ReadCommands) command).limitForQuery);
- }
- else if (command instanceof ReadCommand)
- {
- return pager((ReadCommand)command, consistencyLevel, cState, local, state);
- }
- else
- {
- assert command instanceof RangeSliceCommand;
- RangeSliceCommand rangeCommand = (RangeSliceCommand)command;
- if (rangeCommand.predicate instanceof NamesQueryFilter)
- return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local, state);
- else
- return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local, state);
- }
- }
-
- public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState)
- {
- return pager(command, consistencyLevel, cState, false, null);
- }
-
- public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, PagingState state)
- {
- return pager(command, consistencyLevel, cState, false, state);
- }
-
- public static QueryPager localPager(Pageable command)
- {
- return pager(command, null, null, true, null);
- }
-
- /**
- * Convenience method to (locally) page an internal row.
- * Used to 2ndary index a wide row without dying.
- */
- public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize)
- {
- SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter());
- final SliceQueryPager pager = new SliceQueryPager(command, null, null, true);
-
- return new Iterator<ColumnFamily>()
- {
- // We don't use AbstractIterator because we don't want hasNext() to do an actual query
- public boolean hasNext()
- {
- return !pager.isExhausted();
- }
-
- public ColumnFamily next()
- {
- try
- {
- List<Row> rows = pager.fetchPage(pageSize);
- ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf;
- return cf == null ? ArrayBackedSortedColumns.factory.create(cfs.metadata) : cf;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
- };
- }
-
/**
* Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
*/
- public static int countPaged(String keyspace,
- String columnFamily,
- ByteBuffer key,
- SliceQueryFilter filter,
+ public static int countPaged(CFMetaData metadata,
+ DecoratedKey key,
+ ColumnFilter columnFilter,
+ ClusteringIndexFilter filter,
+ DataLimits limits,
ConsistencyLevel consistencyLevel,
- ClientState cState,
+ ClientState state,
final int pageSize,
- long now) throws RequestValidationException, RequestExecutionException
+ int nowInSec,
+ boolean isForThrift) throws RequestValidationException, RequestExecutionException
{
- SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
- final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false);
+ SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
+ final SinglePartitionPager pager = new SinglePartitionPager(command, null);
- ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
+ int count = 0;
while (!pager.isExhausted())
{
- List<Row> next = pager.fetchPage(pageSize);
- if (!next.isEmpty())
- counter.countAll(next.get(0).cf);
+ try (CountingPartitionIterator iter = new CountingPartitionIterator(pager.fetchPage(pageSize, consistencyLevel, state), limits, nowInSec))
+ {
+ PartitionIterators.consume(iter);
+ count += iter.counter().counted();
+ }
}
- return counter.live();
+ return count;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
index 50d1280..fffb4e1 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java
@@ -17,13 +17,11 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.List;
-
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
/**
@@ -37,25 +35,17 @@ import org.apache.cassandra.service.StorageService;
*/
public class RangeNamesQueryPager extends AbstractQueryPager
{
- private final RangeSliceCommand command;
private volatile DecoratedKey lastReturnedKey;
- // Don't use directly, use QueryPagers method instead
- RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
+ public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state)
{
- super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
- this.command = command;
- assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows();
- }
-
- RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
- {
- this(command, consistencyLevel, localQuery);
+ super(command);
+ assert command.isNamesQuery();
if (state != null)
{
lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
- restoreState(state.remaining, true);
+ restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
@@ -63,51 +53,36 @@ public class RangeNamesQueryPager extends AbstractQueryPager
{
return lastReturnedKey == null
? null
- : new PagingState(lastReturnedKey.getKey(), null, maxRemaining());
+ : new PagingState(lastReturnedKey.getKey(), null, maxRemaining(), remainingInPartition());
}
- protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ protected ReadCommand nextPageReadCommand(int pageSize)
throws RequestExecutionException
{
- AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize);
+ PartitionRangeReadCommand pageCmd = ((PartitionRangeReadCommand)command).withUpdatedLimit(command.limits().forPaging(pageSize));
if (lastReturnedKey != null)
pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey));
- return localQuery
- ? pageCmd.executeLocally()
- : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
- }
-
- protected boolean containsPreviousLast(Row first)
- {
- // When querying the next page, we create a bound that exclude the lastReturnedKey
- return false;
- }
-
- protected boolean recordLast(Row last)
- {
- lastReturnedKey = last.key;
- // We return false as that means "can that last be in the next query?"
- return false;
+ return pageCmd;
}
- protected boolean isReversed()
+ protected void recordLast(DecoratedKey key, Row last)
{
- return false;
+ lastReturnedKey = key;
}
- private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey)
+ private AbstractBounds<PartitionPosition> makeExcludingKeyBounds(PartitionPosition lastReturnedKey)
{
// We return a range that always exclude lastReturnedKey, since we've already
// returned it.
- AbstractBounds<RowPosition> bounds = command.keyRange;
+ AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
if (bounds instanceof Range || bounds instanceof Bounds)
{
- return new Range<RowPosition>(lastReturnedKey, bounds.right);
+ return new Range<PartitionPosition>(lastReturnedKey, bounds.right);
}
else
{
- return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ return new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index c9a28e8..6429be0 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,17 +17,16 @@
*/
package org.apache.cassandra.service.pager;
-import java.util.List;
-
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Pages a RangeSliceCommand whose predicate is a slice query.
*
@@ -36,27 +35,21 @@ import org.apache.cassandra.service.StorageService;
*/
public class RangeSliceQueryPager extends AbstractQueryPager
{
- private final RangeSliceCommand command;
- private volatile DecoratedKey lastReturnedKey;
- private volatile CellName lastReturnedName;
+ private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class);
- // Don't use directly, use QueryPagers method instead
- RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery)
- {
- super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp);
- this.command = command;
- assert columnFilter instanceof SliceQueryFilter;
- }
+ private volatile DecoratedKey lastReturnedKey;
+ private volatile Clustering lastReturnedClustering;
- RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+ public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state)
{
- this(command, consistencyLevel, localQuery);
+ super(command);
+ assert !command.isNamesQuery();
if (state != null)
{
lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey);
- lastReturnedName = cfm.comparator.cellFromByteBuffer(state.cellName);
- restoreState(state.remaining, true);
+ lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+ restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
}
}
@@ -64,67 +57,63 @@ public class RangeSliceQueryPager extends AbstractQueryPager
{
return lastReturnedKey == null
? null
- : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(), maxRemaining());
+ : new PagingState(lastReturnedKey.getKey(), LegacyLayout.encodeClustering(command.metadata(), lastReturnedClustering), maxRemaining(), remainingInPartition());
}
- protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
+ protected ReadCommand nextPageReadCommand(int pageSize)
throws RequestExecutionException
{
- SliceQueryFilter sf = (SliceQueryFilter)columnFilter;
- AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey);
- Composite start = lastReturnedName == null ? sf.start() : lastReturnedName;
- PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace,
- command.columnFamily,
- command.timestamp,
- keyRange,
- sf,
- start,
- sf.finish(),
- command.rowFilter,
- pageSize,
- command.countCQL3Rows);
-
- return localQuery
- ? pageCmd.executeLocally()
- : StorageProxy.getRangeSlice(pageCmd, consistencyLevel);
- }
-
- protected boolean containsPreviousLast(Row first)
- {
- if (lastReturnedKey == null || !lastReturnedKey.equals(first.key))
- return false;
-
- // Same as SliceQueryPager, we ignore a deleted column
- Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
- return !first.cf.deletionInfo().isDeleted(firstCell)
- && firstCell.isLive(timestamp())
- && lastReturnedName.equals(firstCell.name());
- }
+ DataLimits limits;
+ DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange();
+ DataRange pageRange;
+ if (lastReturnedKey == null)
+ {
+ pageRange = fullRange;
+ limits = command.limits().forPaging(pageSize);
+ }
+ else
+ {
+ // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother.
+ boolean includeLastKey = remainingInPartition() > 0;
+ AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey);
+ if (includeLastKey)
+ {
+ pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedClustering, false);
+ limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition());
+ }
+ else
+ {
+ pageRange = fullRange.forSubRange(bounds);
+ limits = command.limits().forPaging(pageSize);
+ }
+ }
- protected boolean recordLast(Row last)
- {
- lastReturnedKey = last.key;
- lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name();
- return true;
+ return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
}
- protected boolean isReversed()
+ protected void recordLast(DecoratedKey key, Row last)
{
- return ((SliceQueryFilter)command.predicate).reversed;
+ if (last != null)
+ {
+ lastReturnedKey = key;
+ lastReturnedClustering = last.clustering().takeAlias();
+ }
}
- private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey)
+ private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey)
{
- // We always include lastReturnedKey since we may still be paging within a row,
- // and PagedRangeCommand will move over if we're not anyway
- AbstractBounds<RowPosition> bounds = command.keyRange;
+ AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange();
if (bounds instanceof Range || bounds instanceof Bounds)
{
- return new Bounds<RowPosition>(lastReturnedKey, bounds.right);
+ return includeLastKey
+ ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right)
+ : new Range<PartitionPosition>(lastReturnedKey, bounds.right);
}
else
{
- return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right);
+ return includeLastKey
+ ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right)
+ : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 51bbf90..6488641 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -19,15 +19,67 @@ package org.apache.cassandra.service.pager;
import java.nio.ByteBuffer;
-import org.apache.cassandra.db.filter.ColumnCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.service.ClientState;
/**
* Common interface to single partition queries (by slice and by name).
*
* For use by MultiPartitionPager.
*/
-public interface SinglePartitionPager extends QueryPager
+public class SinglePartitionPager extends AbstractQueryPager
{
- public ByteBuffer key();
- public ColumnCounter columnCounter();
+ private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
+
+ private final SinglePartitionReadCommand<?> command;
+
+ private volatile Clustering lastReturned;
+
+ public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state)
+ {
+ super(command);
+ this.command = command;
+
+ if (state != null)
+ {
+ lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+ restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
+ }
+ }
+
+ public ByteBuffer key()
+ {
+ return command.partitionKey().getKey();
+ }
+
+ public DataLimits limits()
+ {
+ return command.limits();
+ }
+
+ public PagingState state()
+ {
+ return lastReturned == null
+ ? null
+ : new PagingState(null, LegacyLayout.encodeClustering(command.metadata(), lastReturned), maxRemaining(), remainingInPartition());
+ }
+
+ protected ReadCommand nextPageReadCommand(int pageSize)
+ {
+ return command.forPaging(lastReturned, pageSize);
+ }
+
+ protected void recordLast(DecoratedKey key, Row last)
+ {
+ if (last != null)
+ lastReturned = last.clustering().takeAlias();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
deleted file mode 100644
index bc364aa..0000000
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service.pager;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.StorageProxy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pager over a SliceFromReadCommand.
- */
-public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager
-{
- private static final Logger logger = LoggerFactory.getLogger(SliceQueryPager.class);
-
- private final SliceFromReadCommand command;
- private final ClientState cstate;
-
- private volatile Composite lastReturned;
-
- // Don't use directly, use QueryPagers method instead
- SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery)
- {
- super(consistencyLevel, command.filter.count, localQuery, command.ksName, command.cfName, command.filter, command.timestamp);
- this.command = command;
- this.cstate = cstate;
- }
-
- SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery, PagingState state)
- {
- this(command, consistencyLevel, cstate, localQuery);
-
- if (state != null)
- {
- lastReturned = cfm.comparator.fromByteBuffer(state.cellName);
- restoreState(state.remaining, true);
- }
- }
-
- public ByteBuffer key()
- {
- return command.key;
- }
-
- public PagingState state()
- {
- return lastReturned == null
- ? null
- : new PagingState(null, lastReturned.toByteBuffer(), maxRemaining());
- }
-
- protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery)
- throws RequestValidationException, RequestExecutionException
- {
- // For some queries, such as a DISTINCT query on static columns, the limit for slice queries will be lower
- // than the page size (in the static example, it will be 1). We use the min here to ensure we don't fetch
- // more rows than we're supposed to. See CASSANDRA-8108 for more details.
- SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize));
- if (lastReturned != null)
- filter = filter.withUpdatedStart(lastReturned, cfm);
-
- logger.debug("Querying next page of slice query; new filter: {}", filter);
- ReadCommand pageCmd = command.withUpdatedFilter(filter);
- return localQuery
- ? Collections.singletonList(pageCmd.getRow(Keyspace.open(command.ksName)))
- : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel, cstate);
- }
-
- protected boolean containsPreviousLast(Row first)
- {
- if (lastReturned == null)
- return false;
-
- Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf);
- // Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the
- // rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there.
- return !first.cf.deletionInfo().isDeleted(firstCell)
- && firstCell.isLive(timestamp())
- && lastReturned.equals(firstCell.name());
- }
-
- protected boolean recordLast(Row last)
- {
- Cell lastCell = isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf);
- lastReturned = lastCell.name();
- return true;
- }
-
- protected boolean isReversed()
- {
- return command.filter.reversed;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 45d04f9..6077166 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -24,14 +24,18 @@ package org.apache.cassandra.service.paxos;
import java.io.DataInput;
import java.io.IOException;
import java.util.UUID;
-import java.nio.ByteBuffer;
import com.google.common.base.Objects;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.UUIDSerializer;
@@ -40,34 +44,31 @@ public class Commit
{
public static final CommitSerializer serializer = new CommitSerializer();
- public final ByteBuffer key;
public final UUID ballot;
- public final ColumnFamily update;
+ public final PartitionUpdate update;
- public Commit(ByteBuffer key, UUID ballot, ColumnFamily update)
+ public Commit(UUID ballot, PartitionUpdate update)
{
- assert key != null;
assert ballot != null;
assert update != null;
- this.key = key;
this.ballot = ballot;
this.update = update;
}
- public static Commit newPrepare(ByteBuffer key, CFMetaData metadata, UUID ballot)
+ public static Commit newPrepare(DecoratedKey key, CFMetaData metadata, UUID ballot)
{
- return new Commit(key, ballot, ArrayBackedSortedColumns.factory.create(metadata));
+ return new Commit(ballot, PartitionUpdate.emptyUpdate(metadata, key));
}
- public static Commit newProposal(ByteBuffer key, UUID ballot, ColumnFamily update)
+ public static Commit newProposal(UUID ballot, PartitionUpdate update)
{
- return new Commit(key, ballot, updatesWithPaxosTime(update, ballot));
+ return new Commit(ballot, updatesWithPaxosTime(update, ballot));
}
- public static Commit emptyCommit(ByteBuffer key, CFMetaData metadata)
+ public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata)
{
- return new Commit(key, UUIDGen.minTimeUUID(0), ArrayBackedSortedColumns.factory.create(metadata));
+ return new Commit(UUIDGen.minTimeUUID(0), PartitionUpdate.emptyUpdate(metadata, key));
}
public boolean isAfter(Commit other)
@@ -83,7 +84,7 @@ public class Commit
+ /**
+  * Wraps this commit's update in a {@link Mutation}.
+  */
public Mutation makeMutation()
{
assert update != null;
- return new Mutation(key, update);
+ // No separate key is passed any more; presumably Mutation takes it from update.partitionKey() — confirm.
+ return new Mutation(update);
}
@Override
@@ -95,7 +96,6 @@ public class Commit
Commit commit = (Commit) o;
if (!ballot.equals(commit.ballot)) return false;
- if (!key.equals(commit.key)) return false;
if (!update.equals(commit.update)) return false;
return true;
@@ -104,52 +104,88 @@ public class Commit
@Override
public int hashCode()
{
- return Objects.hashCode(key, ballot, update);
+ // Hashes exactly the fields equals() compares (ballot and update); the partition key lives in update.
+ return Objects.hashCode(ballot, update);
}
- private static ColumnFamily updatesWithPaxosTime(ColumnFamily updates, UUID ballot)
+ /**
+  * Returns a copy of {@code update} whose timestamps are derived from {@code ballot}.
+  * <p>
+  * With {@code t = UUIDGen.microsTimestamp(ballot)}:
+  * <ul>
+  *   <li>the deletion info is rewritten to {@code t-1};</li>
+  *   <li>the static row (when non-empty) is copied through {@code copyWithUpdatedTimestamp} with {@code t};</li>
+  *   <li>every regular row is copied through {@code copyWithUpdatedTimestamp} with {@code t}.</li>
+  * </ul>
+  */
+ private static PartitionUpdate updatesWithPaxosTime(PartitionUpdate update, UUID ballot)
{
- ColumnFamily cf = updates.cloneMeShallow();
long t = UUIDGen.microsTimestamp(ballot);
- // For the tombstones, we use t-1 so that when insert a collection literall, the range tombstone that deletes the previous values of
- // the collection and we want that to have a lower timestamp and our new values. Since tombstones wins over normal insert, using t-1
- // should not be a problem in general (see #6069).
- cf.deletionInfo().updateAllTimestamp(t-1);
- for (Cell cell : updates)
- cf.addAtom(cell.withUpdatedTimestamp(t));
- return cf;
+ // Using t-1 for tombstones so deletion doesn't trump newly inserted data (#6069)
+ // NOTE(review): if DeletionInfo.updateAllTimestamp mutates in place and returns itself, this also
+ // rewrites the caller's update.deletionInfo() and shares that object with the copy — confirm.
+ PartitionUpdate newUpdate = new PartitionUpdate(update.metadata(),
+ update.partitionKey(),
+ update.deletionInfo().updateAllTimestamp(t-1),
+ update.columns(),
+ update.rowCount());
+
+ if (!update.staticRow().isEmpty())
+ copyWithUpdatedTimestamp(update.staticRow(), newUpdate.staticWriter(), t);
+
+ for (Row row : update)
+ copyWithUpdatedTimestamp(row, newUpdate.writer(), t);
+
+ return newUpdate;
+ }
+
+ /**
+  * Writes a copy of {@code row} to {@code writer} with every timestamp rewritten for a Paxos proposal.
+  * <p>
+  * Timestamps applied:
+  * <ul>
+  *   <li>primary key liveness info and cell liveness info take {@code timestamp};</li>
+  *   <li>non-live row and complex-column deletions take {@code timestamp-1}, keeping their local
+  *       deletion time;</li>
+  *   <li>a live row deletion is written unchanged.</li>
+  * </ul>
+  * Using {@code timestamp-1} means that, when a collection literal is inserted, the deletion that
+  * comes with it does not end up deleting the inserted data (see #6069).
+  * {@code writer.endOfRow()} is called once the row is complete.
+  *
+  * @param row the row to copy
+  * @param writer the destination writer
+  * @param timestamp the new timestamp (the ballot's microsecond timestamp)
+  */
+ private static void copyWithUpdatedTimestamp(Row row, Row.Writer writer, long timestamp)
+ {
+ Rows.writeClustering(row.clustering(), writer);
+ writer.writePartitionKeyLivenessInfo(row.primaryKeyLivenessInfo().withUpdatedTimestamp(timestamp));
+
+ // The row deletion must follow the same t-1 rule as the partition and complex deletions.
+ // Writing row.deletion() as-is would keep its original timestamp while everything else in the
+ // proposal uses the ballot time; the pre-refactor code rewrote every tombstone to t-1.
+ DeletionTime rowDeletion = row.deletion();
+ writer.writeRowDeletion(rowDeletion.isLive()
+ ? rowDeletion
+ : new SimpleDeletionTime(timestamp-1, rowDeletion.localDeletionTime()));
+
+ for (Cell cell : row)
+ writer.writeCell(cell.column(), cell.isCounterCell(), cell.value(), cell.livenessInfo().withUpdatedTimestamp(timestamp), cell.path());
+
+ for (int i = 0; i < row.columns().complexColumnCount(); i++)
+ {
+ ColumnDefinition c = row.columns().getComplex(i);
+ DeletionTime dt = row.getDeletion(c);
+ // We use t-1 to make sure that on inserting a collection literal, the deletion that comes with it does not
+ // end up deleting the inserted data (see #6069)
+ if (!dt.isLive())
+ writer.writeComplexDeletion(c, new SimpleDeletionTime(timestamp-1, dt.localDeletionTime()));
+ }
+ writer.endOfRow();
}
@Override
public String toString()
{
- return String.format("Commit(%s, %s, %s)", ByteBufferUtil.bytesToHex(key), ballot, update);
+ // The key is no longer printed separately; presumably it appears in update's own toString() — confirm.
+ return String.format("Commit(%s, %s)", ballot, update);
}
+ /**
+  * Versioned serializer for {@link Commit}. The layout is:
+  * <ol>
+  *   <li>the partition key as a short-length-prefixed byte buffer (only for versions below
+  *       {@code MessagingService.VERSION_30});</li>
+  *   <li>the ballot;</li>
+  *   <li>the update.</li>
+  * </ol>
+  */
public static class CommitSerializer implements IVersionedSerializer<Commit>
{
public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException
{
- ByteBufferUtil.writeWithShortLength(commit.key, out);
+ // Pre-3.0 versions carry the raw partition key ahead of the ballot. From VERSION_30 onward it is
+ // not written here (presumably PartitionUpdate.serializer includes it — confirm).
+ if (version < MessagingService.VERSION_30)
+ ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out);
+
UUIDSerializer.serializer.serialize(commit.ballot, out, version);
- ColumnFamily.serializer.serialize(commit.update, out, version);
+ PartitionUpdate.serializer.serialize(commit.update, out, version);
}
public Commit deserialize(DataInput in, int version) throws IOException
{
- return new Commit(ByteBufferUtil.readWithShortLength(in),
- UUIDSerializer.serializer.deserialize(in, version),
- ColumnFamily.serializer.deserialize(in,
- ArrayBackedSortedColumns.factory,
- ColumnSerializer.Flag.LOCAL,
- version));
+ // Mirror of serialize(): only pre-3.0 streams carry the key at this position. It is decorated with
+ // the partitioner from StorageService.getPartitioner(). For VERSION_30+ the key stays null and is
+ // left to PartitionUpdate.serializer (presumably read from the stream — confirm).
+ DecoratedKey key = null;
+ if (version < MessagingService.VERSION_30)
+ key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+
+ UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
+ PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key);
+ return new Commit(ballot, update);
}
public long serializedSize(Commit commit, int version)
{
- return 2 + commit.key.remaining()
- + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
- + ColumnFamily.serializer.serializedSize(commit.update, version);
+ // Must stay in sync with serialize(): the key is counted only for versions below VERSION_30.
+ TypeSizes sizes = TypeSizes.NATIVE;
+
+ int size = 0;
+ if (version < MessagingService.VERSION_30)
+ size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey(), sizes);
+
+ return size
+ + UUIDSerializer.serializer.serializedSize(commit.ballot, version)
+ + PartitionUpdate.serializer.serializedSize(commit.update, version, sizes);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..20ccb90 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -39,14 +39,14 @@ public class PaxosState
private final Commit accepted;
private final Commit mostRecentCommit;
- public PaxosState(ByteBuffer key, CFMetaData metadata)
+ /**
+  * Creates a state for {@code key} in which the promised, accepted and most recent commits are all
+  * {@code Commit.emptyCommit(key, metadata)}.
+  */
+ public PaxosState(DecoratedKey key, CFMetaData metadata)
{
this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
}
public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
{
- assert promised.key == accepted.key && accepted.key == mostRecentCommit.key;
+ assert promised.update.partitionKey().equals(accepted.update.partitionKey()) && accepted.update.partitionKey().equals(mostRecentCommit.update.partitionKey());
assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata();
this.promised = promised;
@@ -59,11 +59,11 @@ public class PaxosState
long start = System.nanoTime();
try
{
- Lock lock = LOCKS.get(toPrepare.key);
+ Lock lock = LOCKS.get(toPrepare.update.partitionKey());
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+ PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -94,11 +94,11 @@ public class PaxosState
long start = System.nanoTime();
try
{
- Lock lock = LOCKS.get(proposal.key);
+ Lock lock = LOCKS.get(proposal.update.partitionKey());
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+ PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..7b5edf2 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
- public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
+ public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
{
super(targets, consistency);
// need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected