You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/12 18:43:14 UTC

[1/3] cassandra git commit: Avoid stack overflow on large clustering IN values

Repository: cassandra
Updated Branches:
  refs/heads/trunk e530f4230 -> 4125ca0aa


Avoid stack overflow on large clustering IN values

Patch by Tyler Hobbs; reviewed by Benjamin Lerer for CASSANDRA-8410


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9dc9185f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9dc9185f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9dc9185f

Branch: refs/heads/trunk
Commit: 9dc9185f5c7172915485f713dbbb6b78b22d0f66
Parents: 3f3d0ed
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:41:06 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:41:06 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/db/filter/ColumnSlice.java | 48 ++++++++++----------
 2 files changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dc9185f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc426bb..6cecf99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.12:
+ * Avoid StackOverflowError when a large list of IN values
+   is used for a clustering column (CASSANDRA-8410)
  * Fix NPE when writetime() or ttl() calls are wrapped by
    another function call (CASSANDRA-8451)
  * Fix NPE after dropping a keyspace (CASSANDRA-8332)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dc9185f/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 9eff12a..6a9efbb 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -130,36 +130,36 @@ public class ColumnSlice
 
         protected Column computeNext()
         {
-            if (currentSlice == null)
+            while (currentSlice != null || idx < slices.length)
             {
-                if (idx >= slices.length)
-                    return endOfData();
-
-                ColumnSlice slice = slices[idx++];
-                // Note: we specialize the case of start == "" and finish = "" because it is slightly more efficient, but also they have a specific
-                // meaning (namely, they always extend to the beginning/end of the range).
-                if (slice.start.remaining() == 0)
+                if (currentSlice == null)
                 {
-                    if (slice.finish.remaining() == 0)
-                        currentSlice = map.values().iterator();
+                    ColumnSlice slice = slices[idx++];
+                    // Note: we specialize the case of start == "" and finish = "" because it is slightly more efficient, but also they have a specific
+                    // meaning (namely, they always extend to the beginning/end of the range).
+                    if (slice.start.remaining() == 0)
+                    {
+                        if (slice.finish.remaining() == 0)
+                            currentSlice = map.values().iterator();
+                        else
+                            currentSlice = map.headMap(slice.finish, true).values().iterator();
+                    }
+                    else if (slice.finish.remaining() == 0)
+                    {
+                        currentSlice = map.tailMap(slice.start, true).values().iterator();
+                    }
                     else
-                        currentSlice = map.headMap(slice.finish, true).values().iterator();
-                }
-                else if (slice.finish.remaining() == 0)
-                {
-                    currentSlice = map.tailMap(slice.start, true).values().iterator();
+                    {
+                        currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
+                    }
                 }
-                else
-                {
-                    currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
-                }
-            }
 
-            if (currentSlice.hasNext())
-                return currentSlice.next();
+                if (currentSlice.hasNext())
+                    return currentSlice.next();
 
-            currentSlice = null;
-            return computeNext();
+                currentSlice = null;
+            }
+            return endOfData();
         }
     }
 }


[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e895c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e895c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e895c4

Branch: refs/heads/trunk
Commit: 24e895c4c73dcc1f849232c6ae54c73bc16ab831
Parents: 025a635 9dc9185
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:42:42 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:42:42 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/AtomicBTreeColumns.java | 27 ++++++++++----------
 .../cql3/SingleColumnRelationTest.java          | 16 ++++++++++++
 3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 579fd62,6cecf99..5402ad5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,6 +1,31 @@@
 -2.0.12:
 +2.1.3
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 +Merged from 2.0:
+  * Avoid StackOverflowError when a large list of IN values
+    is used for a clustering column (CASSANDRA-8410)
   * Fix NPE when writetime() or ttl() calls are wrapped by
     another function call (CASSANDRA-8451)
   * Fix NPE after dropping a keyspace (CASSANDRA-8332)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 372ce5c,0000000..dc2b5ee
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,558 -1,0 +1,559 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.util.AbstractCollection;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Comparator;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 +
 +import com.google.common.base.Function;
 +import com.google.common.base.Functions;
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.collect.Iterators;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.composites.Composite;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.btree.BTree;
 +import org.apache.cassandra.utils.btree.UpdateFunction;
 +import org.apache.cassandra.utils.concurrent.Locks;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.utils.memory.MemtableAllocator;
 +import org.apache.cassandra.utils.memory.NativePool;
 +
 +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 +
 +/**
 + * A thread-safe and atomic ISortedColumns implementation.
 + * Operations (in particular addAll) on this implementation are atomic and
 + * isolated (in the sense of ACID). Typically an addAll is guaranteed that no
 + * other thread can see the state where only parts but not all columns have
 + * been added.
 + * <p/>
 + * WARNING: removing elements through getSortedColumns().iterator() is *not* supported
 + */
 +public class AtomicBTreeColumns extends ColumnFamily
 +{
 +    static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
 +            + ObjectSizes.measure(new Holder(null, null));
 +
 +    // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
 +    private static final int TRACKER_NEVER_WASTED = 0;
 +    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
 +
 +    // The granularity with which we track wasted allocation/work; we round up
 +    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
 +    // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
 +    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
 +    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
 +    // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
 +    private static final int CLOCK_SHIFT = 17;
 +    // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 2^17ns ~= 131us == (1/7.63)ms
 +
 +    /**
 +     * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
 +     * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s
 +     *
 +     * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes
 +     * we increment the current value if it is within this window, and set it to the min of the window plus our waste
 +     * otherwise.
 +     */
 +    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
 +
 +    private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
 +
 +    private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
 +    {
 +        public CellName apply(Cell column)
 +        {
 +            return column.name();
 +        }
 +    };
 +
 +    public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
 +    {
 +        public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity)
 +        {
 +            if (insertReversed)
 +                throw new IllegalArgumentException();
 +            return new AtomicBTreeColumns(metadata);
 +        }
 +    };
 +
 +    private static final DeletionInfo LIVE = DeletionInfo.live();
 +    // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
 +    // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
 +    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
 +
 +    private volatile Holder ref;
 +
 +    private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
 +
 +    private AtomicBTreeColumns(CFMetaData metadata)
 +    {
 +        this(metadata, EMPTY);
 +    }
 +
 +    private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
 +    {
 +        super(metadata);
 +        this.ref = holder;
 +    }
 +
 +    public Factory getFactory()
 +    {
 +        return factory;
 +    }
 +
 +    public ColumnFamily cloneMe()
 +    {
 +        return new AtomicBTreeColumns(metadata, ref);
 +    }
 +
 +    public DeletionInfo deletionInfo()
 +    {
 +        return ref.deletionInfo;
 +    }
 +
 +    public void delete(DeletionTime delTime)
 +    {
 +        delete(new DeletionInfo(delTime));
 +    }
 +
 +    protected void delete(RangeTombstone tombstone)
 +    {
 +        delete(new DeletionInfo(tombstone, getComparator()));
 +    }
 +
 +    public void delete(DeletionInfo info)
 +    {
 +        if (info.isLive())
 +            return;
 +
 +        // Keeping deletion info for max markedForDeleteAt value
 +        while (true)
 +        {
 +            Holder current = ref;
 +            DeletionInfo curDelInfo = current.deletionInfo;
 +            DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo;
 +            if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
 +                break;
 +        }
 +    }
 +
 +    public void setDeletionInfo(DeletionInfo newInfo)
 +    {
 +        ref = ref.with(newInfo);
 +    }
 +
 +    public void purgeTombstones(int gcBefore)
 +    {
 +        while (true)
 +        {
 +            Holder current = ref;
 +            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
 +                break;
 +
 +            DeletionInfo purgedInfo = current.deletionInfo.copy();
 +            purgedInfo.purge(gcBefore);
 +            if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
 +                break;
 +        }
 +    }
 +
 +    /**
 +     * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
 +     *
 +     * @return the difference in size seen after merging the given columns
 +     */
 +    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +    {
 +        ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
 +        DeletionInfo inputDeletionInfoCopy = null;
 +
 +        boolean monitorOwned = false;
 +        try
 +        {
 +            if (usePessimisticLocking())
 +            {
 +                Locks.monitorEnterUnsafe(this);
 +                monitorOwned = true;
 +            }
 +            while (true)
 +            {
 +                Holder current = ref;
 +                updater.ref = current;
 +                updater.reset();
 +
 +                DeletionInfo deletionInfo;
 +                if (cm.deletionInfo().mayModify(current.deletionInfo))
 +                {
 +                    if (inputDeletionInfoCopy == null)
 +                        inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance);
 +
 +                    deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
 +                    updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
 +                }
 +                else
 +                {
 +                    deletionInfo = current.deletionInfo;
 +                }
 +
 +                Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater);
 +
 +                if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
 +                {
 +                    indexer.updateRowLevelIndexes();
 +                    updater.finish();
 +                    return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
 +                }
 +                else if (!monitorOwned)
 +                {
 +                    boolean shouldLock = usePessimisticLocking();
 +                    if (!shouldLock)
 +                    {
 +                        shouldLock = updateWastedAllocationTracker(updater.heapSize);
 +                    }
 +                    if (shouldLock)
 +                    {
 +                        Locks.monitorEnterUnsafe(this);
 +                        monitorOwned = true;
 +                    }
 +                }
 +            }
 +        }
 +        finally
 +        {
 +            if (monitorOwned)
 +                Locks.monitorExitUnsafe(this);
 +        }
 +    }
 +
 +    boolean usePessimisticLocking()
 +    {
 +        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
 +    }
 +
 +    /**
 +     * Update the wasted allocation tracker state based on newly wasted allocation information
 +     *
 +     * @param wastedBytes the number of bytes wasted by this thread
 +     * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
 +     */
 +    private boolean updateWastedAllocationTracker(long wastedBytes) {
 +        // Early check for huge allocation that exceeds the limit
 +        if (wastedBytes < EXCESS_WASTE_BYTES)
 +        {
 +            // We round up to ensure work < granularity are still accounted for
 +            int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
 +
 +            int oldTrackerValue;
 +            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
 +            {
 +                // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
 +                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
 +                int delta = oldTrackerValue - time;
 +                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
 +                    delta = -EXCESS_WASTE_OFFSET;
 +                delta += wastedAllocation;
 +                if (delta >= 0)
 +                    break;
 +                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
 +                    return false;
 +            }
 +        }
 +        // We have definitely reached our waste limit so set the state if it isn't already
 +        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
 +        // And tell the caller to proceed with pessimistic locking
 +        return true;
 +    }
 +
 +    private static int avoidReservedValues(int wasteTracker)
 +    {
 +        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING)
 +            return wasteTracker + 1;
 +        return wasteTracker;
 +    }
 +
 +    // no particular reason not to implement these next methods, we just haven't needed them yet
 +
 +    public void addColumn(Cell column)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void addAll(ColumnFamily cf)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public void clear()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    public Cell getColumn(CellName name)
 +    {
 +        return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
 +    }
 +
 +    private Comparator<Object> asymmetricComparator()
 +    {
 +        return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool);
 +    }
 +
 +    public Iterable<CellName> getColumnNames()
 +    {
 +        return collection(false, NAME);
 +    }
 +
 +    public Collection<Cell> getSortedColumns()
 +    {
 +        return collection(true, Functions.<Cell>identity());
 +    }
 +
 +    public Collection<Cell> getReverseSortedColumns()
 +    {
 +        return collection(false, Functions.<Cell>identity());
 +    }
 +
 +    private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
 +    {
 +        final Holder ref = this.ref;
 +        return new AbstractCollection<V>()
 +        {
 +            public Iterator<V> iterator()
 +            {
 +                return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
 +            }
 +
 +            public int size()
 +            {
 +                return BTree.slice(ref.tree, true).count();
 +            }
 +        };
 +    }
 +
 +    public int getColumnCount()
 +    {
 +        return BTree.slice(ref.tree, true).count();
 +    }
 +
 +    public boolean hasColumns()
 +    {
 +        return !BTree.isEmpty(ref.tree);
 +    }
 +
 +    public Iterator<Cell> iterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), true, slices);
 +    }
 +
 +    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
 +    {
 +        return slices.length == 1
 +             ? slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false)
 +             : new SliceIterator(ref.tree, asymmetricComparator(), false, slices);
 +    }
 +
 +    public boolean isInsertReversed()
 +    {
 +        return false;
 +    }
 +
 +    private static final class Holder
 +    {
 +        final DeletionInfo deletionInfo;
 +        // the btree of columns
 +        final Object[] tree;
 +
 +        Holder(Object[] tree, DeletionInfo deletionInfo)
 +        {
 +            this.tree = tree;
 +            this.deletionInfo = deletionInfo;
 +        }
 +
 +        Holder with(DeletionInfo info)
 +        {
 +            return new Holder(this.tree, info);
 +        }
 +    }
 +
 +    // the function we provide to the btree utilities to perform any column replacements
 +    private static final class ColumnUpdater implements UpdateFunction<Cell>
 +    {
 +        final AtomicBTreeColumns updating;
 +        final CFMetaData metadata;
 +        final MemtableAllocator allocator;
 +        final OpOrder.Group writeOp;
 +        final Updater indexer;
 +        Holder ref;
 +        long dataSize;
 +        long heapSize;
 +        long colUpdateTimeDelta = Long.MAX_VALUE;
 +        final MemtableAllocator.DataReclaimer reclaimer;
 +        List<Cell> inserted; // TODO: replace with walk of aborted BTree
 +
 +        private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
 +        {
 +            this.updating = updating;
 +            this.allocator = allocator;
 +            this.writeOp = writeOp;
 +            this.indexer = indexer;
 +            this.metadata = metadata;
 +            this.reclaimer = allocator.reclaimer();
 +        }
 +
 +        public Cell apply(Cell insert)
 +        {
 +            indexer.insert(insert);
 +            insert = insert.localCopy(metadata, allocator, writeOp);
 +            this.dataSize += insert.cellDataSize();
 +            this.heapSize += insert.unsharedHeapSizeExcludingData();
 +            if (inserted == null)
 +                inserted = new ArrayList<>();
 +            inserted.add(insert);
 +            return insert;
 +        }
 +
 +        public Cell apply(Cell existing, Cell update)
 +        {
 +            Cell reconciled = existing.reconcile(update);
 +            indexer.update(existing, reconciled);
 +            if (existing != reconciled)
 +            {
 +                reconciled = reconciled.localCopy(metadata, allocator, writeOp);
 +                dataSize += reconciled.cellDataSize() - existing.cellDataSize();
 +                heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
 +                if (inserted == null)
 +                    inserted = new ArrayList<>();
 +                inserted.add(reconciled);
 +                discard(existing);
 +                //Getting the minimum delta for an update containing multiple columns
 +                colUpdateTimeDelta =  Math.min(Math.abs(existing.timestamp()  - update.timestamp()), colUpdateTimeDelta);
 +            }
 +            return reconciled;
 +        }
 +
 +        protected void reset()
 +        {
 +            this.dataSize = 0;
 +            this.heapSize = 0;
 +            if (inserted != null)
 +            {
 +                for (Cell cell : inserted)
 +                    abort(cell);
 +                inserted.clear();
 +            }
 +            reclaimer.cancel();
 +        }
 +
 +        protected void abort(Cell abort)
 +        {
 +            reclaimer.reclaimImmediately(abort);
 +        }
 +
 +        protected void discard(Cell discard)
 +        {
 +            reclaimer.reclaim(discard);
 +        }
 +
 +        public boolean abortEarly()
 +        {
 +            return updating.ref != ref;
 +        }
 +
 +        public void allocated(long heapSize)
 +        {
 +            this.heapSize += heapSize;
 +        }
 +
 +        protected void finish()
 +        {
 +            allocator.onHeap().allocate(heapSize, writeOp);
 +            reclaimer.commit();
 +        }
 +    }
 +
 +    private static class SliceIterator extends AbstractIterator<Cell>
 +    {
 +        private final Object[] btree;
 +        private final boolean forwards;
 +        private final Comparator<Object> comparator;
 +        private final ColumnSlice[] slices;
 +
 +        private int idx = 0;
 +        private Iterator<Cell> currentSlice;
 +
 +        SliceIterator(Object[] btree, Comparator<Object> comparator, boolean forwards, ColumnSlice[] slices)
 +        {
 +            this.btree = btree;
 +            this.comparator = comparator;
 +            this.slices = slices;
 +            this.forwards = forwards;
 +        }
 +
 +        protected Cell computeNext()
 +        {
-             if (currentSlice == null)
++            while (currentSlice != null || idx < slices.length)
 +            {
-                 if (idx >= slices.length)
-                     return endOfData();
++                if (currentSlice == null)
++                {
++                    ColumnSlice slice = slices[idx++];
++                    if (forwards)
++                        currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
++                    else
++                        currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
++                }
 +
-                 ColumnSlice slice = slices[idx++];
-                 if (forwards)
-                     currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
-                 else
-                     currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
-             }
++                if (currentSlice.hasNext())
++                    return currentSlice.next();
 +
-             if (currentSlice.hasNext())
-                 return currentSlice.next();
++                currentSlice = null;
++            }
 +
-             currentSlice = null;
-             return computeNext();
++            return endOfData();
 +        }
 +    }
 +
 +    private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards)
 +    {
 +        return BTree.slice(btree,
 +                           comparator,
 +                           start.isEmpty() ? null : start,
 +                           true,
 +                           finish.isEmpty() ? null : finish,
 +                           true,
 +                           forwards);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
index 120c780,0000000..2ad4bda
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
@@@ -1,51 -1,0 +1,67 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cql3;
 +
 +import org.junit.Test;
 +
++import java.util.ArrayList;
++import java.util.List;
++
 +public class SingleColumnRelationTest extends CQLTester
 +{
 +    @Test
 +    public void testInvalidCollectionEqualityRelation() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int PRIMARY KEY, b set<int>, c list<int>, d map<int, int>)");
 +        createIndex("CREATE INDEX ON %s (b)");
 +        createIndex("CREATE INDEX ON %s (c)");
 +        createIndex("CREATE INDEX ON %s (d)");
 +
 +        assertInvalid("SELECT * FROM %s WHERE a = 0 AND b=?", set(0));
 +        assertInvalid("SELECT * FROM %s WHERE a = 0 AND c=?", list(0));
 +        assertInvalid("SELECT * FROM %s WHERE a = 0 AND d=?", map(0, 0));
 +    }
 +
 +    @Test
 +    public void testInvalidCollectionNonEQRelation() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a int PRIMARY KEY, b set<int>, c int)");
 +        createIndex("CREATE INDEX ON %s (c)");
 +        execute("INSERT INTO %s (a, b, c) VALUES (0, {0}, 0)");
 +
 +        // non-EQ operators
 +        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
 +        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
 +        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
 +        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
 +        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
 +    }
++
++    @Test
++    public void testLargeClusteringINValues() throws Throwable
++    {
++        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
++        execute("INSERT INTO %s (k, c, v) VALUES (0, 0, 0)");
++        List<Integer> inValues = new ArrayList<>(10000);
++        for (int i = 0; i < 10000; i++)
++            inValues.add(i);
++        assertRows(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues),
++                row(0, 0, 0)
++        );
++    }
 +}


[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ty...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4125ca0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4125ca0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4125ca0a

Branch: refs/heads/trunk
Commit: 4125ca0aaa73cd7e1d52718c7ec1a145696cc957
Parents: e530f42 24e895c
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:43:02 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:43:02 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/AtomicBTreeColumns.java | 27 ++++++++++----------
 .../cql3/SingleColumnRelationTest.java          | 16 ++++++++++++
 3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
index e6412a3,2ad4bda..0fd300b
--- a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
@@@ -17,10 -17,11 +17,13 @@@
   */
  package org.apache.cassandra.cql3;
  
 +import java.util.Arrays;
 +
  import org.junit.Test;
  
+ import java.util.ArrayList;
+ import java.util.List;
+ 
  public class SingleColumnRelationTest extends CQLTester
  {
      @Test
@@@ -47,336 -45,23 +50,349 @@@
          execute("INSERT INTO %s (a, b, c) VALUES (0, {0}, 0)");
  
          // non-EQ operators
 -        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
 -        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
 -        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
 -        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
 -        assertInvalid("SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
 +        assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '>' relation",
 +                             "SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
 +        assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '>=' relation",
 +                             "SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
 +        assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '<' relation",
 +                             "SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
 +        assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '<=' relation",
 +                             "SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
 +        assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a 'IN' relation",
 +                             "SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
 +    }
 +
 +    @Test
 +    public void testClusteringColumnRelations() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c))");
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
 +
 +        testSelectQueriesWithClusteringColumnRelations();
 +    }
 +
 +    @Test
 +    public void testClusteringColumnRelationsWithCompactStorage() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c)) WITH COMPACT STORAGE;");
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
 +
 +        testSelectQueriesWithClusteringColumnRelations();
 +    }
 +
 +    private void testSelectQueriesWithClusteringColumnRelations() throws Throwable
 +    {
 +        assertRows(execute("select * from %s where a in (?, ?)", "first", "second"),
 +                   row("first", 1, 5, 1),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3),
 +                   row("second", 4, 8, 4));
 +
 +        assertRows(execute("select * from %s where a = ? and b = ? and c in (?, ?)", "first", 2, 6, 7),
 +                   row("first", 2, 6, 2));
 +
 +        assertRows(execute("select * from %s where a = ? and b in (?, ?) and c in (?, ?)", "first", 2, 3, 6, 7),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and b in (?, ?) and c in (?, ?)", "first", 3, 2, 7, 6),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?)", "first", 7, 6, 3, 2),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select c, d from %s where a = ? and c in (?, ?) and b in (?, ?)", "first", 7, 6, 3, 2),
 +                   row(6, 2),
 +                   row(7, 3));
 +
 +        assertRows(execute("select c, d from %s where a = ? and c in (?, ?) and b in (?, ?, ?)", "first", 7, 6, 3, 2, 3),
 +                   row(6, 2),
 +                   row(7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and b in (?, ?) and c = ?", "first", 3, 2, 7),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and b in ? and c in ?",
 +                           "first", Arrays.asList(3, 2), Arrays.asList(7, 6)),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertInvalidMessage("Invalid null value for IN restriction",
 +                             "select * from %s where a = ? and b in ? and c in ?", "first", null, Arrays.asList(7, 6));
 +
 +        assertRows(execute("select * from %s where a = ? and c >= ? and b in (?, ?)", "first", 6, 3, 2),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and c > ? and b in (?, ?)", "first", 6, 3, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and c <= ? and b in (?, ?)", "first", 6, 3, 2),
 +                   row("first", 2, 6, 2));
 +
 +        assertRows(execute("select * from %s where a = ? and c < ? and b in (?, ?)", "first", 7, 3, 2),
 +                   row("first", 2, 6, 2));
 +//---
 +        assertRows(execute("select * from %s where a = ? and c >= ? and c <= ? and b in (?, ?)", "first", 6, 7, 3, 2),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertRows(execute("select * from %s where a = ? and c > ? and c <= ? and b in (?, ?)", "first", 6, 7, 3, 2),
 +                   row("first", 3, 7, 3));
 +
 +        assertEmpty(execute("select * from %s where a = ? and c > ? and c < ? and b in (?, ?)", "first", 6, 7, 3, 2));
 +
 +        assertInvalidMessage("Column \"c\" cannot be restricted by both an equality and an inequality relation",
 +                             "select * from %s where a = ? and c > ? and c = ? and b in (?, ?)", "first", 6, 7, 3, 2);
 +
 +        assertInvalidMessage("c cannot be restricted by more than one relation if it includes an Equal",
 +                             "select * from %s where a = ? and c = ? and c > ?  and b in (?, ?)", "first", 6, 7, 3, 2);
 +
 +        assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b DESC",
 +                           "first", 7, 6, 3, 2),
 +                   row("first", 3, 7, 3),
 +                   row("first", 2, 6, 2));
 +
 +        assertInvalidMessage("More than one restriction was found for the start bound on b",
 +                             "select * from %s where a = ? and b > ? and b > ?", "first", 6, 3, 2);
 +
 +        assertInvalidMessage("More than one restriction was found for the end bound on b",
 +                             "select * from %s where a = ? and b < ? and b <= ?", "first", 6, 3, 2);
 +    }
 +
 +    @Test
 +    public void testPartitionKeyColumnRelations() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key((a, b), c))");
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 1, 1);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 2, 2);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 3, 3);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 4, 4);
 +
 +        assertInvalidMessage("Partition KEY part a cannot be restricted by IN relation (only the last part of the partition key can)",
 +                             "select * from %s where a in (?, ?)", "first", "second");
 +        assertInvalidMessage("Partition KEY part a cannot be restricted by IN relation (only the last part of the partition key can)",
 +                             "select * from %s where a in (?, ?) and b in (?, ?)", "first", "second", 2, 3);
 +        assertInvalidMessage("Partition key parts: b must be restricted as other parts are",
 +                             "select * from %s where a = ?", "first");
 +        assertInvalidMessage("b cannot be restricted by more than one relation if it includes a IN",
 +                             "select * from %s where a = ? AND b IN (?, ?) AND b = ?", "first", 2, 2, 3);
 +        assertInvalidMessage("b cannot be restricted by more than one relation if it includes an Equal",
 +                             "select * from %s where a = ? AND b = ? AND b IN (?, ?)", "first", 2, 2, 3);
 +    }
 +
 +    @Test
 +    public void testClusteringColumnRelationsWithClusteringOrder() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c)) WITH CLUSTERING ORDER BY (b DESC);");
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
 +        execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
 +
 +        assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b DESC",
 +                           "first", 7, 6, 3, 2),
 +                   row("first", 3, 7, 3),
 +                   row("first", 2, 6, 2));
 +
 +        assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b ASC",
 +                           "first", 7, 6, 3, 2),
 +                   row("first", 2, 6, 2),
 +                   row("first", 3, 7, 3));
 +    }
 +
 +    @Test
 +    public void testAllowFilteringWithClusteringColumn() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
 +
 +        execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 1, 2, 1);
 +        execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 1, 3, 2);
 +        execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 2, 2, 3);
 +
 +        // Don't require filtering, always allowed
 +        assertRows(execute("SELECT * FROM %s WHERE k = ?", 1),
 +                   row(1, 2, 1),
 +                   row(1, 3, 2));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? AND c > ?", 1, 2), row(1, 3, 2));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? AND c = ?", 1, 2), row(1, 2, 1));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? ALLOW FILTERING", 1),
 +                   row(1, 2, 1),
 +                   row(1, 3, 2));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? AND c > ? ALLOW FILTERING", 1, 2), row(1, 3, 2));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? AND c = ? ALLOW FILTERING", 1, 2), row(1, 2, 1));
 +
 +        // Require filtering, allowed only with ALLOW FILTERING
 +        assertInvalidMessage("Cannot execute this query as it might involve data filtering",
 +                             "SELECT * FROM %s WHERE c = ?", 2);
 +        assertInvalidMessage("Cannot execute this query as it might involve data filtering",
 +                             "SELECT * FROM %s WHERE c > ? AND c <= ?", 2, 4);
 +
 +        assertRows(execute("SELECT * FROM %s WHERE c = ? ALLOW FILTERING", 2),
 +                   row(1, 2, 1),
 +                   row(2, 2, 3));
 +
 +        assertRows(execute("SELECT * FROM %s WHERE c > ? AND c <= ? ALLOW FILTERING", 2, 4), row(1, 3, 2));
 +    }
 +
 +    @Test
 +    public void testAllowFilteringWithIndexedColumn() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)");
 +        createIndex("CREATE INDEX ON %s(a)");
 +
 +        execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 1, 10, 100);
 +        execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 2, 20, 200);
 +        execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 3, 30, 300);
 +        execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 4, 40, 400);
 +
 +        // Don't require filtering, always allowed
 +        assertRows(execute("SELECT * FROM %s WHERE k = ?", 1), row(1, 10, 100));
 +        assertRows(execute("SELECT * FROM %s WHERE a = ?", 20), row(2, 20, 200));
 +        assertRows(execute("SELECT * FROM %s WHERE k = ? ALLOW FILTERING", 1), row(1, 10, 100));
 +        assertRows(execute("SELECT * FROM %s WHERE a = ? ALLOW FILTERING", 20), row(2, 20, 200));
 +
 +        assertInvalid("SELECT * FROM %s WHERE a = ? AND b = ?");
 +        assertRows(execute("SELECT * FROM %s WHERE a = ? AND b = ? ALLOW FILTERING", 20, 200), row(2, 20, 200));
 +    }
 +
 +    @Test
 +    public void testIndexQueriesOnComplexPrimaryKey() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (pk0 int, pk1 int, ck0 int, ck1 int, ck2 int, value int, PRIMARY KEY ((pk0, pk1), ck0, ck1, ck2))");
 +
 +        createIndex("CREATE INDEX ON %s (ck1)");
 +        createIndex("CREATE INDEX ON %s (ck2)");
 +        createIndex("CREATE INDEX ON %s (pk0)");
 +        createIndex("CREATE INDEX ON %s (ck0)");
 +
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 0, 1, 2, 3, 4, 5);
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 1, 2, 3, 4, 5, 0);
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 2, 3, 4, 5, 0, 1);
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 3, 4, 5, 0, 1, 2);
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 4, 5, 0, 1, 2, 3);
 +        execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 5, 0, 1, 2, 3, 4);
 +
 +        assertRows(execute("SELECT value FROM %s WHERE pk0 = 2"), row(1));
 +        assertRows(execute("SELECT value FROM %s WHERE ck0 = 0"), row(3));
 +        assertRows(execute("SELECT value FROM %s WHERE pk0 = 3 AND pk1 = 4 AND ck1 = 0"), row(2));
 +        assertRows(execute("SELECT value FROM %s WHERE pk0 = 5 AND pk1 = 0 AND ck0 = 1 AND ck2 = 3 ALLOW FILTERING"), row(4));
 +    }
 +
 +    @Test
 +    public void testIndexOnClusteringColumns() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id1 int, id2 int, author text, time bigint, v1 text, v2 text, PRIMARY KEY ((id1, id2), author, time))");
 +        createIndex("CREATE INDEX ON %s(time)");
 +        createIndex("CREATE INDEX ON %s(id2)");
 +
 +        execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'bob', 0, 'A', 'A')");
 +        execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'bob', 1, 'B', 'B')");
 +        execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 1, 'bob', 2, 'C', 'C')");
 +        execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'tom', 0, 'D', 'D')");
 +        execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 1, 'tom', 1, 'E', 'E')");
 +
 +        assertRows(execute("SELECT v1 FROM %s WHERE time = 1"), row("B"), row("E"));
 +
 +        assertRows(execute("SELECT v1 FROM %s WHERE id2 = 1"), row("C"), row("E"));
 +
 +        assertRows(execute("SELECT v1 FROM %s WHERE id1 = 0 AND id2 = 0 AND author = 'bob' AND time = 0"), row("A"));
 +
 +        // Test for CASSANDRA-8206
 +        execute("UPDATE %s SET v2 = null WHERE id1 = 0 AND id2 = 0 AND author = 'bob' AND time = 1");
 +
 +        assertRows(execute("SELECT v1 FROM %s WHERE id2 = 0"), row("A"), row("B"), row("D"));
 +
 +        assertRows(execute("SELECT v1 FROM %s WHERE time = 1"), row("B"), row("E"));
 +
 +        assertInvalidMessage("IN restrictions are not supported on indexed columns",
 +                             "SELECT v1 FROM %s WHERE id2 = 0 and time IN (1, 2) ALLOW FILTERING");
 +    }
 +
 +    @Test
 +    public void testCompositeIndexWithPrimaryKey() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (blog_id int, time1 int, time2 int, author text, content text, PRIMARY KEY (blog_id, time1, time2))");
 +
 +        createIndex("CREATE INDEX ON %s(author)");
 +
 +        String req = "INSERT INTO %s (blog_id, time1, time2, author, content) VALUES (?, ?, ?, ?, ?)";
 +        execute(req, 1, 0, 0, "foo", "bar1");
 +        execute(req, 1, 0, 1, "foo", "bar2");
 +        execute(req, 2, 1, 0, "foo", "baz");
 +        execute(req, 3, 0, 1, "gux", "qux");
 +
 +        assertRows(execute("SELECT blog_id, content FROM %s WHERE author='foo'"),
 +                   row(1, "bar1"),
 +                   row(1, "bar2"),
 +                   row(2, "baz"));
 +        assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 > 0 AND author='foo' ALLOW FILTERING"), row(2, "baz"));
 +        assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 = 1 AND author='foo' ALLOW FILTERING"), row(2, "baz"));
 +        assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 = 1 AND time2 = 0 AND author='foo' ALLOW FILTERING"),
 +                   row(2, "baz"));
 +        assertEmpty(execute("SELECT content FROM %s WHERE time1 = 1 AND time2 = 1 AND author='foo' ALLOW FILTERING"));
 +        assertEmpty(execute("SELECT content FROM %s WHERE time1 = 1 AND time2 > 0 AND author='foo' ALLOW FILTERING"));
 +
 +        assertInvalidMessage("Cannot execute this query as it might involve data filtering",
 +                             "SELECT content FROM %s WHERE time2 >= 0 AND author='foo'");
 +    }
 +
 +    @Test
 +    public void testRangeQueryOnIndex() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s (id int primary key, row int, setid int);");
 +        createIndex("CREATE INDEX ON %s (setid)");
 +
 +        String q = "INSERT INTO %s (id, row, setid) VALUES (?, ?, ?);";
 +        execute(q, 0, 0, 0);
 +        execute(q, 1, 1, 0);
 +        execute(q, 2, 2, 0);
 +        execute(q, 3, 3, 0);
 +
 +        assertInvalidMessage("Cannot execute this query as it might involve data filtering",
 +                             "SELECT * FROM %s WHERE setid = 0 AND row < 1;");
 +        assertRows(execute("SELECT * FROM %s WHERE setid = 0 AND row < 1 ALLOW FILTERING;"), row(0, 0, 0));
 +    }
 +
 +    @Test
 +    public void testEmptyIN() throws Throwable
 +    {
 +        for (String compactOption : new String[] { "", " WITH COMPACT STORAGE" })
 +        {
 +            createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))" + compactOption);
 +
 +            for (int i = 0; i <= 2; i++)
 +                for (int j = 0; j <= 2; j++)
 +                    execute("INSERT INTO %s (k1, k2, v) VALUES (?, ?, ?)", i, j, i + j);
 +
 +            assertEmpty(execute("SELECT v FROM %s WHERE k1 IN ()"));
 +            assertEmpty(execute("SELECT v FROM %s WHERE k1 = 0 AND k2 IN ()"));
 +        }
      }
+ 
+     @Test
+     public void testLargeClusteringINValues() throws Throwable
+     {
+         createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
+         execute("INSERT INTO %s (k, c, v) VALUES (0, 0, 0)");
+         List<Integer> inValues = new ArrayList<>(10000);
+         for (int i = 0; i < 10000; i++)
+             inValues.add(i);
+         assertRows(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues),
+                 row(0, 0, 0)
+         );
+     }
  }