You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/19 16:18:52 UTC
[3/3] git commit: Revert "fix KEYS index from skipping results" — needs test case + backport to 1.0.9
Revert "fix KEYS index from skipping results"
Needs test case + backport to 1.0.9
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e2706a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e2706a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e2706a7
Branch: refs/heads/cassandra-1.1
Commit: 0e2706a7f1eec0ba161177002e6e25bb58678607
Parents: fe98003
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 19 10:18:01 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 19 10:18:01 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../db/index/MultiRowIndexSearcherIterator.java | 225 ---------------
.../cassandra/db/index/keys/KeysSearcher.java | 136 ++++++++--
3 files changed, 115 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddeb4c4..cb7a9c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,6 @@
1.1.1-dev
* optimize commitlog checksumming (CASSANDRA-3610)
* identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261)
- * fix KEYS index from skipping results (CASSANDRA-3996)
1.1-dev
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
deleted file mode 100644
index ba7a023..0000000
--- a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * This class is a general searcher that visits rows returned by nextIndexKey();
- */
-public abstract class MultiRowIndexSearcherIterator extends ColumnFamilyStore.AbstractScanIterator
-{
- private static final Logger logger = LoggerFactory.getLogger(MultiRowIndexSearcherIterator.class);
- private static final Iterator<IColumn> EMPTY_ITERATOR = Collections.<IColumn>emptyList().iterator();
-
- /* keys within a row */
- protected final AbstractBounds<RowPosition> range;
- private ByteBuffer lastSeenKey;
- private final ByteBuffer startKey;
- private final ByteBuffer endKey;
-
- private Iterator<IColumn> currentIndexKeyData = null;
- private final QueryPath path;
-
- private final IndexExpression expression;
- private final ExtendedFilter filter;
- protected final ColumnFamilyStore indexCfs;
- private final ColumnFamilyStore baseCfs;
- private final boolean rightRangeIsNotMinimum;
- protected DecoratedKey curIndexKey;
-
- private final int rowsPerQuery;
- private int columnsRead;
-
-
- public MultiRowIndexSearcherIterator(IndexExpression expression,
- ColumnFamilyStore baseCfs,
- ColumnFamilyStore indexCfs,
- ExtendedFilter filter,
- AbstractBounds<RowPosition> range)
- {
- this.expression = expression;
- this.baseCfs = baseCfs;
- this.range = range;
- this.filter = filter;
- this.indexCfs = indexCfs;
-
- /*
- * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
- * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
- * possible key having a given token. A fix would be to actually store the token along the key in the
- * indexed row.
- */
- startKey = range.left instanceof DecoratedKey ? ((DecoratedKey) range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- endKey = range.right instanceof DecoratedKey ? ((DecoratedKey) range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
- int meanColumns = Math.max(indexCfs.getMeanColumns(), 1);
-
- // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
- rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
- rightRangeIsNotMinimum = !range.right.isMinimum(baseCfs.partitioner);
- path = new QueryPath(baseCfs.columnFamily);
-
- }
-
- /**
- * This function should return indexCfs keys in order they would be scanned by searcher
- * @return next key for scanning of null if endOfData
- */
- protected abstract DecoratedKey nextIndexKey();
-
- /**
- * resets internal state preparing for next indexCfs row scan.
- */
- protected void resetState()
- {
- curIndexKey = nextIndexKey();
- currentIndexKeyData = EMPTY_ITERATOR;
- lastSeenKey = startKey;
- columnsRead = Integer.MAX_VALUE;
- }
-
- protected Row computeNext()
- {
- if (currentIndexKeyData == null) // this is first call. Initialize
- resetState();
-
- Row result = null;
- while (result == null && curIndexKey != null) // curIndexKey would be null when endOfData is reached
- {
- if (!currentIndexKeyData.hasNext()) // we've finished scanning row page
- {
- if (columnsRead < rowsPerQuery) // previously we've read less then we queried. No more pages to read within this row
- {
- logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
- resetState();
- }
- else
- {
- if (logger.isDebugEnabled())
- logger.debug(String.format("Scanning index %s starting with %s",
- expressionString(expression), baseCfs.metadata.getKeyValidator().getString(startKey)));
-
- QueryFilter indexFilter = QueryFilter.getSliceFilter(curIndexKey,
- new QueryPath(indexCfs.getColumnFamilyName()),
- lastSeenKey,
- endKey,
- false,
- rowsPerQuery);
-
- ColumnFamily indexRow = indexCfs.getColumnFamily(indexFilter); //get next row page
-
- if (indexRow != null)
- {
- Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
- columnsRead = sortedColumns.size();
- currentIndexKeyData = sortedColumns.iterator();
- IColumn firstColumn = sortedColumns.iterator().next();
-
- // Paging is racy, so it is possible the first column_name of a page is not the last seen one.
- if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
- {
- // skip the row we already saw w/ the last page of results
- currentIndexKeyData.next();
- logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
- }
- else if (range instanceof Range && currentIndexKeyData.hasNext() && firstColumn.name().equals(startKey))
- {
- // skip key excluded by range
- currentIndexKeyData.next();
- logger.debug("Skipping first key as range excludes it {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
- }
- }
- else // page is empty, nothing to scan within this row
- {
- columnsRead = 0;
- currentIndexKeyData = EMPTY_ITERATOR;
- }
- }
- }
-
-
- while (result == null && currentIndexKeyData.hasNext()) // rolling through columns in page
- {
- IColumn column = currentIndexKeyData.next();
- lastSeenKey = column.name();
-
- if (column.isMarkedForDelete())
- {
- logger.debug("Skipping {}", column);
- continue;
- }
-
- DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
-
- if (rightRangeIsNotMinimum && range.right.compareTo(dk) < 0) // rightRangeIsNotMinimum is required to serve ring cycles
- {
- logger.debug("Reached end of assigned scan range");
- resetState();
- }
- else if (range.contains(dk))
- {
- logger.debug("Returning index hit for {}", dk);
- ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
-
- // While the column family we'll get in the end should contains the primary clause column_name,
- // the initialFilter may not have found it and can thus be null
- if (data == null)
- data = ColumnFamily.create(baseCfs.metadata);
-
- result = new Row(dk, data);
- }
- else
- {
- logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
- }
- }
- }
-
- return result == null ? endOfData() : result;
- }
-
- private String expressionString(IndexExpression expr)
- {
- return String.format("'%s.%s %s %s'",
- baseCfs.columnFamily,
- baseCfs.getComparator().getString(expr.column_name),
- expr.op,
- baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
- }
-
- public void close() throws IOException
- {}
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 0914c64..bd4be7e 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,18 +17,20 @@
*/
package org.apache.cassandra.db.index.keys;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.MultiRowIndexSearcherIterator;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +66,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
return best;
}
+ private String expressionString(IndexExpression expr)
+ {
+ return String.format("'%s.%s %s %s'",
+ baseCfs.columnFamily,
+ baseCfs.getComparator().getString(expr.column_name),
+ expr.op,
+ baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
+ }
+
public boolean isIndexing(List<IndexExpression> clause)
{
return highestSelectivityPredicate(clause) != null;
@@ -84,32 +95,115 @@ public class KeysSearcher extends SecondaryIndexSearcher
// TODO: allow merge join instead of just one index + loop
final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
-
if (logger.isDebugEnabled())
logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name));
-
assert index != null;
- return new KeysMultiRowIndexSearcherIterator(primary, filter, range, indexManager, index);
- }
+ final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
- public class KeysMultiRowIndexSearcherIterator extends MultiRowIndexSearcherIterator
- {
- final DecoratedKey indexKey;
+ /*
+ * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+ * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
+ * possible key having a given token. A fix would be to actually store the token along the key in the
+ * indexed row.
+ */
+ final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
- public KeysMultiRowIndexSearcherIterator(IndexExpression expression,
- ExtendedFilter filter,
- AbstractBounds<RowPosition> range,
- SecondaryIndexManager indexManager,
- SecondaryIndex index)
+ return new ColumnFamilyStore.AbstractScanIterator()
{
- super(expression, baseCfs, index.getIndexCfs(), filter, range);
- indexKey = indexManager.getIndexKeyFor(expression.column_name, expression.value);
- }
+ private ByteBuffer lastSeenKey = startKey;
+ private Iterator<IColumn> indexColumns;
+ private final QueryPath path = new QueryPath(baseCfs.columnFamily);
+ private int columnsRead = Integer.MAX_VALUE;
- @Override
- protected final DecoratedKey nextIndexKey()
- {
- return curIndexKey == null ? indexKey : null; // keys index always scan single row in indexCfs
- }
+ protected Row computeNext()
+ {
+ int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
+ // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
+ int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
+ while (true)
+ {
+ if (indexColumns == null || !indexColumns.hasNext())
+ {
+ if (columnsRead < rowsPerQuery)
+ {
+ logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
+ return endOfData();
+ }
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Scanning index %s starting with %s",
+ expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
+
+ QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+ new QueryPath(index.getIndexCfs().getColumnFamilyName()),
+ lastSeenKey,
+ endKey,
+ false,
+ rowsPerQuery);
+ ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
+ logger.debug("fetched {}", indexRow);
+ if (indexRow == null)
+ {
+ logger.debug("no data, all done");
+ return endOfData();
+ }
+
+ Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+ columnsRead = sortedColumns.size();
+ indexColumns = sortedColumns.iterator();
+ IColumn firstColumn = sortedColumns.iterator().next();
+
+ // Paging is racy, so it is possible the first column of a page is not the last seen one.
+ if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
+ {
+ // skip the row we already saw w/ the last page of results
+ indexColumns.next();
+ columnsRead--;
+ logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+ }
+ else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey))
+ {
+ // skip key excluded by range
+ indexColumns.next();
+ columnsRead--;
+ logger.debug("Skipping first key as range excludes it");
+ }
+ }
+
+ while (indexColumns.hasNext())
+ {
+ IColumn column = indexColumns.next();
+ lastSeenKey = column.name();
+ if (column.isMarkedForDelete())
+ {
+ logger.debug("skipping {}", column.name());
+ continue;
+ }
+
+ DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+ if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
+ {
+ logger.debug("Reached end of assigned scan range");
+ return endOfData();
+ }
+ if (!range.contains(dk))
+ {
+ logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
+ continue;
+ }
+
+ logger.debug("Returning index hit for {}", dk);
+ ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+ // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
+ if (data == null)
+ data = ColumnFamily.create(baseCfs.metadata);
+ return new Row(dk, data);
+ }
+ }
+ }
+
+ public void close() throws IOException {}
+ };
}
}