You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ms...@apache.org on 2016/08/16 22:42:49 UTC

[48/50] [abbrv] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb56193a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb56193a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb56193a

Branch: refs/heads/cassandra-3.8
Commit: bb56193a3f2ac865efb18c9d6944f2927f667771
Parents: 6528fbf 9583b6b
Author: Benjamin Lerer <b....@gmail.com>
Authored: Tue Aug 16 15:29:42 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Tue Aug 16 15:34:35 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |  10 +
 .../cassandra/db/compaction/Scrubber.java       | 169 ++++-
 .../cassandra/db/marshal/ReversedType.java      |  10 -
 .../db/partitions/AbstractBTreePartition.java   |   7 +-
 .../db/partitions/ImmutableBTreePartition.java  |  34 +-
 .../db/rows/UnfilteredRowIterators.java         |  28 +
 .../org/apache/cassandra/cql3/CQLTester.java    |  18 +
 .../validation/entities/SecondaryIndexTest.java | 131 ++++
 .../cql3/validation/operations/DeleteTest.java  | 113 +++
 .../operations/SelectOrderByTest.java           | 719 ++++++++++---------
 .../cql3/validation/operations/SelectTest.java  | 357 ++++++++-
 .../cassandra/db/marshal/ReversedTypeTest.java  |   4 +-
 13 files changed, 1210 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a202755,c421398..7605952
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -49,9 -14,12 +49,10 @@@ Merged from 2.2
   * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
   * Synchronize ThriftServer::stop() (CASSANDRA-12105)
   * Use dedicated thread for JMX notifications (CASSANDRA-12146)
 - * NPE when trying to remove purgable tombstones from result (CASSANDRA-12143)
   * Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
   * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
 - * Don't write shadowed range tombstone (CASSANDRA-12030)
  Merged from 2.1:
+  * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
   * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
   * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
   * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 6b8dfd3,0a3ab36..0bd3920
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,55 -13,17 +13,65 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 -2.2.8
++3.0.9
+ =====
+ 
+ Upgrading
+ ---------
 -    - The ReversedType behaviour has been corrected for clustering columns of
 -      BYTES type containing empty value. Scrub should be run on the existing
 -      SSTables containing a descending clustering column of BYTES type to correct
 -      their ordering. See CASSANDRA-12127 for more details.
++   - The ReversedType behaviour has been corrected for clustering columns of
++     BYTES type containing empty values. Scrub should be run on the existing
++     SSTables containing a descending clustering column of BYTES type to correct
++     their ordering. See CASSANDRA-12127 for more details.
+ 
 -2.2.7
 +3.0.8
 +=====
 +
 +Upgrading
 +---------
 +   - Ec2MultiRegionSnitch will no longer automatically set broadcast_rpc_address
 +     to the public instance IP if this property is defined on cassandra.yaml.
 +
 +3.0.7
 +=====
 +
 +Upgrading
 +---------
 +   - A maximum size for SSTable values has been introduced to prevent out-of-memory
 +     exceptions when reading corrupt SSTables. This maximum size can be set via
 +     max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
 +     value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
 +     they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
 +
 +Deprecation
 +-----------
 +   - DateTieredCompactionStrategy has been deprecated - new tables should use
 +     TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
 +     cause increased compaction load for a while after the migration so make sure you run
 +     tests before migrating. Read CASSANDRA-9666 for background on this.
 +
 +New features
 +------------
 +   - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
 +     to time series compaction and new tables should use this instead of DTCS. See
 +     CASSANDRA-9666 for details.
 +
 +3.0.6
 +=====
 +
 +New features
 +------------
 +   - JSON timestamps are now in UTC and contain the timezone information, see
 +     CASSANDRA-11137 for more details.
 +
 +3.0.5
 +=====
 +
 +Upgrading
 +---------
 +   - Nothing specific to this release, but please see the upgrading sections of previous versions,
 +     especially if you are upgrading from 2.2.
 +
 +3.0.4
  =====
  
  New features

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 539c4c7,99ee62e..c010891
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -23,11 -23,11 +23,12 @@@ import java.util.*
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 -import com.google.common.collect.AbstractIterator;
  
++import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@@ -327,6 -312,32 +302,38 @@@ public class Scrubber implements Closea
          }
      }
  
+     @SuppressWarnings("resource")
+     private boolean tryAppend(DecoratedKey prevKey, DecoratedKey key, SSTableRewriter writer)
+     {
 -        // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
 -        // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
 -        // to the outOfOrderRows that will be later written to a new SSTable.
 -        OrderCheckerIterator atoms = new OrderCheckerIterator(new SSTableIdentityIterator(sstable, dataFile, key, checkData),
 -                                                              cfs.metadata.comparator.onDiskAtomComparator());
 -        if (prevKey != null && prevKey.compareTo(key) > 0)
++        // OrderCheckerIterator will check, at iteration time, that the rows are in the proper order. If it detects
++        // that one row is out of order, it will stop returning them. The remaining rows will be sorted and added
++        // to the outOfOrder set that will be later written to a new SSTable.
++        OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(sstable, dataFile, key),
++                                                                        cfs.metadata.comparator);
++
++        try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath()))
+         {
 -            saveOutOfOrderRow(prevKey, key, atoms);
 -            return false;
 -        }
++            if (prevKey != null && prevKey.compareTo(key) > 0)
++            {
++                saveOutOfOrderRow(prevKey, key, iterator);
++                return false;
++            }
+ 
 -        AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
 -        if (writer.tryAppend(compactedRow) == null)
 -            emptyRows++;
 -        else
 -            goodRows++;
++            if (writer.tryAppend(iterator) == null)
++                emptyRows++;
++            else
++                goodRows++;
++        }
+ 
 -        if (atoms.hasOutOfOrderCells())
 -            saveOutOfOrderRow(key, atoms);
++        if (sstableIterator.hasRowsOutOfOrder())
++        {
++            outputHandler.warn(String.format("Out of order rows found in partition: %s", key));
++            outOfOrder.add(sstableIterator.getRowsOutOfOrder());
++        }
+ 
+         return true;
+     }
+ 
      private void updateIndexKey()
      {
          currentIndexKey = nextIndexKey;
@@@ -469,44 -523,88 +476,146 @@@
      }
  
      /**
 +     * During 2.x migration, under some circumstances rows might have gotten duplicated.
 +     * Merging iterator merges rows with same clustering.
 +     *
 +     * For more details, refer to CASSANDRA-12144.
 +     */
 +    private static class RowMergingSSTableIterator extends SSTableIdentityIterator
 +    {
 +        RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
 +        {
 +            super(sstable, file, key);
 +        }
 +
 +        @Override
 +        protected Unfiltered doCompute()
 +        {
 +            if (!iterator.hasNext())
 +                return endOfData();
 +
 +            Unfiltered next = iterator.next();
 +            if (!next.isRow())
 +                return next;
 +
 +            while (iterator.hasNext())
 +            {
 +                Unfiltered peek = iterator.peek();
 +                // If there was a duplicate row, merge it.
 +                if (next.clustering().equals(peek.clustering()) && peek.isRow())
 +                {
 +                    iterator.next(); // Make sure that the peeked item was consumed.
 +                    next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
 +                }
 +                else
 +                {
 +                    break;
 +                }
 +            }
 +
 +            return next;
 +        }
 +    }
++
++    /**
+      * In some cases, like CASSANDRA-12127, the rows might have been stored in the wrong order. This decorator checks the
+      * row order and collects the out-of-order rows to correct the problem.
+      */
 -    private static final class OrderCheckerIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
++    private static final class OrderCheckerIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+     {
+         /**
+          * The decorated iterator.
+          */
 -        private final OnDiskAtomIterator iterator;
++        private final UnfilteredRowIterator iterator;
+ 
 -        /**
 -         * The atom comparator.
 -         */
 -        private final Comparator<OnDiskAtom> comparator;
++        private final ClusteringComparator comparator;
+ 
 -        /**
 -         * The Column family containing the cells which are out of order.
 -         */
 -        private ColumnFamily outOfOrderCells;
++        private Unfiltered previous;
+ 
+         /**
 -         * The previous atom returned
++         * The partition containing the rows which are out of order.
+          */
 -        private OnDiskAtom previous;
++        private Partition rowsOutOfOrder;
+ 
 -        public OrderCheckerIterator(OnDiskAtomIterator iterator, Comparator<OnDiskAtom> comparator)
++        public OrderCheckerIterator(UnfilteredRowIterator iterator, ClusteringComparator comparator)
+         {
+             this.iterator = iterator;
+             this.comparator = comparator;
+         }
+ 
 -        public ColumnFamily getColumnFamily()
++        public CFMetaData metadata()
+         {
 -            return iterator.getColumnFamily();
++            return iterator.metadata();
+         }
+ 
 -        public DecoratedKey getKey()
++        public boolean isReverseOrder()
+         {
 -            return iterator.getKey();
++            return iterator.isReverseOrder();
+         }
+ 
 -        public void close() throws IOException
++        public PartitionColumns columns()
+         {
 -            iterator.close();
++            return iterator.columns();
++        }
++
++        public DecoratedKey partitionKey()
++        {
++            return iterator.partitionKey();
++        }
++
++        public Row staticRow()
++        {
++            return iterator.staticRow();
+         }
+ 
+         @Override
 -        protected OnDiskAtom computeNext()
++        public boolean isEmpty()
++        {
++            return iterator.isEmpty();
++        }
++
++        public void close()
++        {
++            iterator.close();
++        }
++
++        public DeletionTime partitionLevelDeletion()
++        {
++            return iterator.partitionLevelDeletion();
++        }
++
++        public EncodingStats stats()
++        {
++            return iterator.stats();
++        }
++
++        public boolean hasRowsOutOfOrder()
++        {
++            return rowsOutOfOrder != null;
++        }
++
++        public Partition getRowsOutOfOrder()
++        {
++            return rowsOutOfOrder;
++        }
++
++        protected Unfiltered computeNext()
+         {
+             if (!iterator.hasNext())
+                 return endOfData();
+ 
 -            OnDiskAtom next = iterator.next();
++            Unfiltered next = iterator.next();
+ 
 -            // If we detect that some cells are out of order we will store and sort the remaining once to insert them
++            // If we detect that some rows are out of order we will store and sort the remaining ones to insert them
+             // in a separate SSTable.
+             if (previous != null && comparator.compare(next, previous) < 0)
+             {
 -                outOfOrderCells = collectOutOfOrderCells(next, iterator);
++                rowsOutOfOrder = ImmutableBTreePartition.create(UnfilteredRowIterators.concat(next, iterator), false);
+                 return endOfData();
+             }
+             previous = next;
+             return next;
+         }
+ 
 -        public boolean hasOutOfOrderCells()
 -        {
 -            return outOfOrderCells != null;
 -        }
 -
 -        public ColumnFamily getOutOfOrderCells()
 -        {
 -            return outOfOrderCells;
 -        }
 -
 -        private static ColumnFamily collectOutOfOrderCells(OnDiskAtom atom, OnDiskAtomIterator iterator)
 -        {
 -            ColumnFamily cf = iterator.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
 -            cf.addAtom(atom);
 -            while (iterator.hasNext())
 -                cf.addAtom(iterator.next());
 -            return cf;
 -        }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 02320c7,19bee5f..82a1895
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@@ -66,18 -65,8 +66,8 @@@ public class ReversedType<T> extends Ab
          return baseType.isEmptyValueMeaningless();
      }
  
 -    public int compare(ByteBuffer o1, ByteBuffer o2)
 +    public int compareCustom(ByteBuffer o1, ByteBuffer o2)
      {
-         // An empty byte buffer is always smaller
-         if (o1.remaining() == 0)
-         {
-             return o2.remaining() == 0 ? 0 : -1;
-         }
-         if (o2.remaining() == 0)
-         {
-             return 1;
-         }
- 
          return baseType.compare(o2, o1);
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 1fa3324,0000000..c63acc2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@@ -1,408 -1,0 +1,413 @@@
 +/*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
 +package org.apache.cassandra.db.partitions;
 +
 +import java.util.Iterator;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 +
 +import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 +
 +public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 +{
 +    protected static final Holder EMPTY = new Holder(PartitionColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
 +
 +    protected final CFMetaData metadata;
 +    protected final DecoratedKey partitionKey;
 +
 +    protected abstract Holder holder();
 +    protected abstract boolean canHaveShadowedData();
 +
 +    protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey partitionKey)
 +    {
 +        this.metadata = metadata;
 +        this.partitionKey = partitionKey;
 +    }
 +
 +    protected static final class Holder
 +    {
 +        final PartitionColumns columns;
 +        final DeletionInfo deletionInfo;
 +        // the btree of rows
 +        final Object[] tree;
 +        final Row staticRow;
 +        final EncodingStats stats;
 +
 +        Holder(PartitionColumns columns, Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats)
 +        {
 +            this.columns = columns;
 +            this.tree = tree;
 +            this.deletionInfo = deletionInfo;
 +            this.staticRow = staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
 +            this.stats = stats;
 +        }
 +    }
 +
 +    public DeletionInfo deletionInfo()
 +    {
 +        return holder().deletionInfo;
 +    }
 +
 +    public Row staticRow()
 +    {
 +        return holder().staticRow;
 +    }
 +
 +    public boolean isEmpty()
 +    {
 +        Holder holder = holder();
 +        return holder.deletionInfo.isLive() && BTree.isEmpty(holder.tree) && holder.staticRow.isEmpty();
 +    }
 +
 +    public boolean hasRows()
 +    {
 +        Holder holder = holder();
 +        return !BTree.isEmpty(holder.tree);
 +    }
 +
 +    public CFMetaData metadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public DecoratedKey partitionKey()
 +    {
 +        return partitionKey;
 +    }
 +
 +    public DeletionTime partitionLevelDeletion()
 +    {
 +        return holder().deletionInfo.getPartitionDeletion();
 +    }
 +
 +    public PartitionColumns columns()
 +    {
 +        return holder().columns;
 +    }
 +
 +    public EncodingStats stats()
 +    {
 +        return holder().stats;
 +    }
 +
 +    public Row getRow(Clustering clustering)
 +    {
 +        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
 +        // Note that for statics, this will never return null; it will return an empty row instead. However,
 +        // it's more consistent for this method to return null if we don't really have a static row.
 +        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
 +    }
 +
 +    private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
 +    {
 +        DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 +        if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive()))
 +            return Rows.EMPTY_STATIC_ROW;
 +
 +        Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata);
 +        return row == null ? Rows.EMPTY_STATIC_ROW : row;
 +    }
 +
 +    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
 +    {
 +        // TODO: we could optimize comparison for "NativeRow" à la #6755
 +        final Holder current = holder();
 +        return new SearchIterator<Clustering, Row>()
 +        {
 +            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
 +            private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 +
 +            public boolean hasNext()
 +            {
 +                return rawIter.hasNext();
 +            }
 +
 +            public Row next(Clustering clustering)
 +            {
 +                if (clustering == Clustering.STATIC_CLUSTERING)
 +                    return staticRow(current, columns, true);
 +
 +                Row row = rawIter.next(clustering);
 +                RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
 +
 +                // A search iterator only returns a row, so it doesn't allow us to directly account for a deletion that should apply to the row
 +                // (the partition deletion or the deletion of a range tombstone that covers it). So, if need be, we reuse the row deletion
 +                // to carry the proper deletion on the row.
 +                DeletionTime activeDeletion = partitionDeletion;
 +                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
 +                    activeDeletion = rt.deletionTime();
 +
 +                if (row == null)
 +                    return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
 +
 +                return row.filter(columns, activeDeletion, true, metadata);
 +            }
 +        };
 +    }
 +
 +    public UnfilteredRowIterator unfilteredIterator()
 +    {
 +        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
 +    }
 +
 +    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
 +    {
 +        return unfilteredIterator(holder(), selection, slices, reversed);
 +    }
 +
 +    public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
 +    {
 +        Row staticRow = staticRow(current, selection, false);
 +        if (slices.size() == 0)
 +        {
 +            DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
 +            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
 +        }
 +
 +        return slices.size() == 1
 +               ? sliceIterator(selection, slices.get(0), reversed, current, staticRow)
 +               : new SlicesIterator(selection, slices, reversed, current, staticRow);
 +    }
 +
 +    private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow)
 +    {
 +        Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
 +        Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
 +        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
 +        Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed);
 +
 +        return merge(rowIter, deleteIter, selection, reversed, current, staticRow);
 +    }
 +
 +    private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
 +                                                     ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
 +    {
 +        return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
 +                                               selection, staticRow, reversed, current.stats,
 +                                               rowIter, deleteIter,
 +                                               canHaveShadowedData());
 +    }
 +
 +    private abstract class AbstractIterator extends AbstractUnfilteredRowIterator
 +    {
 +        final Holder current;
 +        final ColumnFilter selection;
 +
 +        private AbstractIterator(ColumnFilter selection, boolean isReversed)
 +        {
 +            this(AbstractBTreePartition.this.holder(), selection, isReversed);
 +        }
 +
 +        private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
 +        {
 +            this(current,
 +                 AbstractBTreePartition.this.staticRow(current, selection, false),
 +                 selection, isReversed);
 +        }
 +
 +        private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
 +        {
 +            super(AbstractBTreePartition.this.metadata,
 +                  AbstractBTreePartition.this.partitionKey,
 +                  current.deletionInfo.getPartitionDeletion(),
 +                  selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
 +                                              // it would also be more precise to return the intersection of the selection and current.columns,
 +                                              // but it's probably not worth spending time on computing that.
 +                  staticRow,
 +                  isReversed,
 +                  current.stats);
 +            this.current = current;
 +            this.selection = selection;
 +        }
 +    }
 +
 +    public class SlicesIterator extends AbstractIterator
 +    {
 +        private final Slices slices;
 +
 +        private int idx;
 +        private Iterator<Unfiltered> currentSlice;
 +
 +        private SlicesIterator(ColumnFilter selection,
 +                               Slices slices,
 +                               boolean isReversed,
 +                               Holder current,
 +                               Row staticRow)
 +        {
 +            super(current, staticRow, selection, isReversed);
 +            this.slices = slices;
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            while (true)
 +            {
 +                if (currentSlice == null)
 +                {
 +                    if (idx >= slices.size())
 +                        return endOfData();
 +
 +                    int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
 +                    currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
 +                    idx++;
 +                }
 +
 +                if (currentSlice.hasNext())
 +                    return currentSlice.next();
 +
 +                currentSlice = null;
 +            }
 +        }
 +    }
 +
 +    public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator
 +    {
 +        private Iterator<Unfiltered> iterator;
 +
 +        protected SliceableIterator(ColumnFilter selection, boolean isReversed)
 +        {
 +            super(selection, isReversed);
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            if (iterator == null)
 +                iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder);
 +            if (!iterator.hasNext())
 +                return endOfData();
 +            return iterator.next();
 +        }
 +
 +        public Iterator<Unfiltered> slice(Slice slice)
 +        {
 +            return sliceIterator(selection, slice, isReverseOrder, current, staticRow);
 +        }
 +    }
 +
 +    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
 +    {
 +        return new SliceableIterator(columns, reversed);
 +    }
 +
 +    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
 +    {
 +        return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
 +    }
 +
 +    protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
 +    {
++        return build(iterator, initialRowCapacity, true);
++    }
++
++    protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
++    {
 +        CFMetaData metadata = iterator.metadata();
 +        PartitionColumns columns = iterator.columns();
 +        boolean reversed = iterator.isReverseOrder();
 +
 +        BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
-         builder.auto(false);
++        builder.auto(!ordered);
 +        MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
 +
 +        while (iterator.hasNext())
 +        {
 +            Unfiltered unfiltered = iterator.next();
 +            if (unfiltered.kind() == Unfiltered.Kind.ROW)
 +                builder.add((Row)unfiltered);
 +            else
 +                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
 +        }
 +
 +        if (reversed)
 +            builder.reverse();
 +
 +        return new Holder(columns, builder.build(), deletionBuilder.build(), iterator.staticRow(), iterator.stats());
 +    }
 +
 +    // Note that when building with a RowIterator, deletion will generally be LIVE, but we allow passing it nonetheless because PartitionUpdate
 +    // passes a MutableDeletionInfo that it mutates later.
 +    protected static Holder build(RowIterator rows, DeletionInfo deletion, boolean buildEncodingStats, int initialRowCapacity)
 +    {
 +        CFMetaData metadata = rows.metadata();
 +        PartitionColumns columns = rows.columns();
 +        boolean reversed = rows.isReverseOrder();
 +
 +        BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
 +        builder.auto(false);
 +        while (rows.hasNext())
 +        {
 +            Row row = rows.next();
 +            builder.add(row);
 +        }
 +
 +        if (reversed)
 +            builder.reverse();
 +
 +        Row staticRow = rows.staticRow();
 +        Object[] tree = builder.build();
 +        EncodingStats stats = buildEncodingStats ? EncodingStats.Collector.collect(staticRow, BTree.iterator(tree), deletion)
 +                                                 : EncodingStats.NO_STATS;
 +        return new Holder(columns, tree, deletion, staticRow, stats);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +
 +        sb.append(String.format("[%s.%s] key=%s columns=%s",
 +                                metadata.ksName,
 +                                metadata.cfName,
 +                                metadata.getKeyValidator().getString(partitionKey().getKey()),
 +                                columns()));
 +
 +        if (staticRow() != Rows.EMPTY_STATIC_ROW)
 +            sb.append("\n    ").append(staticRow().toString(metadata));
 +
 +        for (Row row : this)
 +            sb.append("\n    ").append(row.toString(metadata));
 +
 +        return sb.toString();
 +    }
 +
 +    public int rowCount()
 +    {
 +        return BTree.size(holder().tree);
 +    }
 +
 +    public Iterator<Row> iterator()
 +    {
 +        return BTree.<Row>iterator(holder().tree);
 +    }
 +
 +    public Row lastRow()
 +    {
 +        Object[] tree = holder().tree;
 +        if (BTree.isEmpty(tree))
 +            return null;
 +
 +        return BTree.findByIndex(tree, BTree.size(tree) - 1);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
index 9af4bad,0000000..8d96f1e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@@ -1,91 -1,0 +1,123 @@@
 +/*
 +* Licensed to the Apache Software Foundation (ASF) under one
 +* or more contributor license agreements.  See the NOTICE file
 +* distributed with this work for additional information
 +* regarding copyright ownership.  The ASF licenses this file
 +* to you under the Apache License, Version 2.0 (the
 +* "License"); you may not use this file except in compliance
 +* with the License.  You may obtain a copy of the License at
 +*
 +*    http://www.apache.org/licenses/LICENSE-2.0
 +*
 +* Unless required by applicable law or agreed to in writing,
 +* software distributed under the License is distributed on an
 +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +* KIND, either express or implied.  See the License for the
 +* specific language governing permissions and limitations
 +* under the License.
 +*/
 +package org.apache.cassandra.db.partitions;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionInfo;
 +import org.apache.cassandra.db.PartitionColumns;
 +import org.apache.cassandra.db.rows.*;
 +
 +public class ImmutableBTreePartition extends AbstractBTreePartition // all state lives in a single final Holder set at construction
 +{
 +
 +    protected final Holder holder;
 +
 +    public ImmutableBTreePartition(CFMetaData metadata,
 +                                      DecoratedKey partitionKey,
 +                                      PartitionColumns columns,
 +                                      Row staticRow,
 +                                      Object[] tree,
 +                                      DeletionInfo deletionInfo,
 +                                      EncodingStats stats)
 +    {
 +        super(metadata, partitionKey);
 +        this.holder = new Holder(columns, tree, deletionInfo, staticRow, stats);
 +    }
 +
 +    protected ImmutableBTreePartition(CFMetaData metadata,
 +                                      DecoratedKey partitionKey,
 +                                      Holder holder)
 +    {
 +        super(metadata, partitionKey);
 +        this.holder = holder;
 +    }
 +
 +    /**
 +     * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
 +     *
 +     * Warning: Note that this method does not close the provided iterator and it is
 +     * up to the caller to do so.
 +     *
 +     * @param iterator the iterator to gather in memory.
 +     * @return the created partition.
 +     */
 +    public static ImmutableBTreePartition create(UnfilteredRowIterator iterator)
 +    {
 +        return create(iterator, 16); // default sizing hint of 16 rows
 +    }
 +
 +    /**
 +     * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
 +     *
 +     * Warning: Note that this method does not close the provided iterator and it is
 +     * up to the caller to do so.
 +     *
 +     * @param iterator the iterator to gather in memory.
++     * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise.
++     * @return the created partition.
++     */
++    public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, boolean ordered)
++    {
++        return create(iterator, 16, ordered); // default sizing hint of 16 rows
++    }
++
++    /**
++     * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
++     *
++     * Warning: Note that this method does not close the provided iterator and it is
++     * up to the caller to do so.
++     *
++     * @param iterator the iterator to gather in memory.
 +     * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
 +     * correspond to, or be a good estimate of, the number of rows in {@code iterator}.
 +     * @return the created partition.
 +     */
 +    public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity)
 +    {
-         return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity));
++        return create(iterator, initialRowCapacity, true); // rows are assumed to be returned in order
++    }
++
++    /**
++     * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
++     *
++     * Warning: Note that this method does not close the provided iterator and it is
++     * up to the caller to do so.
++     *
++     * @param iterator the iterator to gather in memory.
++     * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
++     * correspond to, or be a good estimate of, the number of rows in {@code iterator}.
++     * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise.
++     * @return the created partition.
++     */
++    public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
++    {
++        return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered));
 +    }
 +
 +    protected Holder holder()
 +    {
 +        return holder;
 +    }
 +
 +    protected boolean canHaveShadowedData()
 +    {
 +        return false;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 3218ff2,0000000..43653a9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@@ -1,558 -1,0 +1,586 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.rows;
 +
 +import java.util.*;
 +import java.security.MessageDigest;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.transform.FilteredRows;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.IMergeIterator;
 +import org.apache.cassandra.utils.MergeIterator;
 +import org.apache.cassandra.utils.memory.AbstractAllocator;
 +
 +/**
 + * Static methods to work with {@link UnfilteredRowIterator} (a.k.a. "atom") iterators.
 + */
 +public abstract class UnfilteredRowIterators
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class);
 +
 +    private UnfilteredRowIterators() {} // static utility holder; not meant to be instantiated
 +
 +    /**
 +     * Interface for a listener interested in the result of merging multiple versions of a given row.
 +     * <p>
 +     * Implementors of this interface are given enough information that they can easily reconstruct the difference
 +     * between the merged result and each individual input. This is used when reconciling results on replicas for
 +     * instance to figure out what to send as read-repair to each source.
 +     */
 +    public interface MergeListener
 +    {
 +        /**
 +         * Called once for the merged partition.
 +         *
 +         * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the
 +         * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}.
 +         * @param versions the partition level deletion for the sources of the merge. Elements of the array will never
 +         * be null, but may be "live".
 +         **/
 +        public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
 +
 +        /**
 +         * Called once for every row participating in the merge.
 +         * <p>
 +         * Note that this is called for every clustering where at least one of the merged sources has a row. In
 +         * particular, this may be called in cases where there is no row in the merged output (if a source has a row
 +         * that is shadowed by another source's range tombstone or partition level deletion).
 +         *
 +         * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
 +         * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
 +         * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
 +         * {@code null} for some sources if the source has no such row.
 +         */
 +        public void onMergedRows(Row merged, Row[] versions);
 +
 +        /**
 +         * Called once for every range tombstone marker participating in the merge.
 +         * <p>
 +         * Note that this is called for every "clustering position" where at least one of the merged sources has a range
 +         * tombstone marker.
 +         *
 +         * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which
 +         * means that at least one source has a marker in {@code versions} but the merged output has nothing corresponding
 +         * (this basically means the merged output has a currently open deletion that shadows whatever marker the source
 +         * had).
 +         * @param versions the marker for each source merged. This can be {@code null} for some source if that source
 +         * has no such marker.
 +         */
 +        public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
 +
 +        public void close(); // invoked by UnfilteredRowMergeIterator.close() after the merge iterator is closed
 +    }
 +
 +    /**
 +     * Returns an iterator that only returns rows with only live content.
 +     *
 +     * This is mainly used in the CQL layer when we know we don't care about deletion
 +     * infos (and since an UnfilteredRowIterator cannot shadow its own data, we know everything
 +     * returned isn't shadowed by a tombstone).
 +     */
 +    public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec)
 +    {
 +        return FilteredRows.filter(iter, nowInSec);
 +    }
 +
 +    /**
 +     * Returns an iterator that is the result of merging other iterators (a single iterator is returned unchanged).
 +     */
 +    public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec)
 +    {
 +        assert !iterators.isEmpty();
 +        if (iterators.size() == 1)
 +            return iterators.get(0);
 +
 +        return UnfilteredRowMergeIterator.create(iterators, nowInSec, null);
 +    }
 +
 +    /**
 +     * Returns an iterator that is the result of merging other iterators, and (optionally) using
 +     * a specific MergeListener (which may be {@code null}).
 +     *
 +     * Note that this method assumes that there are at least 2 iterators to merge (this is not checked here).
 +     */
 +    public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
 +    {
 +        return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
 +    }
 +
 +    /**
 +     * Returns an unfiltered iterator with no rows for a given partition (it may still carry a static row and a partition deletion).
 +     */
 +    public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
 +    {
 +        return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion);
 +    }
 +
 +    /**
 +     * Digests the partition represented by the provided iterator.
 +     *
 +     * @param command the command that yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
 +     * as this is only used when producing a digest to be sent to legacy nodes.
 +     * @param iterator the iterator to digest.
 +     * @param digest the {@code MessageDigest} to use for the digest.
 +     * @param version the messaging protocol to use when producing the digest.
 +     */
 +    public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
 +    {
 +        if (version < MessagingService.VERSION_30)
 +        {
 +            LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
 +            return;
 +        }
 +
 +        digest.update(iterator.partitionKey().getKey().duplicate()); // duplicate(): update(ByteBuffer) advances the buffer's position
 +        iterator.partitionLevelDeletion().digest(digest);
 +        iterator.columns().regulars.digest(digest);
 +        // When serializing an iterator, we skip the static columns if the iterator has no static row, even if the
 +        // columns() object itself has some (the columns() is a superset of what the iterator actually contains, and
 +        // will correspond to the queried columns pre-serialization). So we must avoid taking the static column names
 +        // into account if there is no static row or we'd get a digest mismatch depending on whether the digest
 +        // is computed on an iterator that has been serialized or not (see CASSANDRA-12090)
 +        // TODO: in practice we could completely skip digesting the columns since they are more informative of what the
 +        // iterator may contain, and digesting the actual content is enough. And in fact, that would be more correct
 +        // (since again, the columns could be different without the information represented by the iterator being
 +        // different), but removing them entirely is strictly speaking a breaking change (it would create mismatches on
 +        // upgrade) so we can only do it on the next protocol version bump.
 +        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
 +            iterator.columns().statics.digest(digest);
 +        FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
 +        iterator.staticRow().digest(digest);
 +
 +        while (iterator.hasNext())
 +        {
 +            Unfiltered unfiltered = iterator.next();
 +            unfiltered.digest(digest);
 +        }
 +    }
 +
 +    /**
 +     * Returns an iterator that concatenates two atom iterators.
 +     * This method assumes that both iterators are from the same partition and that the atoms from
 +     * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterators
 +     * makes sense).
 +     */
 +    public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2)
 +    {
 +        assert iter1.metadata().cfId.equals(iter2.metadata().cfId)
 +            && iter1.partitionKey().equals(iter2.partitionKey())
 +            && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
 +            && iter1.isReverseOrder() == iter2.isReverseOrder()
 +            && iter1.columns().equals(iter2.columns())
 +            && iter1.staticRow().equals(iter2.staticRow());
 +
 +        class Extend implements MoreRows<UnfilteredRowIterator>
 +        {
 +            boolean returned = false;
 +            public UnfilteredRowIterator moreContents()
 +            {
 +                if (returned)
 +                    return null;
 +                returned = true;
 +                return iter2;
 +            }
 +        }
 +
 +        return MoreRows.extend(iter1, new Extend());
 +    }
 +
++    /**
++     * Returns an iterator that returns {@code first} and then every atom of {@code rest}.
++     */
++    public static UnfilteredRowIterator concat(final Unfiltered first, final UnfilteredRowIterator rest)
++    {
++        return new WrappingUnfilteredRowIterator(rest)
++        {
++            private boolean hasReturnedFirst; // true once 'first' has been handed out by next()
++
++            @Override
++            public boolean hasNext()
++            {
++                return hasReturnedFirst ? super.hasNext() : true;
++            }
++
++            @Override
++            public Unfiltered next()
++            {
++                if (!hasReturnedFirst)
++                {
++                    hasReturnedFirst = true;
++                    return first;
++                }
++                return super.next();
++            }
++        };
++    }
++
 +    public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
 +    {
 +        class Cloner extends Transformation // copies the static row, every row and every marker using 'allocator'
 +        {
 +            private final Row.Builder builder = allocator.cloningBTreeRowBuilder();
 +
 +            public Row applyToStatic(Row row)
 +            {
 +                return Rows.copy(row, builder).build();
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                return Rows.copy(row, builder).build();
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                return marker.copy(allocator);
 +            }
 +        }
 +        return Transformation.apply(iterator, new Cloner());
 +    }
 +
 +    /**
 +     * Validate that the data of the provided iterator is valid, that is that the values
 +     * it contains are valid for the type they represent, and more generally that the
 +     * infos stored are sensible.
 +     *
 +     * This is mainly used by scrubber to detect problems in sstables.
 +     *
 +     * @param iterator the partition to check.
 +     * @param filename the name of the file the data is coming from.
 +     * @return an iterator that returns the same data as {@code iterator} but that
 +     * checks said data and throws a {@code CorruptSSTableException} if it detects
 +     * invalid data.
 +     */
 +    public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
 +    {
 +        class Validator extends Transformation
 +        {
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                validate(row);
 +                return row;
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                validate(row);
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                validate(marker);
 +                return marker;
 +            }
 +
 +            private void validate(Unfiltered unfiltered)
 +            {
 +                try
 +                {
 +                    unfiltered.validateData(iterator.metadata());
 +                }
 +                catch (MarshalException me)
 +                {
 +                    throw new CorruptSSTableException(me, filename);
 +                }
 +            }
 +        }
 +        return Transformation.apply(iterator, new Validator());
 +    }
 +
 +    /**
 +     * Wraps the provided iterator so it logs the returned atoms for debugging purposes.
 +     * <p>
 +     * Note that this is only meant for debugging as this can produce a very large amount of
 +     * logging at INFO level.
 +     */
 +    public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails)
 +    {
 +        CFMetaData metadata = iterator.metadata();
 +        logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}",
 +                    id,
 +                    metadata.ksName,
 +                    metadata.cfName,
 +                    metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
 +                    iterator.isReverseOrder(),
 +                    iterator.partitionLevelDeletion().markedForDeleteAt());
 +
 +        class Logger extends Transformation // shadows the slf4j Logger type here; 'logger' still refers to the static field
 +        {
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                if (!row.isEmpty())
 +                    logger.info("[{}] {}", id, row.toString(metadata, fullDetails));
 +                return row;
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                logger.info("[{}] {}", id, row.toString(metadata, fullDetails));
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                logger.info("[{}] {}", id, marker.toString(metadata));
 +                return marker;
 +            }
 +        }
 +        return Transformation.apply(iterator, new Logger());
 +    }
 +
 +    /**
 +     * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface.
 +     */
 +    private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator
 +    {
 +        private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator;
 +        private final MergeListener listener;
 +
 +        private UnfilteredRowMergeIterator(CFMetaData metadata,
 +                                           List<UnfilteredRowIterator> iterators,
 +                                           PartitionColumns columns,
 +                                           DeletionTime partitionDeletion,
 +                                           int nowInSec,
 +                                           boolean reversed,
 +                                           MergeListener listener)
 +        {
 +            super(metadata,
 +                  iterators.get(0).partitionKey(),
 +                  partitionDeletion,
 +                  columns,
 +                  mergeStaticRows(iterators, columns.statics, nowInSec, listener, partitionDeletion),
 +                  reversed,
 +                  mergeStats(iterators));
 +
 +            this.mergeIterator = MergeIterator.get(iterators,
 +                                                   reversed ? metadata.comparator.reversed() : metadata.comparator,
 +                                                   new MergeReducer(iterators.size(), reversed, nowInSec, listener));
 +            this.listener = listener;
 +        }
 +
 +        private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
 +        {
 +            try
 +            {
 +                checkForInvalidInput(iterators);
 +                return new UnfilteredRowMergeIterator(iterators.get(0).metadata(),
 +                                                      iterators,
 +                                                      collectColumns(iterators),
 +                                                      collectPartitionLevelDeletion(iterators, listener),
 +                                                      nowInSec,
 +                                                      iterators.get(0).isReverseOrder(),
 +                                                      listener);
 +            }
 +            catch (RuntimeException | Error e) // construction failed: close the inputs so they are not leaked, then rethrow
 +            {
 +                try
 +                {
 +                    FBUtilities.closeAll(iterators);
 +                }
 +                catch (Exception suppressed)
 +                {
 +                    e.addSuppressed(suppressed);
 +                }
 +                throw e;
 +            }
 +        }
 +
 +        @SuppressWarnings("resource") // We're not really creating any resource here
 +        private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators)
 +        {
 +            if (iterators.isEmpty())
 +                return;
 +
 +            UnfilteredRowIterator first = iterators.get(0);
 +            for (int i = 1; i < iterators.size(); i++)
 +            {
 +                UnfilteredRowIterator iter = iterators.get(i);
 +                assert first.metadata().cfId.equals(iter.metadata().cfId);
 +                assert first.partitionKey().equals(iter.partitionKey());
 +                assert first.isReverseOrder() == iter.isReverseOrder();
 +            }
 +        }
 +
 +        @SuppressWarnings("resource") // We're not really creating any resource here
 +        private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener)
 +        {
 +            DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()];
 +
 +            DeletionTime delTime = DeletionTime.LIVE;
 +            for (int i = 0; i < iterators.size(); i++)
 +            {
 +                UnfilteredRowIterator iter = iterators.get(i);
 +                DeletionTime iterDeletion = iter.partitionLevelDeletion();
 +                if (listener != null)
 +                    versions[i] = iterDeletion;
 +                if (!delTime.supersedes(iterDeletion))
 +                    delTime = iterDeletion;
 +            }
 +            if (listener != null)
 +                listener.onMergedPartitionLevelDeletion(delTime, versions);
 +            return delTime;
 +        }
 +
 +        private static Row mergeStaticRows(List<UnfilteredRowIterator> iterators,
 +                                           Columns columns,
 +                                           int nowInSec,
 +                                           MergeListener listener,
 +                                           DeletionTime partitionDeletion)
 +        {
 +            if (columns.isEmpty())
 +                return Rows.EMPTY_STATIC_ROW;
 +
 +            if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty()))
 +                return Rows.EMPTY_STATIC_ROW;
 +
 +            Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns.hasComplex());
 +            for (int i = 0; i < iterators.size(); i++)
 +                merger.add(i, iterators.get(i).staticRow());
 +
 +            Row merged = merger.merge(partitionDeletion);
 +            if (merged == null)
 +                merged = Rows.EMPTY_STATIC_ROW;
 +            if (listener != null)
 +                listener.onMergedRows(merged, merger.mergedRows());
 +            return merged;
 +        }
 +
 +        private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
 +        {
 +            PartitionColumns first = iterators.get(0).columns();
 +            Columns statics = first.statics;
 +            Columns regulars = first.regulars;
 +            for (int i = 1; i < iterators.size(); i++)
 +            {
 +                PartitionColumns cols = iterators.get(i).columns();
 +                statics = statics.mergeTo(cols.statics);
 +                regulars = regulars.mergeTo(cols.regulars);
 +            }
 +            return statics == first.statics && regulars == first.regulars
 +                 ? first
 +                 : new PartitionColumns(statics, regulars);
 +        }
 +
 +        private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators)
 +        {
 +            EncodingStats stats = EncodingStats.NO_STATS;
 +            for (UnfilteredRowIterator iter : iterators)
 +                stats = stats.mergeWith(iter.stats());
 +            return stats;
 +        }
 +
 +        protected Unfiltered computeNext()
 +        {
 +            while (mergeIterator.hasNext())
 +            {
 +                Unfiltered merged = mergeIterator.next();
 +                if (merged != null)
 +                    return merged;
 +            }
 +            return endOfData();
 +        }
 +
 +        public void close()
 +        {
 +            // This will close the input iterators
 +            FileUtils.closeQuietly(mergeIterator);
 +
 +            if (listener != null)
 +                listener.close();
 +        }
 +
 +        private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
 +        {
 +            private final MergeListener listener;
 +
 +            private Unfiltered.Kind nextKind;
 +
 +            private final Row.Merger rowMerger;
 +            private final RangeTombstoneMarker.Merger markerMerger;
 +
 +            private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener)
 +            {
 +                this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars.hasComplex());
 +                this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed);
 +                this.listener = listener;
 +            }
 +
 +            @Override
 +            public boolean trivialReduceIsTrivial()
 +            {
 +                // If we have a listener, we must signal it even when we have a single version
 +                return listener == null;
 +            }
 +
 +            public void reduce(int idx, Unfiltered current)
 +            {
 +                nextKind = current.kind();
 +                if (nextKind == Unfiltered.Kind.ROW)
 +                    rowMerger.add(idx, (Row)current);
 +                else
 +                    markerMerger.add(idx, (RangeTombstoneMarker)current);
 +            }
 +
 +            protected Unfiltered getReduced()
 +            {
 +                if (nextKind == Unfiltered.Kind.ROW)
 +                {
 +                    Row merged = rowMerger.merge(markerMerger.activeDeletion()); // may be null; the listener then gets an empty placeholder row
 +                    if (listener != null)
 +                        listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows());
 +                    return merged;
 +                }
 +                else
 +                {
 +                    RangeTombstoneMarker merged = markerMerger.merge();
 +                    if (listener != null)
 +                        listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
 +                    return merged;
 +                }
 +            }
 +
 +            protected void onKeyChange()
 +            {
 +                if (nextKind == Unfiltered.Kind.ROW)
 +                    rowMerger.clear();
 +                else
 +                    markerMerger.clear();
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index e3dc220,98b8e23..c9c4631
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -366,21 -278,20 +366,39 @@@ public abstract class CQLTeste
          return list.isEmpty() ? Collections.<String>emptyList() : new ArrayList<>(list);
      }
  
 +    public ColumnFamilyStore getCurrentColumnFamilyStore()
 +    {
 +        String currentTable = currentTable();
 +        return currentTable == null
 +             ? null
 +             : Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable);
 +    }
 +
      public void flush()
      {
 -        try
 -        {
 -            String currentTable = currentTable();
 -            if (currentTable != null)
 -                Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable).forceFlush().get();
 -        }
 -        catch (InterruptedException | ExecutionException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 +        ColumnFamilyStore store = getCurrentColumnFamilyStore();
 +        if (store != null)
 +            store.forceBlockingFlush();
 +    }
 +
++    @FunctionalInterface
++    public interface CheckedFunction {
++        void apply() throws Throwable;
++    }
++
++    /**
++     * Runs the given function before and after a flush of sstables.  This is useful for checking that behavior is
++     * the same whether data is in memtables or sstables.
++     * @param runnable
++     * @throws Throwable
++     */
++    public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable
++    {
++        runnable.apply();
++        flush();
++        runnable.apply();
+     }
+ 
      public void compact()
      {
          try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index f9802d7,11d2462..0cf13bd
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@@ -17,47 -17,38 +17,49 @@@
   */
  package org.apache.cassandra.cql3.validation.entities;
  
 -import static org.junit.Assert.assertTrue;
 -import static org.junit.Assert.fail;
 -
  import java.nio.ByteBuffer;
 -import java.util.HashMap;
 -import java.util.List;
 -import java.util.Locale;
 -import java.util.Map;
 -import java.util.Set;
 -import java.util.UUID;
 +import java.util.*;
 +import java.util.concurrent.Callable;
  import java.util.concurrent.CountDownLatch;
  
 +import org.apache.commons.lang3.StringUtils;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.cql3.CQLTester;
 -import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.DecoratedKey;
 -import org.apache.cassandra.db.IndexExpression;
 -import org.apache.cassandra.db.composites.CellName;
 -import org.apache.cassandra.db.index.IndexNotAvailableException;
 -import org.apache.cassandra.db.index.PerRowSecondaryIndex;
 -import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 -import org.apache.cassandra.db.index.composites.CompositesSearcher;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.InvalidRequestException;
  import org.apache.cassandra.exceptions.SyntaxException;
 -import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.concurrent.OpOrder.Group;
 -import org.apache.commons.lang3.StringUtils;
 -import org.junit.Test;
 -
 +import org.apache.cassandra.index.IndexNotAvailableException;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.index.StubIndex;
 +import org.apache.cassandra.index.internal.CustomCassandraIndex;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.MD5Digest;
 +import org.apache.cassandra.utils.Pair;
 +
 +import static org.apache.cassandra.Util.throwAssert;
+ import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
  
  public class SecondaryIndexTest extends CQLTester
  {
@@@ -949,77 -684,190 +951,206 @@@
      }
  
      @Test
 +    public void droppingIndexInvalidatesPreparedStatements() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
 +        createIndex("CREATE INDEX c_idx ON %s(c)");
 +        MD5Digest cqlId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", false).statementId;
 +        Integer thriftId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", true).toThriftPreparedResult().getItemId();
 +
 +        assertNotNull(QueryProcessor.instance.getPrepared(cqlId));
 +        assertNotNull(QueryProcessor.instance.getPreparedForThrift(thriftId));
 +
 +        dropIndex("DROP INDEX %s.c_idx");
 +
 +        assertNull(QueryProcessor.instance.getPrepared(cqlId));
 +        assertNull(QueryProcessor.instance.getPreparedForThrift(thriftId));
 +    }
 +
 +    // See CASSANDRA-11021
 +    @Test
 +    public void testIndexesOnNonStaticColumnsWhereSchemaIncludesStaticColumns() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int, b int, c int static, d int, PRIMARY KEY (a, b))");
 +        createIndex("CREATE INDEX b_idx on %s(b)");
 +        createIndex("CREATE INDEX d_idx on %s(d)");
 +
 +        execute("INSERT INTO %s (a, b, c ,d) VALUES (0, 0, 0, 0)");
 +        execute("INSERT INTO %s (a, b, c, d) VALUES (1, 1, 1, 1)");
 +        assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 0, 0));
 +        assertRows(execute("SELECT * FROM %s WHERE d = 1"), row(1, 1, 1, 1));
 +
 +        execute("UPDATE %s SET c = 2 WHERE a = 0");
 +        execute("UPDATE %s SET c = 3, d = 4 WHERE a = 1 AND b = 1");
 +        assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 2, 0));
 +        assertRows(execute("SELECT * FROM %s WHERE d = 4"), row(1, 1, 3, 4));
 +
 +        execute("DELETE FROM %s WHERE a = 0");
 +        execute("DELETE FROM %s WHERE a = 1 AND b = 1");
 +        assertEmpty(execute("SELECT * FROM %s WHERE b = 0"));
 +        assertEmpty(execute("SELECT * FROM %s WHERE d = 3"));
 +    }
 +
++    @Test
+     public void testWithEmptyRestrictionValueAndSecondaryIndex() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY ((pk), c))");
+         createIndex("CREATE INDEX on %s(c)");
+         createIndex("CREATE INDEX on %s(v)");
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", bytes("foo123"), bytes("1"), bytes("1"));
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", bytes("foo123"), bytes("2"), bytes("1"));
+ 
 -        for (boolean flush : new boolean[]{false, true})
 -        {
 -            if (flush)
 -                flush();
 -
++        beforeAndAfterFlush(() -> {
+             // Test clustering columns restrictions
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));"));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) > (textAsBlob('')) AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
 -        }
++        });
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)",
+                 bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"));
+ 
 -        for (boolean flush : new boolean[]{false, true})
 -        {
 -            if (flush)
 -                flush();
++        beforeAndAfterFlush(() -> {
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+                        row(bytes("foo123"), bytes("1"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+                        row(bytes("foo123"), bytes("1"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+                        row(bytes("foo123"), bytes("1"), bytes("1")),
+                        row(bytes("foo123"), bytes("2"), bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+                        row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+ 
+             // Test restrictions on non-primary key value
+             assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"));
 -        }
++        });
+ 
+         execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)",
+                 bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER);
+ 
 -        for (boolean flush : new boolean[]{false, true})
 -        {
 -            if (flush)
 -                flush();
++        beforeAndAfterFlush(() -> {
+ 
+             assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"),
+                        row(bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER));
 -        }
++        });
+     }
+ 
+     @Test
+     public void testEmptyRestrictionValueWithSecondaryIndexAndCompactTables() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY ((pk), c)) WITH COMPACT STORAGE");
 -        assertInvalidMessage("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables",
++        assertInvalidMessage("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns",
+                             "CREATE INDEX on %s(c)");
+ 
+         createTable("CREATE TABLE %s (pk blob PRIMARY KEY, v blob) WITH COMPACT STORAGE");
+         createIndex("CREATE INDEX on %s(v)");
+ 
+         execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo123"), bytes("1"));
+ 
+         // Test restrictions on non-primary key value
+         assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"));
+ 
+         execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo124"), EMPTY_BYTE_BUFFER);
+ 
+         assertRows(execute("SELECT * FROM %s WHERE v = textAsBlob('');"),
+                    row(bytes("foo124"), EMPTY_BYTE_BUFFER));
+     }
+ 
 -    /**
 -     * Custom index used to test the behavior of the system when the index is not ready.
 -     * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
 -     * to avoid the check but return a <code>CompositesSearcher</code>.
 -     */
 -    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
 +    private ResultMessage.Prepared prepareStatement(String cql, boolean forThrift)
      {
 -        private volatile CountDownLatch latch = new CountDownLatch(1);
 -
 -        @Override
 -        public void index(ByteBuffer rowKey, ColumnFamily cf)
 -        {
 -            try
 -            {
 -                latch.await();
 -            }
 -            catch (InterruptedException e)
 -            {
 -                Thread.interrupted();
 -            }
 -        }
 -
 -        @Override
 -        public void delete(DecoratedKey key, Group opGroup)
 -        {
 -        }
 +        return QueryProcessor.prepare(String.format(cql, KEYSPACE, currentTable()),
 +                                      ClientState.forInternalCalls(),
 +                                      forThrift);
 +    }
  
 -        @Override
 -        public void init()
 -        {
 -        }
 +    private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
 +    {
 +        assertNotNull(cell);
 +        assertEquals(0, def.type.compare(cell.value(), val));
 +        assertEquals(timestamp, cell.timestamp());
 +    }
  
 -        @Override
 -        public void reload()
 -        {
 -        }
 +    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
 +    {
 +        ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
 +        AbstractType<?> type = col.type;
 +        assertEquals(expected, type.compose(row.getCell(col).value()));
 +    }
  
 -        @Override
 -        public void validateOptions() throws ConfigurationException
 -        {
 -        }
 +    /**
 +     * <code>CassandraIndex</code> that blocks during the initialization.
 +     */
 +    public static class IndexBlockingOnInitialization extends CustomCassandraIndex
 +    {
 +        private final CountDownLatch latch = new CountDownLatch(1);
  
 -        @Override
 -        public String getIndexName()
 +        public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
          {
 -            return "testIndex";
 +            super(baseCfs, indexDef);
          }
  
          @Override