You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/12 18:43:14 UTC
[1/3] cassandra git commit: Avoid stack overflow on large clustering
IN values
Repository: cassandra
Updated Branches:
refs/heads/trunk e530f4230 -> 4125ca0aa
Avoid stack overflow on large clustering IN values
Patch by Tyler Hobbs; reviewed by Benjamin Lerer for CASSANDRA-8410
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9dc9185f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9dc9185f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9dc9185f
Branch: refs/heads/trunk
Commit: 9dc9185f5c7172915485f713dbbb6b78b22d0f66
Parents: 3f3d0ed
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:41:06 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:41:06 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/db/filter/ColumnSlice.java | 48 ++++++++++----------
2 files changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dc9185f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc426bb..6cecf99 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.12:
+ * Avoid StackOverflowError when a large list of IN values
+ is used for a clustering column (CASSANDRA-8410)
* Fix NPE when writetime() or ttl() calls are wrapped by
another function call (CASSANDRA-8451)
* Fix NPE after dropping a keyspace (CASSANDRA-8332)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9dc9185f/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 9eff12a..6a9efbb 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@ -130,36 +130,36 @@ public class ColumnSlice
protected Column computeNext()
{
- if (currentSlice == null)
+ while (currentSlice != null || idx < slices.length) // loop instead of recursing so a long run of exhausted slices cannot overflow the stack
{
- if (idx >= slices.length)
- return endOfData();
-
- ColumnSlice slice = slices[idx++];
- // Note: we specialize the case of start == "" and finish = "" because it is slightly more efficient, but also they have a specific
- // meaning (namely, they always extend to the beginning/end of the range).
- if (slice.start.remaining() == 0)
+ if (currentSlice == null)
{
- if (slice.finish.remaining() == 0)
- currentSlice = map.values().iterator();
+ ColumnSlice slice = slices[idx++];
+ // Note: we specialize the case of start == "" and finish == "" because it is slightly more efficient, but also they have a specific
+ // meaning (namely, they always extend to the beginning/end of the range).
+ if (slice.start.remaining() == 0)
+ {
+ if (slice.finish.remaining() == 0)
+ currentSlice = map.values().iterator();
+ else
+ currentSlice = map.headMap(slice.finish, true).values().iterator();
+ }
+ else if (slice.finish.remaining() == 0)
+ {
+ currentSlice = map.tailMap(slice.start, true).values().iterator();
+ }
else
- currentSlice = map.headMap(slice.finish, true).values().iterator();
- }
- else if (slice.finish.remaining() == 0)
- {
- currentSlice = map.tailMap(slice.start, true).values().iterator();
+ {
+ currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
+ }
}
- else
- {
- currentSlice = map.subMap(slice.start, true, slice.finish, true).values().iterator();
- }
- }
- if (currentSlice.hasNext())
- return currentSlice.next();
+ if (currentSlice.hasNext())
+ return currentSlice.next();
- currentSlice = null;
- return computeNext();
+ currentSlice = null;
+ }
+ return endOfData();
}
}
}
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by ty...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24e895c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24e895c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24e895c4
Branch: refs/heads/trunk
Commit: 24e895c4c73dcc1f849232c6ae54c73bc16ab831
Parents: 025a635 9dc9185
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:42:42 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:42:42 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../apache/cassandra/db/AtomicBTreeColumns.java | 27 ++++++++++----------
.../cql3/SingleColumnRelationTest.java | 16 ++++++++++++
3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 579fd62,6cecf99..5402ad5
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,6 +1,31 @@@
-2.0.12:
+2.1.3
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+Merged from 2.0:
+ * Avoid StackOverflowError when a large list of IN values
+ is used for a clustering column (CASSANDRA-8410)
* Fix NPE when writetime() or ttl() calls are wrapped by
another function call (CASSANDRA-8451)
* Fix NPE after dropping a keyspace (CASSANDRA-8332)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 372ce5c,0000000..dc2b5ee
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@@ -1,558 -1,0 +1,559 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.concurrent.Locks;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+
+import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+
+/**
+ * A thread-safe and atomic ColumnFamily implementation.
+ * Operations (in particular addAllWithSizeDelta) on this implementation are atomic and
+ * isolated (in the sense of ACID): addAllWithSizeDelta guarantees that no
+ * other thread can observe a state where only some, but not all, columns have
+ * been added.
+ * <p/>
+ * WARNING: removing elements through getSortedColumns().iterator() is *not* supported
+ */
+public class AtomicBTreeColumns extends ColumnFamily
+{
+ static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+ + ObjectSizes.measure(new Holder(null, null));
+
+ // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues)
+ private static final int TRACKER_NEVER_WASTED = 0;
+ private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
+
+ // The granularity with which we track wasted allocation/work; we round up
+ private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
+ // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below)
+ private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
+ private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES);
+ // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior
+ private static final int CLOCK_SHIFT = 17;
+ // CLOCK_GRANULARITY = 1ns << CLOCK_SHIFT == 131072ns (~131us) == (1/7.63)ms
+
+ /**
+ * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by
+ * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63KiB/ms, or 7.45MiB/s
+ *
+ * wasteTracker holds a clock value at most EXCESS_WASTE_OFFSET behind the current time; whenever we waste bytes
+ * we increment it if it is within this window, and otherwise reset it to the start of the window plus our waste.
+ * If it would catch up with the current time, this instance switches to pessimistic locking.
+ */
+ private volatile int wasteTracker = TRACKER_NEVER_WASTED;
+
+ private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
+
+ private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
+ {
+ public CellName apply(Cell column)
+ {
+ return column.name();
+ }
+ };
+
+ public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>()
+ {
+ public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity)
+ {
+ if (insertReversed)
+ throw new IllegalArgumentException();
+ return new AtomicBTreeColumns(metadata);
+ }
+ };
+
+ private static final DeletionInfo LIVE = DeletionInfo.live();
+ // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class,
+ // so we can safely alias one DeletionInfo.live() reference and avoid some allocations.
+ private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
+
+ private volatile Holder ref;
+
+ private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref");
+
+ private AtomicBTreeColumns(CFMetaData metadata)
+ {
+ this(metadata, EMPTY);
+ }
+
+ private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
+ {
+ super(metadata);
+ this.ref = holder;
+ }
+
+ public Factory getFactory()
+ {
+ return factory;
+ }
+
+ public ColumnFamily cloneMe()
+ {
+ return new AtomicBTreeColumns(metadata, ref);
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return ref.deletionInfo;
+ }
+
+ public void delete(DeletionTime delTime)
+ {
+ delete(new DeletionInfo(delTime));
+ }
+
+ protected void delete(RangeTombstone tombstone)
+ {
+ delete(new DeletionInfo(tombstone, getComparator()));
+ }
+
+ public void delete(DeletionInfo info)
+ {
+ if (info.isLive())
+ return;
+
+ // Keeping deletion info for max markedForDeleteAt value
+ while (true)
+ {
+ Holder current = ref;
+ DeletionInfo curDelInfo = current.deletionInfo;
+ DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo;
+ if (refUpdater.compareAndSet(this, current, current.with(newDelInfo)))
+ break;
+ }
+ }
+
+ public void setDeletionInfo(DeletionInfo newInfo)
+ {
+ ref = ref.with(newInfo); // NOTE(review): plain read-modify-write, not a CAS loop like delete(); a concurrent update to ref may be lost
+ }
+
+ public void purgeTombstones(int gcBefore)
+ {
+ while (true)
+ {
+ Holder current = ref;
+ if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
+ break;
+
+ DeletionInfo purgedInfo = current.deletionInfo.copy();
+ purgedInfo.purge(gcBefore);
+ if (refUpdater.compareAndSet(this, current, current.with(purgedInfo)))
+ break;
+ }
+ }
+
+ /**
+ * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
+ *
+ * @return the difference in size seen after merging the given columns
+ */
+ public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer);
+ DeletionInfo inputDeletionInfoCopy = null;
+
+ boolean monitorOwned = false;
+ try
+ {
+ if (usePessimisticLocking())
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ while (true)
+ {
+ Holder current = ref;
+ updater.ref = current;
+ updater.reset();
+
+ DeletionInfo deletionInfo;
+ if (cm.deletionInfo().mayModify(current.deletionInfo))
+ {
+ if (inputDeletionInfoCopy == null)
+ inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance);
+
+ deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy);
+ updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+ }
+ else
+ {
+ deletionInfo = current.deletionInfo;
+ }
+
+ Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater);
+
+ if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
+ {
+ indexer.updateRowLevelIndexes();
+ updater.finish();
+ return Pair.create(updater.dataSize, updater.colUpdateTimeDelta);
+ }
+ else if (!monitorOwned)
+ {
+ boolean shouldLock = usePessimisticLocking();
+ if (!shouldLock)
+ {
+ shouldLock = updateWastedAllocationTracker(updater.heapSize);
+ }
+ if (shouldLock)
+ {
+ Locks.monitorEnterUnsafe(this);
+ monitorOwned = true;
+ }
+ }
+ }
+ }
+ finally
+ {
+ if (monitorOwned)
+ Locks.monitorExitUnsafe(this);
+ }
+ }
+
+ boolean usePessimisticLocking()
+ {
+ return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
+ }
+
+ /**
+ * Update the wasted allocation tracker state based on newly wasted allocation information
+ *
+ * @param wastedBytes the number of bytes wasted by this thread
+ * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached
+ */
+ private boolean updateWastedAllocationTracker(long wastedBytes) {
+ // Early check for huge allocation that exceeds the limit
+ if (wastedBytes < EXCESS_WASTE_BYTES)
+ {
+ // We round up to ensure work < granularity are still accounted for
+ int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
+
+ int oldTrackerValue;
+ while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker))
+ {
+ // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap)
+ int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
+ int delta = oldTrackerValue - time;
+ if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET)
+ delta = -EXCESS_WASTE_OFFSET;
+ delta += wastedAllocation;
+ if (delta >= 0)
+ break;
+ if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta)))
+ return false;
+ }
+ }
+ // We have definitely reached our waste limit so set the state if it isn't already
+ wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
+ // And tell the caller to proceed with pessimistic locking
+ return true;
+ }
+
+ private static int avoidReservedValues(int wasteTracker)
+ {
+ if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING)
+ return wasteTracker + 1;
+ return wasteTracker;
+ }
+
+ // no particular reason not to implement these next methods, we just haven't needed them yet
+
+ public void addColumn(Cell column)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addAll(ColumnFamily cf)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Cell getColumn(CellName name)
+ {
+ return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
+ }
+
+ private Comparator<Object> asymmetricComparator()
+ {
+ return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool);
+ }
+
+ public Iterable<CellName> getColumnNames()
+ {
+ return collection(false, NAME);
+ }
+
+ public Collection<Cell> getSortedColumns()
+ {
+ return collection(true, Functions.<Cell>identity());
+ }
+
+ public Collection<Cell> getReverseSortedColumns()
+ {
+ return collection(false, Functions.<Cell>identity());
+ }
+
+ private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f)
+ {
+ final Holder ref = this.ref;
+ return new AbstractCollection<V>()
+ {
+ public Iterator<V> iterator()
+ {
+ return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f);
+ }
+
+ public int size()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+ };
+ }
+
+ public int getColumnCount()
+ {
+ return BTree.slice(ref.tree, true).count();
+ }
+
+ public boolean hasColumns()
+ {
+ return !BTree.isEmpty(ref.tree);
+ }
+
+ public Iterator<Cell> iterator(ColumnSlice[] slices)
+ {
+ return slices.length == 1
+ ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true)
+ : new SliceIterator(ref.tree, asymmetricComparator(), true, slices);
+ }
+
+ public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
+ {
+ return slices.length == 1
+ ? slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false)
+ : new SliceIterator(ref.tree, asymmetricComparator(), false, slices);
+ }
+
+ public boolean isInsertReversed()
+ {
+ return false;
+ }
+
+ private static final class Holder
+ {
+ final DeletionInfo deletionInfo;
+ // the btree of columns
+ final Object[] tree;
+
+ Holder(Object[] tree, DeletionInfo deletionInfo)
+ {
+ this.tree = tree;
+ this.deletionInfo = deletionInfo;
+ }
+
+ Holder with(DeletionInfo info)
+ {
+ return new Holder(this.tree, info);
+ }
+ }
+
+ // the function we provide to the btree utilities to perform any column replacements
+ private static final class ColumnUpdater implements UpdateFunction<Cell>
+ {
+ final AtomicBTreeColumns updating;
+ final CFMetaData metadata;
+ final MemtableAllocator allocator;
+ final OpOrder.Group writeOp;
+ final Updater indexer;
+ Holder ref;
+ long dataSize;
+ long heapSize;
+ long colUpdateTimeDelta = Long.MAX_VALUE;
+ final MemtableAllocator.DataReclaimer reclaimer;
+ List<Cell> inserted; // TODO: replace with walk of aborted BTree
+
+ private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+ {
+ this.updating = updating;
+ this.allocator = allocator;
+ this.writeOp = writeOp;
+ this.indexer = indexer;
+ this.metadata = metadata;
+ this.reclaimer = allocator.reclaimer();
+ }
+
+ public Cell apply(Cell insert)
+ {
+ indexer.insert(insert);
+ insert = insert.localCopy(metadata, allocator, writeOp);
+ this.dataSize += insert.cellDataSize();
+ this.heapSize += insert.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(insert);
+ return insert;
+ }
+
+ public Cell apply(Cell existing, Cell update)
+ {
+ Cell reconciled = existing.reconcile(update);
+ indexer.update(existing, reconciled);
+ if (existing != reconciled)
+ {
+ reconciled = reconciled.localCopy(metadata, allocator, writeOp);
+ dataSize += reconciled.cellDataSize() - existing.cellDataSize();
+ heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+ if (inserted == null)
+ inserted = new ArrayList<>();
+ inserted.add(reconciled);
+ discard(existing);
+ // Track the minimum timestamp delta across all columns replaced by this update
+ colUpdateTimeDelta = Math.min(Math.abs(existing.timestamp() - update.timestamp()), colUpdateTimeDelta);
+ }
+ return reconciled;
+ }
+
+ protected void reset()
+ {
+ this.dataSize = 0;
+ this.heapSize = 0;
+ if (inserted != null)
+ {
+ for (Cell cell : inserted)
+ abort(cell);
+ inserted.clear();
+ }
+ reclaimer.cancel();
+ }
+
+ protected void abort(Cell abort)
+ {
+ reclaimer.reclaimImmediately(abort);
+ }
+
+ protected void discard(Cell discard)
+ {
+ reclaimer.reclaim(discard);
+ }
+
+ public boolean abortEarly()
+ {
+ return updating.ref != ref;
+ }
+
+ public void allocated(long heapSize)
+ {
+ this.heapSize += heapSize;
+ }
+
+ protected void finish()
+ {
+ allocator.onHeap().allocate(heapSize, writeOp);
+ reclaimer.commit();
+ }
+ }
+
+ private static class SliceIterator extends AbstractIterator<Cell>
+ {
+ private final Object[] btree;
+ private final boolean forwards;
+ private final Comparator<Object> comparator;
+ private final ColumnSlice[] slices;
+
+ private int idx = 0;
+ private Iterator<Cell> currentSlice;
+
+ SliceIterator(Object[] btree, Comparator<Object> comparator, boolean forwards, ColumnSlice[] slices)
+ {
+ this.btree = btree;
+ this.comparator = comparator;
+ this.slices = slices;
+ this.forwards = forwards;
+ }
+
+ protected Cell computeNext()
+ {
- if (currentSlice == null)
++ while (currentSlice != null || idx < slices.length) // loop instead of recursing so a long run of exhausted slices cannot overflow the stack
+ {
- if (idx >= slices.length)
- return endOfData();
++ if (currentSlice == null)
++ {
++ ColumnSlice slice = slices[idx++];
++ if (forwards)
++ currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
++ else
++ currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
++ }
+
- ColumnSlice slice = slices[idx++];
- if (forwards)
- currentSlice = slice(btree, comparator, slice.start, slice.finish, true);
- else
- currentSlice = slice(btree, comparator, slice.finish, slice.start, false);
- }
++ if (currentSlice.hasNext())
++ return currentSlice.next();
+
- if (currentSlice.hasNext())
- return currentSlice.next();
++ currentSlice = null;
++ }
+
- currentSlice = null;
- return computeNext();
++ return endOfData();
+ }
+ }
+
+ private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards)
+ {
+ return BTree.slice(btree,
+ comparator,
+ start.isEmpty() ? null : start,
+ true,
+ finish.isEmpty() ? null : finish,
+ true,
+ forwards);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24e895c4/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
index 120c780,0000000..2ad4bda
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
@@@ -1,51 -1,0 +1,67 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.junit.Test;
+
++import java.util.ArrayList;
++import java.util.List;
++
+public class SingleColumnRelationTest extends CQLTester
+{
+ @Test
+ public void testInvalidCollectionEqualityRelation() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int PRIMARY KEY, b set<int>, c list<int>, d map<int, int>)");
+ createIndex("CREATE INDEX ON %s (b)");
+ createIndex("CREATE INDEX ON %s (c)");
+ createIndex("CREATE INDEX ON %s (d)");
+
+ assertInvalid("SELECT * FROM %s WHERE a = 0 AND b=?", set(0));
+ assertInvalid("SELECT * FROM %s WHERE a = 0 AND c=?", list(0));
+ assertInvalid("SELECT * FROM %s WHERE a = 0 AND d=?", map(0, 0));
+ }
+
+ @Test
+ public void testInvalidCollectionNonEQRelation() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int PRIMARY KEY, b set<int>, c int)");
+ createIndex("CREATE INDEX ON %s (c)");
+ execute("INSERT INTO %s (a, b, c) VALUES (0, {0}, 0)");
+
+ // non-EQ operators must be rejected on the collection column b
+ assertInvalid("SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
+ assertInvalid("SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
+ assertInvalid("SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
+ assertInvalid("SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
+ assertInvalid("SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
+ }
++
++ @Test
++ public void testLargeClusteringINValues() throws Throwable
++ {
++ createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
++ execute("INSERT INTO %s (k, c, v) VALUES (0, 0, 0)");
++ List<Integer> inValues = new ArrayList<>(10000); // CASSANDRA-8410: a large clustering IN list (only c=0 matches) must not overflow the stack
++ for (int i = 0; i < 10000; i++)
++ inValues.add(i);
++ assertRows(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues),
++ row(0, 0, 0)
++ );
++ }
+}
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ty...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4125ca0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4125ca0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4125ca0a
Branch: refs/heads/trunk
Commit: 4125ca0aaa73cd7e1d52718c7ec1a145696cc957
Parents: e530f42 24e895c
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Fri Dec 12 11:43:02 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Fri Dec 12 11:43:02 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../apache/cassandra/db/AtomicBTreeColumns.java | 27 ++++++++++----------
.../cql3/SingleColumnRelationTest.java | 16 ++++++++++++
3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4125ca0a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
index e6412a3,2ad4bda..0fd300b
--- a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
@@@ -17,10 -17,11 +17,13 @@@
*/
package org.apache.cassandra.cql3;
+import java.util.Arrays;
+
import org.junit.Test;
+ import java.util.ArrayList;
+ import java.util.List;
+
public class SingleColumnRelationTest extends CQLTester
{
@Test
@@@ -47,336 -45,23 +50,349 @@@
execute("INSERT INTO %s (a, b, c) VALUES (0, {0}, 0)");
// non-EQ operators
- assertInvalid("SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
- assertInvalid("SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
- assertInvalid("SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
- assertInvalid("SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
- assertInvalid("SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
+ assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '>' relation",
+ "SELECT * FROM %s WHERE c = 0 AND b > ?", set(0));
+ assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '>=' relation",
+ "SELECT * FROM %s WHERE c = 0 AND b >= ?", set(0));
+ assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '<' relation",
+ "SELECT * FROM %s WHERE c = 0 AND b < ?", set(0));
+ assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a '<=' relation",
+ "SELECT * FROM %s WHERE c = 0 AND b <= ?", set(0));
+ assertInvalidMessage("Collection column 'b' (set<int>) cannot be restricted by a 'IN' relation",
+ "SELECT * FROM %s WHERE c = 0 AND b IN (?)", set(0));
+ }
+
+ @Test
+ public void testClusteringColumnRelations() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c))");
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
+
+ testSelectQueriesWithClusteringColumnRelations();
+ }
+
+ @Test
+ public void testClusteringColumnRelationsWithCompactStorage() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c)) WITH COMPACT STORAGE;");
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
+
+ testSelectQueriesWithClusteringColumnRelations();
+ }
+
+ private void testSelectQueriesWithClusteringColumnRelations() throws Throwable
+ {
+ assertRows(execute("select * from %s where a in (?, ?)", "first", "second"),
+ row("first", 1, 5, 1),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3),
+ row("second", 4, 8, 4));
+
+ assertRows(execute("select * from %s where a = ? and b = ? and c in (?, ?)", "first", 2, 6, 7),
+ row("first", 2, 6, 2));
+
+ assertRows(execute("select * from %s where a = ? and b in (?, ?) and c in (?, ?)", "first", 2, 3, 6, 7),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and b in (?, ?) and c in (?, ?)", "first", 3, 2, 7, 6),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?)", "first", 7, 6, 3, 2),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select c, d from %s where a = ? and c in (?, ?) and b in (?, ?)", "first", 7, 6, 3, 2),
+ row(6, 2),
+ row(7, 3));
+
+ assertRows(execute("select c, d from %s where a = ? and c in (?, ?) and b in (?, ?, ?)", "first", 7, 6, 3, 2, 3),
+ row(6, 2),
+ row(7, 3));
+
+ assertRows(execute("select * from %s where a = ? and b in (?, ?) and c = ?", "first", 3, 2, 7),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and b in ? and c in ?",
+ "first", Arrays.asList(3, 2), Arrays.asList(7, 6)),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertInvalidMessage("Invalid null value for IN restriction",
+ "select * from %s where a = ? and b in ? and c in ?", "first", null, Arrays.asList(7, 6));
+
+ assertRows(execute("select * from %s where a = ? and c >= ? and b in (?, ?)", "first", 6, 3, 2),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and c > ? and b in (?, ?)", "first", 6, 3, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and c <= ? and b in (?, ?)", "first", 6, 3, 2),
+ row("first", 2, 6, 2));
+
+ assertRows(execute("select * from %s where a = ? and c < ? and b in (?, ?)", "first", 7, 3, 2),
+ row("first", 2, 6, 2));
+//---
+ assertRows(execute("select * from %s where a = ? and c >= ? and c <= ? and b in (?, ?)", "first", 6, 7, 3, 2),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+
+ assertRows(execute("select * from %s where a = ? and c > ? and c <= ? and b in (?, ?)", "first", 6, 7, 3, 2),
+ row("first", 3, 7, 3));
+
+ assertEmpty(execute("select * from %s where a = ? and c > ? and c < ? and b in (?, ?)", "first", 6, 7, 3, 2));
+
+ assertInvalidMessage("Column \"c\" cannot be restricted by both an equality and an inequality relation",
+ "select * from %s where a = ? and c > ? and c = ? and b in (?, ?)", "first", 6, 7, 3, 2);
+
+ assertInvalidMessage("c cannot be restricted by more than one relation if it includes an Equal",
+ "select * from %s where a = ? and c = ? and c > ? and b in (?, ?)", "first", 6, 7, 3, 2);
+
+ assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b DESC",
+ "first", 7, 6, 3, 2),
+ row("first", 3, 7, 3),
+ row("first", 2, 6, 2));
+
+ assertInvalidMessage("More than one restriction was found for the start bound on b",
+ "select * from %s where a = ? and b > ? and b > ?", "first", 6, 3, 2);
+
+ assertInvalidMessage("More than one restriction was found for the end bound on b",
+ "select * from %s where a = ? and b < ? and b <= ?", "first", 6, 3, 2);
+ }
+
+ @Test
+ public void testPartitionKeyColumnRelations() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key((a, b), c))");
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 1, 1);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 2, 2);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 3, 3);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 4, 4);
+
+ assertInvalidMessage("Partition KEY part a cannot be restricted by IN relation (only the last part of the partition key can)",
+ "select * from %s where a in (?, ?)", "first", "second");
+ assertInvalidMessage("Partition KEY part a cannot be restricted by IN relation (only the last part of the partition key can)",
+ "select * from %s where a in (?, ?) and b in (?, ?)", "first", "second", 2, 3);
+ assertInvalidMessage("Partition key parts: b must be restricted as other parts are",
+ "select * from %s where a = ?", "first");
+ assertInvalidMessage("b cannot be restricted by more than one relation if it includes a IN",
+ "select * from %s where a = ? AND b IN (?, ?) AND b = ?", "first", 2, 2, 3);
+ assertInvalidMessage("b cannot be restricted by more than one relation if it includes an Equal",
+ "select * from %s where a = ? AND b = ? AND b IN (?, ?)", "first", 2, 2, 3);
+ }
+
+ @Test
+ public void testClusteringColumnRelationsWithClusteringOrder() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a text, b int, c int, d int, primary key(a, b, c)) WITH CLUSTERING ORDER BY (b DESC);");
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 1, 5, 1);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 2, 6, 2);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "first", 3, 7, 3);
+ execute("insert into %s (a, b, c, d) values (?, ?, ?, ?)", "second", 4, 8, 4);
+
+ assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b DESC",
+ "first", 7, 6, 3, 2),
+ row("first", 3, 7, 3),
+ row("first", 2, 6, 2));
+
+ assertRows(execute("select * from %s where a = ? and c in (?, ?) and b in (?, ?) order by b ASC",
+ "first", 7, 6, 3, 2),
+ row("first", 2, 6, 2),
+ row("first", 3, 7, 3));
+ }
+
+ @Test
+ public void testAllowFilteringWithClusteringColumn() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
+
+ execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 1, 2, 1);
+ execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 1, 3, 2);
+ execute("INSERT INTO %s (k, c, v) VALUES(?, ?, ?)", 2, 2, 3);
+
+ // Don't require filtering, always allowed
+ assertRows(execute("SELECT * FROM %s WHERE k = ?", 1),
+ row(1, 2, 1),
+ row(1, 3, 2));
+
+ assertRows(execute("SELECT * FROM %s WHERE k = ? AND c > ?", 1, 2), row(1, 3, 2));
+
+ assertRows(execute("SELECT * FROM %s WHERE k = ? AND c = ?", 1, 2), row(1, 2, 1));
+
+ assertRows(execute("SELECT * FROM %s WHERE k = ? ALLOW FILTERING", 1),
+ row(1, 2, 1),
+ row(1, 3, 2));
+
+ assertRows(execute("SELECT * FROM %s WHERE k = ? AND c > ? ALLOW FILTERING", 1, 2), row(1, 3, 2));
+
+ assertRows(execute("SELECT * FROM %s WHERE k = ? AND c = ? ALLOW FILTERING", 1, 2), row(1, 2, 1));
+
+ // Require filtering, allowed only with ALLOW FILTERING
+ assertInvalidMessage("Cannot execute this query as it might involve data filtering",
+ "SELECT * FROM %s WHERE c = ?", 2);
+ assertInvalidMessage("Cannot execute this query as it might involve data filtering",
+ "SELECT * FROM %s WHERE c > ? AND c <= ?", 2, 4);
+
+ assertRows(execute("SELECT * FROM %s WHERE c = ? ALLOW FILTERING", 2),
+ row(1, 2, 1),
+ row(2, 2, 3));
+
+ assertRows(execute("SELECT * FROM %s WHERE c > ? AND c <= ? ALLOW FILTERING", 2, 4), row(1, 3, 2));
+ }
+
+ @Test
+ public void testAllowFilteringWithIndexedColumn() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)");
+ createIndex("CREATE INDEX ON %s(a)");
+
+ execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 1, 10, 100);
+ execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 2, 20, 200);
+ execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 3, 30, 300);
+ execute("INSERT INTO %s(k, a, b) VALUES(?, ?, ?)", 4, 40, 400);
+
+ // Don't require filtering, always allowed
+ assertRows(execute("SELECT * FROM %s WHERE k = ?", 1), row(1, 10, 100));
+ assertRows(execute("SELECT * FROM %s WHERE a = ?", 20), row(2, 20, 200));
+ assertRows(execute("SELECT * FROM %s WHERE k = ? ALLOW FILTERING", 1), row(1, 10, 100));
+ assertRows(execute("SELECT * FROM %s WHERE a = ? ALLOW FILTERING", 20), row(2, 20, 200));
+
+ assertInvalid("SELECT * FROM %s WHERE a = ? AND b = ?");
+ assertRows(execute("SELECT * FROM %s WHERE a = ? AND b = ? ALLOW FILTERING", 20, 200), row(2, 20, 200));
+ }
+
+ @Test
+ public void testIndexQueriesOnComplexPrimaryKey() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk0 int, pk1 int, ck0 int, ck1 int, ck2 int, value int, PRIMARY KEY ((pk0, pk1), ck0, ck1, ck2))");
+
+ createIndex("CREATE INDEX ON %s (ck1)");
+ createIndex("CREATE INDEX ON %s (ck2)");
+ createIndex("CREATE INDEX ON %s (pk0)");
+ createIndex("CREATE INDEX ON %s (ck0)");
+
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 0, 1, 2, 3, 4, 5);
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 1, 2, 3, 4, 5, 0);
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 2, 3, 4, 5, 0, 1);
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 3, 4, 5, 0, 1, 2);
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 4, 5, 0, 1, 2, 3);
+ execute("INSERT INTO %s (pk0, pk1, ck0, ck1, ck2, value) VALUES (?, ?, ?, ?, ?, ?)", 5, 0, 1, 2, 3, 4);
+
+ assertRows(execute("SELECT value FROM %s WHERE pk0 = 2"), row(1));
+ assertRows(execute("SELECT value FROM %s WHERE ck0 = 0"), row(3));
+ assertRows(execute("SELECT value FROM %s WHERE pk0 = 3 AND pk1 = 4 AND ck1 = 0"), row(2));
+ assertRows(execute("SELECT value FROM %s WHERE pk0 = 5 AND pk1 = 0 AND ck0 = 1 AND ck2 = 3 ALLOW FILTERING"), row(4));
+ }
+
+ @Test
+ public void testIndexOnClusteringColumns() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id1 int, id2 int, author text, time bigint, v1 text, v2 text, PRIMARY KEY ((id1, id2), author, time))");
+ createIndex("CREATE INDEX ON %s(time)");
+ createIndex("CREATE INDEX ON %s(id2)");
+
+ execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'bob', 0, 'A', 'A')");
+ execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'bob', 1, 'B', 'B')");
+ execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 1, 'bob', 2, 'C', 'C')");
+ execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 0, 'tom', 0, 'D', 'D')");
+ execute("INSERT INTO %s(id1, id2, author, time, v1, v2) VALUES(0, 1, 'tom', 1, 'E', 'E')");
+
+ assertRows(execute("SELECT v1 FROM %s WHERE time = 1"), row("B"), row("E"));
+
+ assertRows(execute("SELECT v1 FROM %s WHERE id2 = 1"), row("C"), row("E"));
+
+ assertRows(execute("SELECT v1 FROM %s WHERE id1 = 0 AND id2 = 0 AND author = 'bob' AND time = 0"), row("A"));
+
+ // Test for CASSANDRA-8206
+ execute("UPDATE %s SET v2 = null WHERE id1 = 0 AND id2 = 0 AND author = 'bob' AND time = 1");
+
+ assertRows(execute("SELECT v1 FROM %s WHERE id2 = 0"), row("A"), row("B"), row("D"));
+
+ assertRows(execute("SELECT v1 FROM %s WHERE time = 1"), row("B"), row("E"));
+
+ assertInvalidMessage("IN restrictions are not supported on indexed columns",
+ "SELECT v1 FROM %s WHERE id2 = 0 and time IN (1, 2) ALLOW FILTERING");
+ }
+
+ @Test
+ public void testCompositeIndexWithPrimaryKey() throws Throwable
+ {
+ createTable("CREATE TABLE %s (blog_id int, time1 int, time2 int, author text, content text, PRIMARY KEY (blog_id, time1, time2))");
+
+ createIndex("CREATE INDEX ON %s(author)");
+
+ String req = "INSERT INTO %s (blog_id, time1, time2, author, content) VALUES (?, ?, ?, ?, ?)";
+ execute(req, 1, 0, 0, "foo", "bar1");
+ execute(req, 1, 0, 1, "foo", "bar2");
+ execute(req, 2, 1, 0, "foo", "baz");
+ execute(req, 3, 0, 1, "gux", "qux");
+
+ assertRows(execute("SELECT blog_id, content FROM %s WHERE author='foo'"),
+ row(1, "bar1"),
+ row(1, "bar2"),
+ row(2, "baz"));
+ assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 > 0 AND author='foo' ALLOW FILTERING"), row(2, "baz"));
+ assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 = 1 AND author='foo' ALLOW FILTERING"), row(2, "baz"));
+ assertRows(execute("SELECT blog_id, content FROM %s WHERE time1 = 1 AND time2 = 0 AND author='foo' ALLOW FILTERING"),
+ row(2, "baz"));
+ assertEmpty(execute("SELECT content FROM %s WHERE time1 = 1 AND time2 = 1 AND author='foo' ALLOW FILTERING"));
+ assertEmpty(execute("SELECT content FROM %s WHERE time1 = 1 AND time2 > 0 AND author='foo' ALLOW FILTERING"));
+
+ assertInvalidMessage("Cannot execute this query as it might involve data filtering",
+ "SELECT content FROM %s WHERE time2 >= 0 AND author='foo'");
+ }
+
+ @Test
+ public void testRangeQueryOnIndex() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id int primary key, row int, setid int);");
+ createIndex("CREATE INDEX ON %s (setid)");
+
+ String q = "INSERT INTO %s (id, row, setid) VALUES (?, ?, ?);";
+ execute(q, 0, 0, 0);
+ execute(q, 1, 1, 0);
+ execute(q, 2, 2, 0);
+ execute(q, 3, 3, 0);
+
+ assertInvalidMessage("Cannot execute this query as it might involve data filtering",
+ "SELECT * FROM %s WHERE setid = 0 AND row < 1;");
+ assertRows(execute("SELECT * FROM %s WHERE setid = 0 AND row < 1 ALLOW FILTERING;"), row(0, 0, 0));
+ }
+
+ @Test
+ public void testEmptyIN() throws Throwable
+ {
+ for (String compactOption : new String[] { "", " WITH COMPACT STORAGE" })
+ {
+ createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY (k1, k2))" + compactOption);
+
+ for (int i = 0; i <= 2; i++)
+ for (int j = 0; j <= 2; j++)
+ execute("INSERT INTO %s (k1, k2, v) VALUES (?, ?, ?)", i, j, i + j);
+
+ assertEmpty(execute("SELECT v FROM %s WHERE k1 IN ()"));
+ assertEmpty(execute("SELECT v FROM %s WHERE k1 = 0 AND k2 IN ()"));
+ }
}
+
+ @Test
+ public void testLargeClusteringINValues() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))");
+ execute("INSERT INTO %s (k, c, v) VALUES (0, 0, 0)");
+ List<Integer> inValues = new ArrayList<>(10000);
+ for (int i = 0; i < 10000; i++)
+ inValues.add(i);
+ assertRows(execute("SELECT * FROM %s WHERE k=? AND c IN ?", 0, inValues),
+ row(0, 0, 0)
+ );
+ }
}