You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/01/14 11:55:38 UTC

[1/4] cassandra git commit: Fix potentially returning deleted row with range tombstones

Repository: cassandra
Updated Branches:
  refs/heads/trunk f01b3194e -> 0b723ce74


Fix potentially returning deleted row with range tombstones

patch by slebresne; reviewed by thobbs for CASSANDRA-8558


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2424dd13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2424dd13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2424dd13

Branch: refs/heads/trunk
Commit: 2424dd133bd971de4b19e5030619d7be79e261e0
Parents: c5ccdb7
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:07:22 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:11:24 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/columniterator/IndexedSliceReader.java   | 36 ++++++++++++++++++--
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2424dd13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a711790..92bf422 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558)
  * Make sure we unmark compacting after scrub/cleanup etc (CASSANDRA-8548)
  * Check for available disk space before starting a compaction (CASSANDRA-8562)
  * Fix DISTINCT queries with LIMITs or paging when some partitions

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2424dd13/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index b6aa085..ac48fc0 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -327,6 +327,10 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
 
             // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
             // Note that prefetched keeps columns in reversed disk order.
+            // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
+            // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
+            // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
+            // can be mistakenly added this way.
             if (reversed && !prefetched.isEmpty())
             {
                 boolean gotSome = false;
@@ -340,8 +344,22 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                     if (isColumnBeforeSliceStart(prefetchedCol))
                     {
                         inSlice = false;
-                        if (!setNextSlice())
-                            return false;
+
+                        // As explained above, we add RT unconditionally
+                        if (prefetchedCol instanceof RangeTombstone)
+                        {
+                            blockColumns.addLast(prefetched.poll());
+                            gotSome = true;
+                            continue;
+                        }
+
+                        // Otherwise, we either move to the next slice or, if we have none (which can happen
+                        // because we unwind prefetched no matter what due to RT), we skip the cell
+                        if (hasMoreSlice())
+                            setNextSlice();
+                        else
+                            prefetched.poll();
+
                     }
                     // col is within slice, all columns
                     // (we go in reverse, so as soon as we are in a slice, no need to check
@@ -416,12 +434,19 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 // (If in slice, don't bother checking that until we change slice)
                 if (!inSlice && isColumnBeforeSliceStart(column))
                 {
-                    if (reversed)
+                    // If it's a RangeTombstone, then we need to read it and include it unless its end
+                    // stops before our slice start.
+                    if (column instanceof RangeTombstone && !isBeforeSliceStart(((RangeTombstone)column).max))
+                    {
+                        addColumn(column);
+                    }
+                    else if (reversed)
                     {
                         // the next slice select columns that are before the current one, so it may
                         // match this column, so keep it around.
                         prefetched.addFirst(column);
                     }
+
                     column = null;
                 }
                 // col is within slice
@@ -492,6 +517,11 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 // (If in slice, don't bother checking that until we change slice)
                 if (!inSlice && isColumnBeforeSliceStart(column))
                 {
+                    // If it's a RangeTombstone, then we need to read it and include it unless its end
+                    // stops before our slice start.
+                    if (column instanceof RangeTombstone && !isBeforeSliceStart(((RangeTombstone)column).max))
+                        addColumn(column);
+
                     column = null;
                     continue;
                 }


[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by sl...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
	src/java/org/apache/cassandra/db/AtomDeserializer.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b723ce7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b723ce7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b723ce7

Branch: refs/heads/trunk
Commit: 0b723ce74d97fb2c75c8b685de9996b26ddfe683
Parents: f01b319 1c9c47d
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:55:31 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:55:31 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../SingleColumnPrimaryKeyRestrictions.java     | 13 +++---
 .../apache/cassandra/db/AtomDeserializer.java   | 34 +++++++++++----
 .../cassandra/db/composites/CellNameType.java   |  2 +-
 .../sstable/format/big/IndexedSliceReader.java  | 46 ++++++++++++++++++--
 .../format/big/SSTableNamesIterator.java        | 25 ++++++++---
 .../cassandra/cql3/RangeDeletionTest.java       | 35 +++++++++++++++
 7 files changed, 131 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
index d2a3885,0000000..e109036
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnPrimaryKeyRestrictions.java
@@@ -1,308 -1,0 +1,305 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3.restrictions;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.statements.Bound;
 +import org.apache.cassandra.db.IndexExpression;
 +import org.apache.cassandra.db.composites.CBuilder;
 +import org.apache.cassandra.db.composites.CType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.composites.Composite.EOC;
 +import org.apache.cassandra.db.composites.Composites;
 +import org.apache.cassandra.db.composites.CompositesBuilder;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 +
 +/**
 + * A set of single column restrictions on a primary key part (partition key or clustering key).
 + */
 +final class SingleColumnPrimaryKeyRestrictions extends AbstractPrimaryKeyRestrictions
 +{
 +    /**
 +     * The restrictions.
 +     */
 +    private final SingleColumnRestrictions restrictions;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an EQ, <code>false</code> otherwise.
 +     */
 +    private boolean eq;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to an IN, <code>false</code> otherwise.
 +     */
 +    private boolean in;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Slice, <code>false</code> otherwise.
 +     */
 +    private boolean slice;
 +
 +    /**
 +     * <code>true</code> if the restrictions are corresponding to a Contains, <code>false</code> otherwise.
 +     */
 +    private boolean contains;
 +
 +    public SingleColumnPrimaryKeyRestrictions(CType ctype)
 +    {
 +        super(ctype);
 +        this.restrictions = new SingleColumnRestrictions();
 +        this.eq = true;
 +    }
 +
 +    private SingleColumnPrimaryKeyRestrictions(SingleColumnPrimaryKeyRestrictions primaryKeyRestrictions,
 +                                               SingleColumnRestriction restriction) throws InvalidRequestException
 +    {
 +        super(primaryKeyRestrictions.ctype);
 +        this.restrictions = primaryKeyRestrictions.restrictions.addRestriction(restriction);
 +
 +        if (!primaryKeyRestrictions.isEmpty())
 +        {
 +            ColumnDefinition lastColumn = primaryKeyRestrictions.restrictions.lastColumn();
 +            ColumnDefinition newColumn = restriction.getColumnDef();
 +
 +            checkFalse(primaryKeyRestrictions.isSlice() && newColumn.position() > lastColumn.position(),
 +                       "Clustering column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                       newColumn.name,
 +                       lastColumn.name);
 +
 +            if (newColumn.position() < lastColumn.position())
 +                checkFalse(restriction.isSlice(),
 +                           "PRIMARY KEY column \"%s\" cannot be restricted (preceding column \"%s\" is restricted by a non-EQ relation)",
 +                           restrictions.nextColumn(newColumn).name,
 +                           newColumn.name);
 +        }
 +
 +        if (restriction.isSlice() || primaryKeyRestrictions.isSlice())
 +            this.slice = true;
 +        else if (restriction.isContains() || primaryKeyRestrictions.isContains())
 +            this.contains = true;
 +        else if (restriction.isIN())
 +            this.in = true;
 +        else
 +            this.eq = true;
 +    }
 +
 +    @Override
 +    public boolean isSlice()
 +    {
 +        return slice;
 +    }
 +
 +    @Override
 +    public boolean isEQ()
 +    {
 +        return eq;
 +    }
 +
 +    @Override
 +    public boolean isIN()
 +    {
 +        return in;
 +    }
 +
 +    @Override
 +    public boolean isOnToken()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean isContains()
 +    {
 +        return contains;
 +    }
 +
 +    @Override
 +    public boolean isMultiColumn()
 +    {
 +        return false;
 +    }
 +
 +    @Override
 +    public boolean usesFunction(String ksName, String functionName)
 +    {
 +        return restrictions.usesFunction(ksName, functionName);
 +    }
 +
 +    @Override
 +    public PrimaryKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
 +    {
 +        if (restriction.isMultiColumn())
 +        {
 +            checkTrue(isEmpty(),
 +                      "Mixing single column relations and multi column relations on clustering columns is not allowed");
 +            return (PrimaryKeyRestrictions) restriction;
 +        }
 +
 +        if (restriction.isOnToken())
 +        {
 +            if (isEmpty())
 +                return (PrimaryKeyRestrictions) restriction;
 +
 +            return new TokenFilter(this, (TokenRestriction) restriction);
 +        }
 +
 +        return new SingleColumnPrimaryKeyRestrictions(this, (SingleColumnRestriction) restriction);
 +    }
 +
 +    @Override
 +    public List<Composite> valuesAsComposites(QueryOptions options) throws InvalidRequestException
 +    {
 +        CompositesBuilder builder = new CompositesBuilder(ctype.builder(), ctype);
 +        for (ColumnDefinition def : restrictions.getColumnDefs())
 +        {
 +            Restriction r = restrictions.getRestriction(def);
 +            assert !r.isSlice();
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            builder.addEachElementToAll(values);
 +            checkFalse(builder.containsNull(), "Invalid null value for column %s", def.name);
 +        }
 +
 +        return builder.build();
 +    }
 +
 +    @Override
 +    public List<Composite> boundsAsComposites(Bound bound, QueryOptions options) throws InvalidRequestException
 +    {
 +        CBuilder builder = ctype.builder();
 +        List<ColumnDefinition> defs = new ArrayList<>(restrictions.getColumnDefs());
 +
 +        CompositesBuilder compositeBuilder = new CompositesBuilder(builder, ctype);
 +        // The end-of-component of composite doesn't depend on whether the
 +        // component type is reversed or not (i.e. the ReversedType is applied
 +        // to the component comparator but not to the end-of-component itself),
 +        // it only depends on whether the slice is reversed
 +        int keyPosition = 0;
 +        for (ColumnDefinition def : defs)
 +        {
 +            // In a restriction, we always have Bound.START < Bound.END for the "base" comparator.
 +            // So if we're doing a reverse slice, we must invert the bounds when giving them as start and end of the slice filter.
 +            // But if the actual comparator itself is reversed, we must invert the bounds too.
 +            Bound b = !def.isReversedType() ? bound : bound.reverse();
 +            Restriction r = restrictions.getRestriction(def);
 +            if (keyPosition != def.position() || r.isContains())
-             {
-                 EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
-                 return compositeBuilder.buildWithEOC(eoc);
-             }
++                return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
++
 +            if (r.isSlice())
 +            {
 +                if (!r.hasBound(b))
 +                {
 +                    // There wasn't any non EQ relation on that key, we select all records having the preceding component as prefix.
 +                    // For composites, if there was preceding component and we're computing the end, we must change the last component
 +                    // End-Of-Component, otherwise we would be selecting only one record.
-                     EOC eoc = !compositeBuilder.isEmpty() && bound.isEnd() ? EOC.END : EOC.NONE;
-                     return compositeBuilder.buildWithEOC(eoc);
++                    return compositeBuilder.buildWithEOC(bound.isEnd() ? EOC.END : EOC.START);
 +                }
 +
 +                ByteBuffer value = checkNotNull(r.bounds(b, options).get(0), "Invalid null clustering key part %s", r);
 +                compositeBuilder.addElementToAll(value);
 +                Composite.EOC eoc = eocFor(r, bound, b);
 +                return compositeBuilder.buildWithEOC(eoc);
 +            }
 +
 +            List<ByteBuffer> values = r.values(options);
 +
 +            if (values.isEmpty())
 +                return Collections.emptyList();
 +
 +            compositeBuilder.addEachElementToAll(values);
 +
 +            checkFalse(compositeBuilder.containsNull(), "Invalid null clustering key part %s", def.name);
 +            keyPosition++;
 +        }
 +        // Means no relation at all or everything was an equal
 +        // Note: if the builder is "full", there is no need to use the end-of-component bit. For columns selection,
 +        // it would be harmless to do it. However, we use this method for the partition key too. And when a query
 +        // with a secondary index is done, and with the partition provided with an EQ, we'll end up here, and in that
 +        // case using the eoc would be bad, since for the random partitioner we have no guarantee that
 +        // prefix.end() will sort after prefix (see #5240).
-         EOC eoc = bound.isEnd() && compositeBuilder.hasRemaining() ? EOC.END : EOC.NONE;
++        EOC eoc = !compositeBuilder.hasRemaining() ? EOC.NONE : (bound.isEnd() ? EOC.END : EOC.START);
 +        return compositeBuilder.buildWithEOC(eoc);
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(valuesAsComposites(options));
 +    }
 +
 +    @Override
 +    public List<ByteBuffer> bounds(Bound b, QueryOptions options) throws InvalidRequestException
 +    {
 +        return Composites.toByteBuffers(boundsAsComposites(b, options));
 +    }
 +
 +    private static Composite.EOC eocFor(Restriction r, Bound eocBound, Bound inclusiveBound)
 +    {
 +        if (eocBound.isStart())
 +            return r.isInclusive(inclusiveBound) ? Composite.EOC.NONE : Composite.EOC.END;
 +
 +        return r.isInclusive(inclusiveBound) ? Composite.EOC.END : Composite.EOC.START;
 +    }
 +
 +    @Override
 +    public boolean hasBound(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().hasBound(b);
 +    }
 +
 +    @Override
 +    public boolean isInclusive(Bound b)
 +    {
 +        if (isEmpty())
 +            return false;
 +        return restrictions.lastRestriction().isInclusive(b);
 +    }
 +
 +    @Override
 +    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
 +    {
 +        return restrictions.hasSupportingIndex(indexManager);
 +    }
 +
 +    @Override
 +    public void addIndexExpressionTo(List<IndexExpression> expressions, QueryOptions options) throws InvalidRequestException
 +    {
 +        restrictions.addIndexExpressionTo(expressions, options);
 +    }
 +
 +    @Override
 +    public Collection<ColumnDefinition> getColumnDefs()
 +    {
 +        return restrictions.getColumnDefs();
 +    }
- }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomDeserializer.java
index 0c43422,a103647..c71321c
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@@ -41,9 -40,13 +41,13 @@@ public class AtomDeserialize
      private final DataInput in;
      private final ColumnSerializer.Flag flag;
      private final int expireBefore;
 -    private final Descriptor.Version version;
 +    private final Version version;
  
+     // The "flag" for the next name (which corresponds to the "masks" in ColumnSerializer) if it has been
+     // read already, Integer.MIN_VALUE otherwise.
+     private int nextFlags = Integer.MIN_VALUE;
+ 
 -    public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
 +    public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version)
      {
          this.type = type;
          this.nameDeserializer = type.newDeserializer(in);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
index a69cff9,0000000..43fd046
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/IndexedSliceReader.java
@@@ -1,500 -1,0 +1,540 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.ArrayDeque;
 +import java.util.Deque;
 +import java.util.List;
 +
 +import com.google.common.collect.AbstractIterator;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.IndexHelper;
 +import org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +/**
 + * This is a reader that finds the block for a starting column and returns blocks before/after it for each next call.
 + * This function assumes that the CF is sorted by name and exploits the name index.
 + */
 +class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 +{
 +    private final ColumnFamily emptyColumnFamily;
 +
 +    private final SSTableReader sstable;
 +    private final List<IndexHelper.IndexInfo> indexes;
 +    private final FileDataInput originalInput;
 +    private FileDataInput file;
 +    private final boolean reversed;
 +    private final ColumnSlice[] slices;
 +    private final BlockFetcher fetcher;
 +    private final Deque<OnDiskAtom> blockColumns = new ArrayDeque<OnDiskAtom>();
 +    private final CellNameType comparator;
 +
 +    // Holds range tombstone in reverse queries. See addColumn()
 +    private final Deque<OnDiskAtom> rangeTombstonesReversed;
 +
 +    /**
 +     * This slice reader assumes that slices are sorted correctly, e.g. that for forward lookup slices are in
 +     * lexicographic order of start elements and that for reverse lookup they are in reverse lexicographic order of
 +     * finish (reverse start) elements. i.e. forward: [a,b],[d,e],[g,h] reverse: [h,g],[e,d],[b,a]. This reader also
 +     * assumes that validation has been performed in terms of intervals (no overlapping intervals).
 +     */
 +    IndexedSliceReader(SSTableReader sstable, RowIndexEntry indexEntry, FileDataInput input, ColumnSlice[] slices, boolean reversed)
 +    {
 +        Tracing.trace("Seeking to partition indexed section in data file");
 +        this.sstable = sstable;
 +        this.originalInput = input;
 +        this.reversed = reversed;
 +        this.slices = slices;
 +        this.comparator = sstable.metadata.comparator;
 +        this.rangeTombstonesReversed = reversed ? new ArrayDeque<OnDiskAtom>() : null;
 +
 +        try
 +        {
 +            this.indexes = indexEntry.columnsIndex();
 +            emptyColumnFamily = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +            if (indexes.isEmpty())
 +            {
 +                setToRowStart(indexEntry, input);
 +                emptyColumnFamily.delete(DeletionTime.serializer.deserialize(file));
 +                fetcher = new SimpleBlockFetcher();
 +            }
 +            else
 +            {
 +                emptyColumnFamily.delete(indexEntry.deletionTime());
 +                fetcher = new IndexedBlockFetcher(indexEntry.position);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, file.getPath());
 +        }
 +    }
 +
 +    /**
 +     * Sets the seek position to the start of the row for column scanning.
 +     */
 +    private void setToRowStart(RowIndexEntry rowEntry, FileDataInput in) throws IOException
 +    {
 +        if (in == null)
 +        {
 +            this.file = sstable.getFileDataInput(rowEntry.position);
 +        }
 +        else
 +        {
 +            this.file = in;
 +            in.seek(rowEntry.position);
 +        }
 +        sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
 +    }
 +
 +    public ColumnFamily getColumnFamily()
 +    {
 +        return emptyColumnFamily;
 +    }
 +
 +    public DecoratedKey getKey()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    protected OnDiskAtom computeNext()
 +    {
 +        while (true)
 +        {
 +            if (reversed)
 +            {
 +                // Return all tombstones for the block first (see addColumn() below)
 +                OnDiskAtom column = rangeTombstonesReversed.poll();
 +                if (column != null)
 +                    return column;
 +            }
 +
 +            OnDiskAtom column = blockColumns.poll();
 +            if (column == null)
 +            {
 +                if (!fetcher.fetchMoreData())
 +                    return endOfData();
 +            }
 +            else
 +            {
 +                return column;
 +            }
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        if (originalInput == null && file != null)
 +            file.close();
 +    }
 +
 +    protected void addColumn(OnDiskAtom col)
 +    {
 +        if (reversed)
 +        {
 +            /*
 +             * We put range tombstone markers at the beginning of the range they delete. But for reversed queries,
 +             * the caller still needs to know about a RangeTombstone before it sees any column that it covers.
 +             * To make that simple, we keep said tombstones separate and return them all before any column for
 +             * a given block.
 +             */
 +            if (col instanceof RangeTombstone)
 +                rangeTombstonesReversed.addFirst(col);
 +            else
 +                blockColumns.addFirst(col);
 +        }
 +        else
 +        {
 +            blockColumns.addLast(col);
 +        }
 +    }
 +
 +    private abstract class BlockFetcher
 +    {
 +        protected int currentSliceIdx;
 +
 +        protected BlockFetcher(int sliceIdx)
 +        {
 +            this.currentSliceIdx = sliceIdx;
 +        }
 +
 +        /*
 +         * Return the smallest key selected by the current ColumnSlice.
 +         */
 +        protected Composite currentStart()
 +        {
 +            return reversed ? slices[currentSliceIdx].finish : slices[currentSliceIdx].start;
 +        }
 +
 +        /*
 +         * Return the biggest key selected by the current ColumnSlice.
 +         */
 +        protected Composite currentFinish()
 +        {
 +            return reversed ? slices[currentSliceIdx].start : slices[currentSliceIdx].finish;
 +        }
 +
 +        protected abstract boolean setNextSlice();
 +
 +        protected abstract boolean fetchMoreData();
 +
 +        protected boolean isColumnBeforeSliceStart(OnDiskAtom column)
 +        {
 +            return isBeforeSliceStart(column.name());
 +        }
 +
 +        protected boolean isBeforeSliceStart(Composite name)
 +        {
 +            Composite start = currentStart();
 +            return !start.isEmpty() && comparator.compare(name, start) < 0;
 +        }
 +
 +        protected boolean isColumnBeforeSliceFinish(OnDiskAtom column)
 +        {
 +            Composite finish = currentFinish();
 +            return finish.isEmpty() || comparator.compare(column.name(), finish) <= 0;
 +        }
 +
 +        protected boolean isAfterSliceFinish(Composite name)
 +        {
 +            Composite finish = currentFinish();
 +            return !finish.isEmpty() && comparator.compare(name, finish) > 0;
 +        }
 +    }
 +
 +    private class IndexedBlockFetcher extends BlockFetcher
 +    {
 +        // where this row starts
 +        private final long columnsStart;
 +
 +        // the index entry for the next block to deserialize
 +        private int nextIndexIdx = -1;
 +
 +        // index of the last block we've read from disk;
 +        private int lastDeserializedBlock = -1;
 +
 +        // For reversed, keep columns at the beginning of the last deserialized block that
 +        // may still match a slice
 +        private final Deque<OnDiskAtom> prefetched;
 +
 +        public IndexedBlockFetcher(long columnsStart)
 +        {
 +            super(-1);
 +            this.columnsStart = columnsStart;
 +            this.prefetched = reversed ? new ArrayDeque<OnDiskAtom>() : null;
 +            setNextSlice();
 +        }
 +
 +        protected boolean setNextSlice()
 +        {
 +            while (++currentSliceIdx < slices.length)
 +            {
 +                nextIndexIdx = IndexHelper.indexFor(slices[currentSliceIdx].start, indexes, comparator, reversed, nextIndexIdx);
 +                if (nextIndexIdx < 0 || nextIndexIdx >= indexes.size())
 +                    // no index block for that slice
 +                    continue;
 +
 +                // Check if we can exclude this slice entirely from the index
 +                IndexInfo info = indexes.get(nextIndexIdx);
 +                if (reversed)
 +                {
 +                    if (!isBeforeSliceStart(info.lastName))
 +                        return true;
 +                }
 +                else
 +                {
 +                    if (!isAfterSliceFinish(info.firstName))
 +                        return true;
 +                }
 +            }
 +            nextIndexIdx = -1;
 +            return false;
 +        }
 +
 +        protected boolean hasMoreSlice()
 +        {
 +            return currentSliceIdx < slices.length;
 +        }
 +
 +        protected boolean fetchMoreData()
 +        {
 +            if (!hasMoreSlice())
 +                return false;
 +
 +            // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
 +            // Note that prefetched keeps columns in reversed disk order.
++            // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
++            // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
++            // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
++            // can be mistakenly added this way.
 +            if (reversed && !prefetched.isEmpty())
 +            {
++                // Whether we've found anything to return in prefetched
 +                boolean gotSome = false;
 +                // Avoids some comparison when we know it's not useful
 +                boolean inSlice = false;
 +
 +                OnDiskAtom prefetchedCol;
 +                while ((prefetchedCol = prefetched.peek() ) != null)
 +                {
 +                    // col is before slice, we update the slice
 +                    if (isColumnBeforeSliceStart(prefetchedCol))
 +                    {
 +                        inSlice = false;
-                         if (!setNextSlice())
-                             return false;
++
++                        // As explained above, we add RT unconditionally
++                        if (prefetchedCol instanceof RangeTombstone)
++                        {
++                            blockColumns.addLast(prefetched.poll());
++                            gotSome = true;
++                            continue;
++                        }
++
++                        // Otherwise, we either move to the next slice or, if we have none (which can happen
++                        // because we unwind prefetched no matter what due to RT), we skip the cell
++                        if (hasMoreSlice())
++                            setNextSlice();
++                        else
++                            prefetched.poll();
++
 +                    }
 +                    // col is within slice, all columns
 +                    // (we go in reverse, so as soon as we are in a slice, no need to check
 +                    // we're after the slice until we change slice)
 +                    else if (inSlice || isColumnBeforeSliceFinish(prefetchedCol))
 +                    {
 +                        blockColumns.addLast(prefetched.poll());
 +                        gotSome = true;
 +                        inSlice = true;
 +                    }
 +                    // if col is after slice, ignore
 +                    else
 +                    {
 +                        prefetched.poll();
 +                    }
 +                }
 +                if (gotSome)
 +                    return true;
 +            }
 +            try
 +            {
 +                return getNextBlock();
 +            }
 +            catch (IOException e)
 +            {
 +                throw new CorruptSSTableException(e, file.getPath());
 +            }
 +        }
 +
 +        private boolean getNextBlock() throws IOException
 +        {
 +            if (lastDeserializedBlock == nextIndexIdx)
 +            {
 +                if (reversed)
 +                    nextIndexIdx--;
 +                else
 +                    nextIndexIdx++;
 +            }
 +            lastDeserializedBlock = nextIndexIdx;
 +
 +            // Are we done?
 +            if (lastDeserializedBlock < 0 || lastDeserializedBlock >= indexes.size())
 +                return false;
 +
 +            IndexInfo currentIndex = indexes.get(lastDeserializedBlock);
 +
 +            /* seek to the correct offset to the data, and calculate the data size */
 +            long positionToSeek = columnsStart + currentIndex.offset;
 +
 +            // With new promoted indexes, our first seek in the data file will happen at that point.
 +            if (file == null)
 +                file = originalInput == null ? sstable.getFileDataInput(positionToSeek) : originalInput;
 +
 +            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
 +
 +            file.seek(positionToSeek);
 +            FileMark mark = file.mark();
 +
 +            // We remember when we are within a slice to avoid some comparisons
 +            boolean inSlice = false;
 +
 +            // scan from index start
 +            while (file.bytesPastMark(mark) < currentIndex.width || deserializer.hasUnprocessed())
 +            {
 +                // col is before slice
 +                // (If in slice, don't bother checking that until we change slice)
 +                Composite start = currentStart();
 +                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
 +                {
++                    // If it's a rangeTombstone, then we need to read it and include it unless its end
++                    // stops before our slice start.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, start) >= 0)
++                            addColumn(rt);
++                        continue;
++                    }
++
 +                    if (reversed)
 +                    {
 +                        // the next slice select columns that are before the current one, so it may
 +                        // match this column, so keep it around.
 +                        prefetched.addFirst(deserializer.readNext());
 +                    }
 +                    else
 +                    {
 +                        deserializer.skipNext();
 +                    }
 +                }
 +                // col is within slice
 +                else
 +                {
 +                    Composite finish = currentFinish();
 +                    if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
 +                    {
 +                        inSlice = true;
 +                        addColumn(deserializer.readNext());
 +                    }
 +                    // col is after slice.
 +                    else
 +                    {
 +                        // When reading forward, if we hit a column that sorts after the current slice, it means we're done with this slice.
 +                        // For reversed, this may either mean that we're done with the current slice, or that we need to read the previous
 +                        // index block. However, we can be sure that we are in the first case though (the current slice is done) if the first
 +                        // columns of the block were not part of the current slice, i.e. if we have columns in prefetched.
 +                        if (reversed && prefetched.isEmpty())
 +                            break;
 +
 +                        if (!setNextSlice())
 +                            break;
 +
 +                        inSlice = false;
 +
 +                        // The next index block now corresponds to the first block that may have columns for the newly set slice.
 +                        // So if it's different from the current block, we're done with this block. And in that case, we know
 +                        // that our prefetched columns won't match.
 +                        if (nextIndexIdx != lastDeserializedBlock)
 +                        {
 +                            if (reversed)
 +                                prefetched.clear();
 +                            break;
 +                        }
 +
 +                        // Even if the next slice may have columns in this block, if we're reversed, those columns have been
 +                        // prefetched and we're done with that block
 +                        if (reversed)
 +                            break;
 +
 +                        // otherwise, we will deal with that column at the next iteration
 +                    }
 +                }
 +            }
 +            return true;
 +        }
 +    }
 +
 +    private class SimpleBlockFetcher extends BlockFetcher
 +    {
 +        public SimpleBlockFetcher() throws IOException
 +        {
 +            // Since we have to deserialize in order and will read all slices might as well reverse the slices and
 +            // behave as if it was not reversed
 +            super(reversed ? slices.length - 1 : 0);
 +
 +            // We remember when we are within a slice to avoid some comparisons
 +            boolean inSlice = false;
 +
 +            AtomDeserializer deserializer = emptyColumnFamily.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
 +            while (deserializer.hasNext())
 +            {
 +                // col is before slice
 +                // (If in slice, don't bother checking that until we change slice)
 +                Composite start = currentStart();
 +                if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
 +                {
-                     deserializer.skipNext();
++                    // If it's a rangeTombstone, then we need to read it and include it unless its end
++                    // stops before our slice start. Otherwise, we can skip it.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, start) >= 0)
++                            addColumn(rt);
++                    }
++                    else
++                    {
++                        deserializer.skipNext();
++                    }
 +                    continue;
 +                }
 +
 +                // col is within slice
 +                Composite finish = currentFinish();
 +                if (finish.isEmpty() || deserializer.compareNextTo(finish) <= 0)
 +                {
 +                    inSlice = true;
 +                    addColumn(deserializer.readNext());
 +                }
 +                // col is after slice. more slices?
 +                else
 +                {
 +                    inSlice = false;
 +                    if (!setNextSlice())
 +                        break;
 +                }
 +            }
 +        }
 +
 +        protected boolean setNextSlice()
 +        {
 +            if (reversed)
 +            {
 +                if (currentSliceIdx <= 0)
 +                    return false;
 +
 +                currentSliceIdx--;
 +            }
 +            else
 +            {
 +                if (currentSliceIdx >= slices.length - 1)
 +                    return false;
 +
 +                currentSliceIdx++;
 +            }
 +            return true;
 +        }
 +
 +        protected boolean fetchMoreData()
 +        {
 +            return false;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b723ce7/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
index 07dc59a,0000000..c51e595
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/SSTableNamesIterator.java
@@@ -1,250 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
 +import java.util.*;
 +
 +import com.google.common.collect.AbstractIterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.CellNameType;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.IndexHelper;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
 +{
 +    private ColumnFamily cf;
 +    private final SSTableReader sstable;
 +    private FileDataInput fileToClose;
 +    private Iterator<OnDiskAtom> iter;
 +    public final SortedSet<CellName> columns;
 +    public final DecoratedKey key;
 +
 +    public SSTableNamesIterator(SSTableReader sstable, DecoratedKey key, SortedSet<CellName> columns)
 +    {
 +        assert columns != null;
 +        this.sstable = sstable;
 +        this.columns = columns;
 +        this.key = key;
 +
 +        RowIndexEntry indexEntry = sstable.getPosition(key, SSTableReader.Operator.EQ);
 +        if (indexEntry == null)
 +            return;
 +
 +        try
 +        {
 +            read(sstable, null, indexEntry);
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +        finally
 +        {
 +            if (fileToClose != null)
 +                FileUtils.closeQuietly(fileToClose);
 +        }
 +    }
 +
 +    public SSTableNamesIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry)
 +    {
 +        assert columns != null;
 +        this.sstable = sstable;
 +        this.columns = columns;
 +        this.key = key;
 +
 +        try
 +        {
 +            read(sstable, file, indexEntry);
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    private FileDataInput createFileDataInput(long position)
 +    {
 +        fileToClose = sstable.getFileDataInput(position);
 +        return fileToClose;
 +    }
 +
 +    private void read(SSTableReader sstable, FileDataInput file, RowIndexEntry indexEntry)
 +    throws IOException
 +    {
 +        List<IndexHelper.IndexInfo> indexList;
 +
 +        // If the entry is not indexed or the index is not promoted, read from the row start
 +        if (!indexEntry.isIndexed())
 +        {
 +            if (file == null)
 +                file = createFileDataInput(indexEntry.position);
 +            else
 +                file.seek(indexEntry.position);
 +
 +            DecoratedKey keyInDisk = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(file));
 +            assert keyInDisk.equals(key) : String.format("%s != %s in %s", keyInDisk, key, file.getPath());
 +        }
 +
 +        indexList = indexEntry.columnsIndex();
 +
 +        if (!indexEntry.isIndexed())
 +        {
 +            ColumnFamilySerializer serializer = ColumnFamily.serializer;
 +            try
 +            {
 +                cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +                cf.delete(DeletionTime.serializer.deserialize(file));
 +            }
 +            catch (Exception e)
 +            {
 +                throw new IOException(serializer + " failed to deserialize " + sstable.getColumnFamilyName() + " with " + sstable.metadata + " from " + file, e);
 +            }
 +        }
 +        else
 +        {
 +            cf = ArrayBackedSortedColumns.factory.create(sstable.metadata);
 +            cf.delete(indexEntry.deletionTime());
 +        }
 +
 +        List<OnDiskAtom> result = new ArrayList<OnDiskAtom>();
 +        if (indexList.isEmpty())
 +        {
 +            readSimpleColumns(file, columns, result);
 +        }
 +        else
 +        {
 +            readIndexedColumns(sstable.metadata, file, columns, indexList, indexEntry.position, result);
 +        }
 +
 +        // create an iterator view of the columns we read
 +        iter = result.iterator();
 +    }
 +
 +    private void readSimpleColumns(FileDataInput file, SortedSet<CellName> columnNames, List<OnDiskAtom> result)
 +    {
 +        Iterator<OnDiskAtom> atomIterator = cf.metadata().getOnDiskIterator(file, sstable.descriptor.version);
 +        int n = 0;
 +        while (atomIterator.hasNext())
 +        {
 +            OnDiskAtom column = atomIterator.next();
 +            if (column instanceof Cell)
 +            {
 +                if (columnNames.contains(column.name()))
 +                {
 +                    result.add(column);
 +                    if (++n >= columns.size())
 +                        break;
 +                }
 +            }
 +            else
 +            {
 +                result.add(column);
 +            }
 +        }
 +    }
 +
 +    private void readIndexedColumns(CFMetaData metadata,
 +                                    FileDataInput file,
 +                                    SortedSet<CellName> columnNames,
 +                                    List<IndexHelper.IndexInfo> indexList,
 +                                    long basePosition,
 +                                    List<OnDiskAtom> result)
 +    throws IOException
 +    {
 +        /* get the various column ranges we have to read */
 +        CellNameType comparator = metadata.comparator;
 +        List<IndexHelper.IndexInfo> ranges = new ArrayList<IndexHelper.IndexInfo>();
 +        int lastIndexIdx = -1;
 +        for (CellName name : columnNames)
 +        {
 +            int index = IndexHelper.indexFor(name, indexList, comparator, false, lastIndexIdx);
 +            if (index < 0 || index == indexList.size())
 +                continue;
 +            IndexHelper.IndexInfo indexInfo = indexList.get(index);
 +            // Check the index block does contain the column names and that we haven't inserted this block yet.
 +            if (comparator.compare(name, indexInfo.firstName) < 0 || index == lastIndexIdx)
 +                continue;
 +
 +            ranges.add(indexInfo);
 +            lastIndexIdx = index;
 +        }
 +
 +        if (ranges.isEmpty())
 +            return;
 +
 +        Iterator<CellName> toFetch = columnNames.iterator();
 +        CellName nextToFetch = toFetch.next();
 +        for (IndexHelper.IndexInfo indexInfo : ranges)
 +        {
 +            long positionToSeek = basePosition + indexInfo.offset;
 +
 +            // With new promoted indexes, our first seek in the data file will happen at that point.
 +            if (file == null)
 +                file = createFileDataInput(positionToSeek);
 +
 +            AtomDeserializer deserializer = cf.metadata().getOnDiskDeserializer(file, sstable.descriptor.version);
 +            file.seek(positionToSeek);
 +            FileMark mark = file.mark();
 +            while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
 +            {
 +                int cmp = deserializer.compareNextTo(nextToFetch);
-                 if (cmp == 0)
++                if (cmp < 0)
++                {
++                    // If it's a rangeTombstone, then we need to read it and include
++                    // it if it includes our target. Otherwise, we can skip it.
++                    if (deserializer.nextIsRangeTombstone())
++                    {
++                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
++                        if (comparator.compare(rt.max, nextToFetch) >= 0)
++                            result.add(rt);
++                    }
++                    else
++                    {
++                        deserializer.skipNext();
++                    }
++                }
++                else if (cmp == 0)
 +                {
 +                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
 +                    result.add(deserializer.readNext());
-                     continue;
 +                }
- 
-                 deserializer.skipNext();
-                 if (cmp > 0)
++                else
++                {
++                    deserializer.skipNext();
 +                    nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
++                }
 +            }
 +        }
 +    }
 +
 +    public DecoratedKey getKey()
 +    {
 +        return key;
 +    }
 +
 +    public ColumnFamily getColumnFamily()
 +    {
 +        return cf;
 +    }
 +
 +    protected OnDiskAtom computeNext()
 +    {
 +        if (iter == null || !iter.hasNext())
 +            return endOfData();
 +        return iter.next();
 +    }
 +
 +    public void close() throws IOException { }
 +}


[3/4] cassandra git commit: Fix potentially returning deleted row with range tombstones (2.1 version)

Posted by sl...@apache.org.
Fix potentially returning deleted row with range tombstones (2.1 version)

patch by slebresne; reviewed by thobbs for CASSANDRA-8558


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c9c47d2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c9c47d2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c9c47d2

Branch: refs/heads/trunk
Commit: 1c9c47d26f2da98d1a923254858bcde550122445
Parents: 19c54a5
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:40:57 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:40:57 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/SelectStatement.java        |  6 +--
 .../apache/cassandra/db/AtomDeserializer.java   | 34 +++++++++++----
 .../db/columniterator/IndexedSliceReader.java   | 46 ++++++++++++++++++--
 .../db/columniterator/SSTableNamesIterator.java | 25 ++++++++---
 .../cassandra/db/composites/CellNameType.java   |  2 +-
 .../cassandra/cql3/RangeDeletionTest.java       | 35 +++++++++++++++
 7 files changed, 129 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e070eaf..175a78a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
  * Log failed host when preparing incremental repair (CASSANDRA-8228)
  * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 Merged from 2.0:
+ * Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558)
  * Check for available disk space before starting a compaction (CASSANDRA-8562)
  * Fix DISTINCT queries with LIMITs or paging when some partitions
    contain only tombstones (CASSANDRA-8490)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 92a9579..4ef554d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -844,7 +844,7 @@ public class SelectStatement implements CQLStatement
                 // For composites, if there was preceding component and we're computing the end, we must change the last component
                 // End-Of-Component, otherwise we would be selecting only one record.
                 Composite prefix = builder.build();
-                return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix);
+                return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix.start());
             }
             if (r.isSlice())
             {
@@ -869,7 +869,7 @@ public class SelectStatement implements CQLStatement
                             throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name));
                         Composite prefix = builder.buildWith(val);
                         // See below for why this
-                        s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix);
+                        s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix.start());
                     }
                     return new ArrayList<>(s);
                 }
@@ -887,7 +887,7 @@ public class SelectStatement implements CQLStatement
         // case using the eoc would be bad, since for the random partitioner we have no guarantee that
         // prefix.end() will sort after prefix (see #5240).
         Composite prefix = builder.build();
-        return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix);
+        return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix.start());
     }
 
     private static Composite.EOC eocForRelation(Operator op)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java
index 799ed0e..a103647 100644
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java
@@ -42,6 +42,10 @@ public class AtomDeserializer
     private final int expireBefore;
     private final Descriptor.Version version;
 
+    // The "flag" for the next name (which corresponds to the "masks" in ColumnSerializer) if it has been
+    // read already, Integer.MIN_VALUE otherwise.
+    private int nextFlags = Integer.MIN_VALUE;
+
     public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
     {
         this.type = type;
@@ -82,17 +86,30 @@ public class AtomDeserializer
     }
 
     /**
+     * Returns whether the next atom is a range tombstone or not.
+     *
+     * Please note that this should only be called after compareNextTo() has been called.
+     */
+    public boolean nextIsRangeTombstone() throws IOException
+    {
+        nextFlags = in.readUnsignedByte();
+        return (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
+    }
+
+    /**
      * Returns the next atom.
      */
     public OnDiskAtom readNext() throws IOException
     {
         Composite name = nameDeserializer.readNext();
         assert !name.isEmpty(); // This would imply hasNext() hasn't been called
-        int b = in.readUnsignedByte();
-        if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-            return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
-        else
-            return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
+
+        nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : nextFlags;
+        OnDiskAtom atom = (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0
+                        ? type.rangeTombstoneSerializer().deserializeBody(in, name, version)
+                        : type.columnSerializer().deserializeColumnBody(in, (CellName)name, nextFlags, flag, expireBefore);
+        nextFlags = Integer.MIN_VALUE;
+        return atom;
     }
 
     /**
@@ -101,10 +118,11 @@ public class AtomDeserializer
     public void skipNext() throws IOException
     {
         nameDeserializer.skipNext();
-        int b = in.readUnsignedByte();
-        if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
+        nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : nextFlags;
+        if ((nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
             type.rangeTombstoneSerializer().skipBody(in, version);
         else
-            type.columnSerializer().skipColumnBody(in, b);
+            type.columnSerializer().skipColumnBody(in, nextFlags);
+        nextFlags = Integer.MIN_VALUE;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
index 7012321..924e9bc 100644
--- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
+++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java
@@ -290,8 +290,13 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
 
             // If we read blocks in reversed disk order, we may have columns from the previous block to handle.
             // Note that prefetched keeps columns in reversed disk order.
+            // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones
+            // that cover a slice *after* we've moved to the previous slice. To keep it simple, we simply include
+            // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that
+            // can be mistakenly added this way.
             if (reversed && !prefetched.isEmpty())
             {
+                // Whether we've found anything to return in prefetched
                 boolean gotSome = false;
                 // Avoids some comparison when we know it's not useful
                 boolean inSlice = false;
@@ -303,8 +308,22 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                     if (isColumnBeforeSliceStart(prefetchedCol))
                     {
                         inSlice = false;
-                        if (!setNextSlice())
-                            return false;
+
+                        // As explained above, we add RT unconditionally
+                        if (prefetchedCol instanceof RangeTombstone)
+                        {
+                            blockColumns.addLast(prefetched.poll());
+                            gotSome = true;
+                            continue;
+                        }
+
+                        // Otherwise, we either move to the next slice or, if we have none (which can happen
+                        // because we unwind prefetched no matter what due to RT), we skip the cell
+                        if (hasMoreSlice())
+                            setNextSlice();
+                        else
+                            prefetched.poll();
+
                     }
                     // col is within slice, all columns
                     // (we go in reverse, so as soon as we are in a slice, no need to check
@@ -374,6 +393,16 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 Composite start = currentStart();
                 if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
                 {
+                    // If it's a rangeTombstone, then we need to read it and include it unless its end
+                    // stops before our slice start.
+                    if (deserializer.nextIsRangeTombstone())
+                    {
+                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+                        if (comparator.compare(rt.max, start) >= 0)
+                            addColumn(rt);
+                        continue;
+                    }
+
                     if (reversed)
                     {
                         // the next slice select columns that are before the current one, so it may
@@ -451,7 +480,18 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA
                 Composite start = currentStart();
                 if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0)
                 {
-                    deserializer.skipNext();
+                    // If it's a rangeTombstone, then we need to read it and include it unless its end
+                    // stops before our slice start. Otherwise, we can skip it.
+                    if (deserializer.nextIsRangeTombstone())
+                    {
+                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+                        if (comparator.compare(rt.max, start) >= 0)
+                            addColumn(rt);
+                    }
+                    else
+                    {
+                        deserializer.skipNext();
+                    }
                     continue;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
index 224b63f..221f499 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java
@@ -214,16 +214,31 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement
             while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null)
             {
                 int cmp = deserializer.compareNextTo(nextToFetch);
-                if (cmp == 0)
+                if (cmp < 0)
+                {
+                    // If it's a rangeTombstone, then we need to read it and include
+                    // it if it includes our target. Otherwise, we can skip it.
+                    if (deserializer.nextIsRangeTombstone())
+                    {
+                        RangeTombstone rt = (RangeTombstone)deserializer.readNext();
+                        if (comparator.compare(rt.max, nextToFetch) >= 0)
+                            result.add(rt);
+                    }
+                    else
+                    {
+                        deserializer.skipNext();
+                    }
+                }
+                else if (cmp == 0)
                 {
                     nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
                     result.add(deserializer.readNext());
-                    continue;
                 }
-
-                deserializer.skipNext();
-                if (cmp > 0)
+                else
+                {
+                    deserializer.skipNext();
                     nextToFetch = toFetch.hasNext() ? toFetch.next() : null;
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/composites/CellNameType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/composites/CellNameType.java b/src/java/org/apache/cassandra/db/composites/CellNameType.java
index 1e87296..7b4fd36 100644
--- a/src/java/org/apache/cassandra/db/composites/CellNameType.java
+++ b/src/java/org/apache/cassandra/db/composites/CellNameType.java
@@ -197,7 +197,7 @@ public interface CellNameType extends CType
         public boolean hasUnprocessed() throws IOException;
 
         /**
-         * Comparare the next name to read to the provided Composite.
+         * Compare the next name to read to the provided Composite.
          * This does not consume the next name.
          */
         public int compareNextTo(Composite composite) throws IOException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
new file mode 100644
index 0000000..b31d0c2
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.junit.Test;
+
+public class RangeDeletionTest extends CQLTester
+{
+    @Test
+    public void testCassandra8558() throws Throwable
+    {
+        createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))");
+
+        execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 1);
+        flush();
+        execute("DELETE FROM %s WHERE a=? AND b=?", 1, 1);
+        flush();
+        assertEmpty(execute("SELECT * FROM %s WHERE a=? AND b=? AND c=?", 1, 1, 1));
+    }
+}


[2/4] cassandra git commit: Merge commit '2424dd133bd971de4b19e5030619d7be79e261e0' into cassandra-2.1

Posted by sl...@apache.org.
Merge commit '2424dd133bd971de4b19e5030619d7be79e261e0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/19c54a59
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/19c54a59
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/19c54a59

Branch: refs/heads/trunk
Commit: 19c54a5971888e6bb3815b11fdf0e0d58ec9cbc5
Parents: 739f9ce 2424dd1
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 14 11:13:06 2015 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 14 11:13:06 2015 +0100

----------------------------------------------------------------------

----------------------------------------------------------------------