You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") in the original page and is not preserved in this rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/03/19 16:18:52 UTC

[3/3] git commit: Revert "fix KEYS index from skipping results" (needs test case + backport to 1.0.9)

Revert "fix KEYS index from skipping results"
The reverted fix still needs a test case and a backport to 1.0.9.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e2706a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e2706a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e2706a7

Branch: refs/heads/cassandra-1.1
Commit: 0e2706a7f1eec0ba161177002e6e25bb58678607
Parents: fe98003
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 19 10:18:01 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 19 10:18:01 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 -
 .../db/index/MultiRowIndexSearcherIterator.java    |  225 ---------------
 .../cassandra/db/index/keys/KeysSearcher.java      |  136 ++++++++--
 3 files changed, 115 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ddeb4c4..cb7a9c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,6 @@
 1.1.1-dev
  * optimize commitlog checksumming (CASSANDRA-3610)
  * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261)
- * fix KEYS index from skipping results (CASSANDRA-3996)
 
 
 1.1-dev

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
deleted file mode 100644
index ba7a023..0000000
--- a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-
-/**
- * This class is a general searcher that visits rows returned by nextIndexKey();
- */
-public abstract class MultiRowIndexSearcherIterator extends ColumnFamilyStore.AbstractScanIterator
-{
-    private static final Logger logger = LoggerFactory.getLogger(MultiRowIndexSearcherIterator.class);
-    private static final Iterator<IColumn> EMPTY_ITERATOR = Collections.<IColumn>emptyList().iterator();
-
-    /* keys within a row */
-    protected final AbstractBounds<RowPosition> range;
-    private ByteBuffer lastSeenKey;
-    private final ByteBuffer startKey;
-    private final ByteBuffer endKey;
-
-    private Iterator<IColumn> currentIndexKeyData = null;
-    private final QueryPath path;
-
-    private final IndexExpression expression;
-    private final ExtendedFilter filter;
-    protected final ColumnFamilyStore indexCfs;
-    private final ColumnFamilyStore baseCfs;
-    private final boolean rightRangeIsNotMinimum;
-    protected DecoratedKey curIndexKey;
-
-    private final int rowsPerQuery;
-    private int columnsRead;
-
-
-    public MultiRowIndexSearcherIterator(IndexExpression expression,
-                                         ColumnFamilyStore baseCfs,
-                                         ColumnFamilyStore indexCfs,
-                                         ExtendedFilter filter,
-                                         AbstractBounds<RowPosition> range)
-    {
-        this.expression = expression;
-        this.baseCfs = baseCfs;
-        this.range = range;
-        this.filter = filter;
-        this.indexCfs = indexCfs;
-
-        /*
-        * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
-        * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
-        * possible key having a given token. A fix would be to actually store the token along the key in the
-        * indexed row.
-        */
-        startKey = range.left instanceof DecoratedKey ? ((DecoratedKey) range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        endKey = range.right instanceof DecoratedKey ? ((DecoratedKey) range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
-
-        int meanColumns = Math.max(indexCfs.getMeanColumns(), 1);
-
-        // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
-        rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
-        rightRangeIsNotMinimum = !range.right.isMinimum(baseCfs.partitioner);
-        path = new QueryPath(baseCfs.columnFamily);
-
-    }
-
-    /**
-     * This function should return indexCfs keys in order they would be scanned by searcher
-     * @return next key for scanning of null if endOfData
-     */
-    protected abstract DecoratedKey nextIndexKey();
-
-    /**
-     * resets internal state preparing for next indexCfs row scan.
-     */
-    protected void resetState()
-    {
-        curIndexKey = nextIndexKey();
-        currentIndexKeyData = EMPTY_ITERATOR;
-        lastSeenKey = startKey;
-        columnsRead = Integer.MAX_VALUE;
-    }
-
-    protected Row computeNext()
-    {
-        if (currentIndexKeyData == null) // this is first call. Initialize
-            resetState();
-
-        Row result = null;
-        while (result == null && curIndexKey != null) // curIndexKey would be null when endOfData is reached
-        {
-            if (!currentIndexKeyData.hasNext()) // we've finished scanning row page
-            {
-                if (columnsRead < rowsPerQuery) // previously we've read less then we queried. No more pages to read within this row
-                {
-                    logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
-                    resetState();
-                }
-                else
-                {
-                    if (logger.isDebugEnabled())
-                        logger.debug(String.format("Scanning index %s starting with %s",
-                                                   expressionString(expression), baseCfs.metadata.getKeyValidator().getString(startKey)));
-
-                    QueryFilter indexFilter = QueryFilter.getSliceFilter(curIndexKey,
-                                                                         new QueryPath(indexCfs.getColumnFamilyName()),
-                                                                         lastSeenKey,
-                                                                         endKey,
-                                                                         false,
-                                                                         rowsPerQuery);
-
-                    ColumnFamily indexRow = indexCfs.getColumnFamily(indexFilter); //get next row page
-
-                    if (indexRow != null)
-                    {
-                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
-                        columnsRead = sortedColumns.size();
-                        currentIndexKeyData = sortedColumns.iterator();
-                        IColumn firstColumn = sortedColumns.iterator().next();
-
-                        // Paging is racy, so it is possible the first column_name of a page is not the last seen one.
-                        if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
-                        {
-                            // skip the row we already saw w/ the last page of results
-                            currentIndexKeyData.next();
-                            logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
-                        }
-                        else if (range instanceof Range && currentIndexKeyData.hasNext() && firstColumn.name().equals(startKey))
-                        {
-                            // skip key excluded by range
-                            currentIndexKeyData.next();
-                            logger.debug("Skipping first key as range excludes it {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
-                        }
-                    }
-                    else // page is empty, nothing to scan within this row
-                    {
-                        columnsRead = 0;
-                        currentIndexKeyData = EMPTY_ITERATOR;
-                    }
-                }
-            }
-
-
-            while (result == null && currentIndexKeyData.hasNext()) // rolling through columns in page
-            {
-                IColumn column = currentIndexKeyData.next();
-                lastSeenKey = column.name();
-
-                if (column.isMarkedForDelete())
-                {
-                    logger.debug("Skipping {}", column);
-                    continue;
-                }
-
-                DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
-
-                if (rightRangeIsNotMinimum && range.right.compareTo(dk) < 0) // rightRangeIsNotMinimum is required to serve ring cycles
-                {
-                    logger.debug("Reached end of assigned scan range");
-                    resetState();
-                }
-                else if (range.contains(dk))
-                {
-                    logger.debug("Returning index hit for {}", dk);
-                    ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
-
-                    // While the column family we'll get in the end should contains the primary clause column_name,
-                    // the initialFilter may not have found it and can thus be null
-                    if (data == null)
-                        data = ColumnFamily.create(baseCfs.metadata);
-
-                    result = new Row(dk, data);
-                }
-                else
-                {
-                    logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
-                }
-            }
-        }
-
-        return result == null ? endOfData() : result;
-    }
-
-    private String expressionString(IndexExpression expr)
-    {
-        return String.format("'%s.%s %s %s'",
-                             baseCfs.columnFamily,
-                             baseCfs.getComparator().getString(expr.column_name),
-                             expr.op,
-                             baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
-    }
-
-    public void close() throws IOException
-    {}
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 0914c64..bd4be7e 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -17,18 +17,20 @@
  */
 package org.apache.cassandra.db.index.keys;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.MultiRowIndexSearcherIterator;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +66,15 @@ public class KeysSearcher extends SecondaryIndexSearcher
         return best;
     }
 
+    private String expressionString(IndexExpression expr)
+    {
+        return String.format("'%s.%s %s %s'",
+                             baseCfs.columnFamily,
+                             baseCfs.getComparator().getString(expr.column_name),
+                             expr.op,
+                             baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value));
+    }
+
     public boolean isIndexing(List<IndexExpression> clause)
     {
         return highestSelectivityPredicate(clause) != null;
@@ -84,32 +95,115 @@ public class KeysSearcher extends SecondaryIndexSearcher
         // TODO: allow merge join instead of just one index + loop
         final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
         final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name);
-
         if (logger.isDebugEnabled())
             logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name));
-
         assert index != null;
-        return new KeysMultiRowIndexSearcherIterator(primary, filter, range, indexManager, index);
-    }
+        final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value);
 
-    public class KeysMultiRowIndexSearcherIterator extends MultiRowIndexSearcherIterator
-    {
-        final DecoratedKey indexKey;
+        /*
+         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+         * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small
+         * possible key having a given token. A fix would be to actually store the token along the key in the
+         * indexed row.
+         */
+        final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        final ByteBuffer endKey = range.right instanceof DecoratedKey ? ((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
-        public KeysMultiRowIndexSearcherIterator(IndexExpression expression,
-                                                 ExtendedFilter filter,
-                                                 AbstractBounds<RowPosition> range,
-                                                 SecondaryIndexManager indexManager,
-                                                 SecondaryIndex index)
+        return new ColumnFamilyStore.AbstractScanIterator()
         {
-            super(expression, baseCfs, index.getIndexCfs(), filter, range);
-            indexKey = indexManager.getIndexKeyFor(expression.column_name, expression.value);
-        }
+            private ByteBuffer lastSeenKey = startKey;
+            private Iterator<IColumn> indexColumns;
+            private final QueryPath path = new QueryPath(baseCfs.columnFamily);
+            private int columnsRead = Integer.MAX_VALUE;
 
-        @Override
-        protected final DecoratedKey nextIndexKey()
-        {
-            return curIndexKey == null ? indexKey : null; // keys index always scan single row in indexCfs
-        }
+            protected Row computeNext()
+            {
+                int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1);
+                // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses
+                int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2);
+                while (true)
+                {
+                    if (indexColumns == null || !indexColumns.hasNext())
+                    {
+                        if (columnsRead < rowsPerQuery)
+                        {
+                            logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery);
+                            return endOfData();
+                        }
+
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("Scanning index %s starting with %s",
+                                                       expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey)));
+
+                        QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey,
+                                                                             new QueryPath(index.getIndexCfs().getColumnFamilyName()),
+                                                                             lastSeenKey,
+                                                                             endKey,
+                                                                             false,
+                                                                             rowsPerQuery);
+                        ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter);
+                        logger.debug("fetched {}", indexRow);
+                        if (indexRow == null)
+                        {
+                            logger.debug("no data, all done");
+                            return endOfData();
+                        }
+
+                        Collection<IColumn> sortedColumns = indexRow.getSortedColumns();
+                        columnsRead = sortedColumns.size();
+                        indexColumns = sortedColumns.iterator();
+                        IColumn firstColumn = sortedColumns.iterator().next();
+
+                        // Paging is racy, so it is possible the first column of a page is not the last seen one.
+                        if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name()))
+                        {
+                            // skip the row we already saw w/ the last page of results
+                            indexColumns.next();
+                            columnsRead--;
+                            logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name()));
+                        }
+                        else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey))
+                        {
+                            // skip key excluded by range
+                            indexColumns.next();
+                            columnsRead--;
+                            logger.debug("Skipping first key as range excludes it");
+                        }
+                    }
+
+                    while (indexColumns.hasNext())
+                    {
+                        IColumn column = indexColumns.next();
+                        lastSeenKey = column.name();
+                        if (column.isMarkedForDelete())
+                        {
+                            logger.debug("skipping {}", column.name());
+                            continue;
+                        }
+
+                        DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey);
+                        if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0)
+                        {
+                            logger.debug("Reached end of assigned scan range");
+                            return endOfData();
+                        }
+                        if (!range.contains(dk))
+                        {
+                            logger.debug("Skipping entry {} outside of assigned scan range", dk.token);
+                            continue;
+                        }
+
+                        logger.debug("Returning index hit for {}", dk);
+                        ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter()));
+                        // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null
+                        if (data == null)
+                            data = ColumnFamily.create(baseCfs.metadata);
+                        return new Row(dk, data);
+                    }
+                 }
+             }
+
+            public void close() throws IOException {}
+        };
     }
 }