You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/08/16 13:38:50 UTC
[10/10] cassandra git commit: Merge branch cassandra-2.2 into
cassandra-3.0
Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb56193a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb56193a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb56193a
Branch: refs/heads/cassandra-3.0
Commit: bb56193a3f2ac865efb18c9d6944f2927f667771
Parents: 6528fbf 9583b6b
Author: Benjamin Lerer <b....@gmail.com>
Authored: Tue Aug 16 15:29:42 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Tue Aug 16 15:34:35 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 10 +
.../cassandra/db/compaction/Scrubber.java | 169 ++++-
.../cassandra/db/marshal/ReversedType.java | 10 -
.../db/partitions/AbstractBTreePartition.java | 7 +-
.../db/partitions/ImmutableBTreePartition.java | 34 +-
.../db/rows/UnfilteredRowIterators.java | 28 +
.../org/apache/cassandra/cql3/CQLTester.java | 18 +
.../validation/entities/SecondaryIndexTest.java | 131 ++++
.../cql3/validation/operations/DeleteTest.java | 113 +++
.../operations/SelectOrderByTest.java | 719 ++++++++++---------
.../cql3/validation/operations/SelectTest.java | 357 ++++++++-
.../cassandra/db/marshal/ReversedTypeTest.java | 4 +-
13 files changed, 1210 insertions(+), 391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a202755,c421398..7605952
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -49,9 -14,12 +49,10 @@@ Merged from 2.2
* Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
* Synchronize ThriftServer::stop() (CASSANDRA-12105)
* Use dedicated thread for JMX notifications (CASSANDRA-12146)
- * NPE when trying to remove purgable tombstones from result (CASSANDRA-12143)
* Improve streaming synchronization and fault tolerance (CASSANDRA-11414)
* MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973)
- * Don't write shadowed range tombstone (CASSANDRA-12030)
Merged from 2.1:
+ * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127)
* Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 6b8dfd3,0a3ab36..0bd3920
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,55 -13,17 +13,65 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
-2.2.8
++3.0.9
+ =====
+
+ Upgrading
+ ---------
- - The ReversedType behaviour has been corrected for clustering columns of
- BYTES type containing empty value. Scrub should be run on the existing
- SSTables containing a descending clustering column of BYTES type to correct
- their ordering. See CASSANDRA-12127 for more details.
++ - The ReversedType behaviour has been corrected for clustering columns of
++ BYTES type containing empty value. Scrub should be run on the existing
++ SSTables containing a descending clustering column of BYTES type to correct
++ their ordering. See CASSANDRA-12127 for more details.
+
-2.2.7
+3.0.8
+=====
+
+Upgrading
+---------
+ - Ec2MultiRegionSnitch will no longer automatically set broadcast_rpc_address
+ to the public instance IP if this property is defined on cassandra.yaml.
+
+3.0.7
+=====
+
+Upgrading
+---------
+ - A maximum size for SSTables values has been introduced, to prevent out of memory
+ exceptions when reading corrupt SSTables. This maximum size can be set via
+ max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
+ value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
+ they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+
+Deprecation
+-----------
+ - DateTieredCompactionStrategy has been deprecated - new tables should use
+ TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might
+ cause increased compaction load for a while after the migration so make sure you run
+ tests before migrating. Read CASSANDRA-9666 for background on this.
+
+New features
+------------
+ - TimeWindowCompactionStrategy has been added. This has proven to be a better approach
+ to time series compaction and new tables should use this instead of DTCS. See
+ CASSANDRA-9666 for details.
+
+3.0.6
+=====
+
+New features
+------------
+ - JSON timestamps are now in UTC and contain the timezone information, see
+ CASSANDRA-11137 for more details.
+
+3.0.5
+=====
+
+Upgrading
+---------
+ - Nothing specific to this release, but please see previous versions upgrading section,
+ especially if you are upgrading from 2.2.
+
+3.0.4
=====
New features
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 539c4c7,99ee62e..c010891
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -23,11 -23,11 +23,12 @@@ import java.util.*
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-import com.google.common.collect.AbstractIterator;
++import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@@ -327,6 -312,32 +302,38 @@@ public class Scrubber implements Closea
}
}
+ @SuppressWarnings("resource")
+ private boolean tryAppend(DecoratedKey prevKey, DecoratedKey key, SSTableRewriter writer)
+ {
- // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects
- // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added
- // to the outOfOrderRows that will be later written to a new SSTable.
- OrderCheckerIterator atoms = new OrderCheckerIterator(new SSTableIdentityIterator(sstable, dataFile, key, checkData),
- cfs.metadata.comparator.onDiskAtomComparator());
- if (prevKey != null && prevKey.compareTo(key) > 0)
++ // OrderCheckerIterator will check, at iteration time, that the rows are in the proper order. If it detects
++ // that one row is out of order, it will stop returning them. The remaining rows will be sorted and added
++ // to the outOfOrder set that will be later written to a new SSTable.
++ OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(sstable, dataFile, key),
++ cfs.metadata.comparator);
++
++ try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath()))
+ {
- saveOutOfOrderRow(prevKey, key, atoms);
- return false;
- }
++ if (prevKey != null && prevKey.compareTo(key) > 0)
++ {
++ saveOutOfOrderRow(prevKey, key, iterator);
++ return false;
++ }
+
- AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
- if (writer.tryAppend(compactedRow) == null)
- emptyRows++;
- else
- goodRows++;
++ if (writer.tryAppend(iterator) == null)
++ emptyRows++;
++ else
++ goodRows++;
++ }
+
- if (atoms.hasOutOfOrderCells())
- saveOutOfOrderRow(key, atoms);
++ if (sstableIterator.hasRowsOutOfOrder())
++ {
++ outputHandler.warn(String.format("Out of order rows found in partition: %s", key));
++ outOfOrder.add(sstableIterator.getRowsOutOfOrder());
++ }
+
+ return true;
+ }
+
private void updateIndexKey()
{
currentIndexKey = nextIndexKey;
@@@ -469,44 -523,88 +476,146 @@@
}
/**
+ * During 2.x migration, under some circumstances rows might have gotten duplicated.
+ * Merging iterator merges rows with same clustering.
+ *
+ * For more details, refer to CASSANDRA-12144.
+ */
+ private static class RowMergingSSTableIterator extends SSTableIdentityIterator
+ {
+ RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
+ {
+ super(sstable, file, key);
+ }
+
+ @Override
+ protected Unfiltered doCompute()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
+ Unfiltered next = iterator.next();
+ if (!next.isRow())
+ return next;
+
+ while (iterator.hasNext())
+ {
+ Unfiltered peek = iterator.peek();
+ // If there was a duplicate row, merge it.
+ if (next.clustering().equals(peek.clustering()) && peek.isRow())
+ {
+ iterator.next(); // Make sure that the peeked item was consumed.
+ next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return next;
+ }
+ }
++
++ /**
+ * In some cases, like CASSANDRA-12127, the cells might have been stored in the wrong order. This decorator checks the
+ * cell order and collects the out-of-order cells to correct the problem.
+ */
- private static final class OrderCheckerIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
++ private static final class OrderCheckerIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator
+ {
+ /**
+ * The decorated iterator.
+ */
- private final OnDiskAtomIterator iterator;
++ private final UnfilteredRowIterator iterator;
+
- /**
- * The atom comparator.
- */
- private final Comparator<OnDiskAtom> comparator;
++ private final ClusteringComparator comparator;
+
- /**
- * The Column family containing the cells which are out of order.
- */
- private ColumnFamily outOfOrderCells;
++ private Unfiltered previous;
+
+ /**
- * The previous atom returned
++ * The partition containing the rows which are out of order.
+ */
- private OnDiskAtom previous;
++ private Partition rowsOutOfOrder;
+
- public OrderCheckerIterator(OnDiskAtomIterator iterator, Comparator<OnDiskAtom> comparator)
++ public OrderCheckerIterator(UnfilteredRowIterator iterator, ClusteringComparator comparator)
+ {
+ this.iterator = iterator;
+ this.comparator = comparator;
+ }
+
- public ColumnFamily getColumnFamily()
++ public CFMetaData metadata()
+ {
- return iterator.getColumnFamily();
++ return iterator.metadata();
+ }
+
- public DecoratedKey getKey()
++ public boolean isReverseOrder()
+ {
- return iterator.getKey();
++ return iterator.isReverseOrder();
+ }
+
- public void close() throws IOException
++ public PartitionColumns columns()
+ {
- iterator.close();
++ return iterator.columns();
++ }
++
++ public DecoratedKey partitionKey()
++ {
++ return iterator.partitionKey();
++ }
++
++ public Row staticRow()
++ {
++ return iterator.staticRow();
+ }
+
+ @Override
- protected OnDiskAtom computeNext()
++ public boolean isEmpty()
++ {
++ return iterator.isEmpty();
++ }
++
++ public void close()
++ {
++ iterator.close();
++ }
++
++ public DeletionTime partitionLevelDeletion()
++ {
++ return iterator.partitionLevelDeletion();
++ }
++
++ public EncodingStats stats()
++ {
++ return iterator.stats();
++ }
++
++ public boolean hasRowsOutOfOrder()
++ {
++ return rowsOutOfOrder != null;
++ }
++
++ public Partition getRowsOutOfOrder()
++ {
++ return rowsOutOfOrder;
++ }
++
++ protected Unfiltered computeNext()
+ {
+ if (!iterator.hasNext())
+ return endOfData();
+
- OnDiskAtom next = iterator.next();
++ Unfiltered next = iterator.next();
+
- // If we detect that some cells are out of order we will store and sort the remaining once to insert them
++ // If we detect that some rows are out of order we will store and sort the remaining ones to insert them
+ // in a separate SSTable.
+ if (previous != null && comparator.compare(next, previous) < 0)
+ {
- outOfOrderCells = collectOutOfOrderCells(next, iterator);
++ rowsOutOfOrder = ImmutableBTreePartition.create(UnfilteredRowIterators.concat(next, iterator), false);
+ return endOfData();
+ }
+ previous = next;
+ return next;
+ }
+
- public boolean hasOutOfOrderCells()
- {
- return outOfOrderCells != null;
- }
-
- public ColumnFamily getOutOfOrderCells()
- {
- return outOfOrderCells;
- }
-
- private static ColumnFamily collectOutOfOrderCells(OnDiskAtom atom, OnDiskAtomIterator iterator)
- {
- ColumnFamily cf = iterator.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- cf.addAtom(atom);
- while (iterator.hasNext())
- cf.addAtom(iterator.next());
- return cf;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 02320c7,19bee5f..82a1895
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@@ -66,18 -65,8 +66,8 @@@ public class ReversedType<T> extends Ab
return baseType.isEmptyValueMeaningless();
}
- public int compare(ByteBuffer o1, ByteBuffer o2)
+ public int compareCustom(ByteBuffer o1, ByteBuffer o2)
{
- // An empty byte buffer is always smaller
- if (o1.remaining() == 0)
- {
- return o2.remaining() == 0 ? 0 : -1;
- }
- if (o2.remaining() == 0)
- {
- return 1;
- }
-
return baseType.compare(o2, o1);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 1fa3324,0000000..c63acc2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@@ -1,408 -1,0 +1,413 @@@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.partitions;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
+
+import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
+
+public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
+{
+ protected static final Holder EMPTY = new Holder(PartitionColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+
+ protected final CFMetaData metadata;
+ protected final DecoratedKey partitionKey;
+
+ protected abstract Holder holder();
+ protected abstract boolean canHaveShadowedData();
+
+ protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey partitionKey)
+ {
+ this.metadata = metadata;
+ this.partitionKey = partitionKey;
+ }
+
+ protected static final class Holder
+ {
+ final PartitionColumns columns;
+ final DeletionInfo deletionInfo;
+ // the btree of rows
+ final Object[] tree;
+ final Row staticRow;
+ final EncodingStats stats;
+
+ Holder(PartitionColumns columns, Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats)
+ {
+ this.columns = columns;
+ this.tree = tree;
+ this.deletionInfo = deletionInfo;
+ this.staticRow = staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+ this.stats = stats;
+ }
+ }
+
+ public DeletionInfo deletionInfo()
+ {
+ return holder().deletionInfo;
+ }
+
+ public Row staticRow()
+ {
+ return holder().staticRow;
+ }
+
+ public boolean isEmpty()
+ {
+ Holder holder = holder();
+ return holder.deletionInfo.isLive() && BTree.isEmpty(holder.tree) && holder.staticRow.isEmpty();
+ }
+
+ public boolean hasRows()
+ {
+ Holder holder = holder();
+ return !BTree.isEmpty(holder.tree);
+ }
+
+ public CFMetaData metadata()
+ {
+ return metadata;
+ }
+
+ public DecoratedKey partitionKey()
+ {
+ return partitionKey;
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return holder().deletionInfo.getPartitionDeletion();
+ }
+
+ public PartitionColumns columns()
+ {
+ return holder().columns;
+ }
+
+ public EncodingStats stats()
+ {
+ return holder().stats;
+ }
+
+ public Row getRow(Clustering clustering)
+ {
+ Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
+ // Note that for statics, this will never return null, this will return an empty row. However,
+ // it's more consistent for this method to return null if we don't really have a static row.
+ return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+ }
+
+ private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
+ {
+ DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+ if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive()))
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata);
+ return row == null ? Rows.EMPTY_STATIC_ROW : row;
+ }
+
+ public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
+ {
+ // TODO: we could optimize comparison for "NativeRow" à la #6755
+ final Holder current = holder();
+ return new SearchIterator<Clustering, Row>()
+ {
+ private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
+ private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+
+ public boolean hasNext()
+ {
+ return rawIter.hasNext();
+ }
+
+ public Row next(Clustering clustering)
+ {
+ if (clustering == Clustering.STATIC_CLUSTERING)
+ return staticRow(current, columns, true);
+
+ Row row = rawIter.next(clustering);
+ RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);
+
+ // A search iterator only returns a row, so it doesn't allow to directly account for deletion that should apply to the row
+ // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion
+ // to carry the proper deletion on the row.
+ DeletionTime activeDeletion = partitionDeletion;
+ if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+ activeDeletion = rt.deletionTime();
+
+ if (row == null)
+ return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion));
+
+ return row.filter(columns, activeDeletion, true, metadata);
+ }
+ };
+ }
+
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
+ }
+
+ public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ return unfilteredIterator(holder(), selection, slices, reversed);
+ }
+
+ public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
+ {
+ Row staticRow = staticRow(current, selection, false);
+ if (slices.size() == 0)
+ {
+ DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
+ return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
+ }
+
+ return slices.size() == 1
+ ? sliceIterator(selection, slices.get(0), reversed, current, staticRow)
+ : new SlicesIterator(selection, slices, reversed, current, staticRow);
+ }
+
+ private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow)
+ {
+ Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
+ Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
+ Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
+ Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed);
+
+ return merge(rowIter, deleteIter, selection, reversed, current, staticRow);
+ }
+
+ private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
+ ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
+ {
+ return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
+ selection, staticRow, reversed, current.stats,
+ rowIter, deleteIter,
+ canHaveShadowedData());
+ }
+
+ private abstract class AbstractIterator extends AbstractUnfilteredRowIterator
+ {
+ final Holder current;
+ final ColumnFilter selection;
+
+ private AbstractIterator(ColumnFilter selection, boolean isReversed)
+ {
+ this(AbstractBTreePartition.this.holder(), selection, isReversed);
+ }
+
+ private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
+ {
+ this(current,
+ AbstractBTreePartition.this.staticRow(current, selection, false),
+ selection, isReversed);
+ }
+
+ private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
+ {
+ super(AbstractBTreePartition.this.metadata,
+ AbstractBTreePartition.this.partitionKey,
+ current.deletionInfo.getPartitionDeletion(),
+ selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
+ // it would also be more precise to return the intersection of the selection and current.columns,
+ // but its probably not worth spending time on computing that.
+ staticRow,
+ isReversed,
+ current.stats);
+ this.current = current;
+ this.selection = selection;
+ }
+ }
+
+ public class SlicesIterator extends AbstractIterator
+ {
+ private final Slices slices;
+
+ private int idx;
+ private Iterator<Unfiltered> currentSlice;
+
+ private SlicesIterator(ColumnFilter selection,
+ Slices slices,
+ boolean isReversed,
+ Holder current,
+ Row staticRow)
+ {
+ super(current, staticRow, selection, isReversed);
+ this.slices = slices;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (true)
+ {
+ if (currentSlice == null)
+ {
+ if (idx >= slices.size())
+ return endOfData();
+
+ int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
+ currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
+ idx++;
+ }
+
+ if (currentSlice.hasNext())
+ return currentSlice.next();
+
+ currentSlice = null;
+ }
+ }
+ }
+
+ public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator
+ {
+ private Iterator<Unfiltered> iterator;
+
+ protected SliceableIterator(ColumnFilter selection, boolean isReversed)
+ {
+ super(selection, isReversed);
+ }
+
+ protected Unfiltered computeNext()
+ {
+ if (iterator == null)
+ iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder);
+ if (!iterator.hasNext())
+ return endOfData();
+ return iterator.next();
+ }
+
+ public Iterator<Unfiltered> slice(Slice slice)
+ {
+ return sliceIterator(selection, slice, isReverseOrder, current, staticRow);
+ }
+ }
+
+ public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+ {
+ return new SliceableIterator(columns, reversed);
+ }
+
+ protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+ {
+ return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
+ }
+
+ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
+ {
++ return build(iterator, initialRowCapacity, true);
++ }
++
++ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
++ {
+ CFMetaData metadata = iterator.metadata();
+ PartitionColumns columns = iterator.columns();
+ boolean reversed = iterator.isReverseOrder();
+
+ BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
- builder.auto(false);
++ builder.auto(!ordered);
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);
+
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ if (unfiltered.kind() == Unfiltered.Kind.ROW)
+ builder.add((Row)unfiltered);
+ else
+ deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+ }
+
+ if (reversed)
+ builder.reverse();
+
+ return new Holder(columns, builder.build(), deletionBuilder.build(), iterator.staticRow(), iterator.stats());
+ }
+
+ // Note that when building with a RowIterator, deletion will generally be LIVE, but we allow to pass it nonetheless because PartitionUpdate
+ // passes a MutableDeletionInfo that it mutates later.
+ protected static Holder build(RowIterator rows, DeletionInfo deletion, boolean buildEncodingStats, int initialRowCapacity)
+ {
+ CFMetaData metadata = rows.metadata();
+ PartitionColumns columns = rows.columns();
+ boolean reversed = rows.isReverseOrder();
+
+ BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
+ builder.auto(false);
+ while (rows.hasNext())
+ {
+ Row row = rows.next();
+ builder.add(row);
+ }
+
+ if (reversed)
+ builder.reverse();
+
+ Row staticRow = rows.staticRow();
+ Object[] tree = builder.build();
+ EncodingStats stats = buildEncodingStats ? EncodingStats.Collector.collect(staticRow, BTree.iterator(tree), deletion)
+ : EncodingStats.NO_STATS;
+ return new Holder(columns, tree, deletion, staticRow, stats);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append(String.format("[%s.%s] key=%s columns=%s",
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(partitionKey().getKey()),
+ columns()));
+
+ if (staticRow() != Rows.EMPTY_STATIC_ROW)
+ sb.append("\n ").append(staticRow().toString(metadata));
+
+ for (Row row : this)
+ sb.append("\n ").append(row.toString(metadata));
+
+ return sb.toString();
+ }
+
+ public int rowCount()
+ {
+ return BTree.size(holder().tree);
+ }
+
+ public Iterator<Row> iterator()
+ {
+ return BTree.<Row>iterator(holder().tree);
+ }
+
+ public Row lastRow()
+ {
+ Object[] tree = holder().tree;
+ if (BTree.isEmpty(tree))
+ return null;
+
+ return BTree.findByIndex(tree, BTree.size(tree) - 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
index 9af4bad,0000000..8d96f1e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@@ -1,91 -1,0 +1,123 @@@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.rows.*;
+
+public class ImmutableBTreePartition extends AbstractBTreePartition
+{
+
+ protected final Holder holder;
+
+ public ImmutableBTreePartition(CFMetaData metadata,
+ DecoratedKey partitionKey,
+ PartitionColumns columns,
+ Row staticRow,
+ Object[] tree,
+ DeletionInfo deletionInfo,
+ EncodingStats stats)
+ {
+ super(metadata, partitionKey);
+ this.holder = new Holder(columns, tree, deletionInfo, staticRow, stats);
+ }
+
+ protected ImmutableBTreePartition(CFMetaData metadata,
+ DecoratedKey partitionKey,
+ Holder holder)
+ {
+ super(metadata, partitionKey);
+ this.holder = holder;
+ }
+
+ /**
+ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
+ *
+ * Warning: Note that this method does not close the provided iterator and it is
+ * up to the caller to do so.
+ *
+ * @param iterator the iterator to gather in memory.
+ * @return the created partition.
+ */
+ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator)
+ {
+ return create(iterator, 16);
+ }
+
+ /**
+ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
+ *
+ * Warning: Note that this method does not close the provided iterator and it is
+ * up to the caller to do so.
+ *
+ * @param iterator the iterator to gather in memory.
++ * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise.
++ * @return the created partition.
++ */
++ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, boolean ordered)
++ {
++ return create(iterator, 16, ordered);
++ }
++
++ /**
++ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
++ *
++ * Warning: Note that this method does not close the provided iterator and it is
++ * up to the caller to do so.
++ *
++ * @param iterator the iterator to gather in memory.
+ * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
+ * correspond or be a good estimation of the number of rows in {@code iterator}.
+ * @return the created partition.
+ */
+ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity)
+ {
- return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity));
++ return create(iterator, initialRowCapacity, true);
++ }
++
++ /**
++ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator.
++ *
++ * Warning: Note that this method does not close the provided iterator and it is
++ * up to the caller to do so.
++ *
++ * @param iterator the iterator to gather in memory.
++ * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
++ * correspond or be a good estimation of the number of rows in {@code iterator}.
++ * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise.
++ * @return the created partition.
++ */
++ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered)
++ {
++ return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered));
+ }
+
+ protected Holder holder()
+ {
+ return holder;
+ }
+
+ protected boolean canHaveShadowedData()
+ {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 3218ff2,0000000..43653a9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@@ -1,558 -1,0 +1,586 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IMergeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Static methods to work with atom iterators.
+ */
+public abstract class UnfilteredRowIterators
+{
+ private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class);
+
+ private UnfilteredRowIterators() {}
+
+ /**
+ * Interface for a listener interested in the result of merging multiple versions of a given row.
+ * <p>
+ * Implementors of this interface are given enough information that they can easily reconstruct the difference
+ * between the merged result and each individual input. This is used when reconciling results on replicas for
+ * instance to figure out what to send as read-repair to each source.
+ */
+ public interface MergeListener
+ {
+ /**
+ * Called once for the merged partition.
+ *
+ * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the
+ * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}.
+ * @param versions the partition level deletion for the sources of the merge. Elements of the array will never
+ * be null, but be "live".
+ **/
+ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+
+ /**
+ * Called once for every row participating in the merge.
+ * <p>
+ * Note that this is called for every clustering where at least one of the source merged has a row. In
+ * particular, this may be called in cases where there is no row in the merged output (if a source has a row
+ * that is shadowed by another source range tombstone or partition level deletion).
+ *
+ * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a
+ * placeholder for when at least one source has a row, but that row is shadowed in the merged output.
+ * @param versions for each source, the row in that source corresponding to {@code merged}. This can be
+ * {@code null} for some sources if the source has no such row.
+ */
+ public void onMergedRows(Row merged, Row[] versions);
+
+ /**
+ * Called once for every range tombstone marker participating in the merge.
+ * <p>
+ * Note that this is called for every "clustering position" where at least one of the source merged has a range
+ * tombstone marker.
+ *
+ * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which
+ * means that at least one source has a marker in {@code versions} but the merged output has nothing corresponding
+ * (this basically means the merged output has a currently open deletion that shadows whatever marker the source
+ * had).
+ * @param versions the marker for each source merged. This can be {@code null} for some sources if that source
+ * has no such marker.
+ */
+ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
+
+ public void close();
+ }
+
+ /**
+ * Returns an iterator that only returns rows with only live content.
+ *
+ * This is mainly used in the CQL layer when we know we don't care about deletion
+ * infos (and since an UnfilteredRowIterator cannot shadow its own data, we know everything
+ * returned isn't shadowed by a tombstone).
+ */
+ public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+ {
+ return FilteredRows.filter(iter, nowInSec);
+ }
+
+ /**
+ * Returns an iterator that is the result of merging other iterators.
+ */
+ public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec)
+ {
+ assert !iterators.isEmpty();
+ if (iterators.size() == 1)
+ return iterators.get(0);
+
+ return UnfilteredRowMergeIterator.create(iterators, nowInSec, null);
+ }
+
+ /**
+ * Returns an iterator that is the result of merging other iterators, and (optionally) using
+ * specific MergeListener.
+ *
+ * Note that this method assumes that there are at least 2 iterators to merge.
+ */
+ public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener)
+ {
+ return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener);
+ }
+
+ /**
+ * Returns an empty unfiltered iterator for a given partition.
+ */
+ public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder)
+ {
+ return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion);
+ }
+
+ /**
+ * Digests the partition represented by the provided iterator.
+ *
+ * @param command the command that has yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
+ * as this is only used when producing digest to be sent to legacy nodes.
+ * @param iterator the iterator to digest.
+ * @param digest the {@code MessageDigest} to use for the digest.
+ * @param version the messaging protocol to use when producing the digest.
+ */
+ public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
+ {
+ if (version < MessagingService.VERSION_30)
+ {
+ LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
+ return;
+ }
+
+ digest.update(iterator.partitionKey().getKey().duplicate());
+ iterator.partitionLevelDeletion().digest(digest);
+ iterator.columns().regulars.digest(digest);
+ // When serializing an iterator, we skip the static columns if the iterator has no static row, even if the
+ // columns() object itself has some (the columns() is a superset of what the iterator actually contains, and
+ // will correspond to the queried columns pre-serialization). So we must avoid taking the static column names
+ // into account if there is no static row or we'd have a digest mismatch depending on whether the digest
+ // is computed on an iterator that has been serialized or not (see CASSANDRA-12090)
+ // TODO: in practice we could completely skip digesting the columns since they are more informative of what the
+ // iterator may contain, and digesting the actual content is enough. And in fact, that would be more correct
+ // (since again, the columns could be different without the information represented by the iterator being
+ // different), but removing them entirely is strictly speaking a breaking change (it would create mismatches on
+ // upgrade) so we can only do so on the next protocol version bump.
+ if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
+ iterator.columns().statics.digest(digest);
+ FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
+ iterator.staticRow().digest(digest);
+
+ while (iterator.hasNext())
+ {
+ Unfiltered unfiltered = iterator.next();
+ unfiltered.digest(digest);
+ }
+ }
+
+ /**
+ * Returns an iterator that concatenates two atom iterators.
+ * This method assumes that both iterators are from the same partition and that the atoms from
+ * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterators
+ * makes sense).
+ */
+ public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2)
+ {
+ assert iter1.metadata().cfId.equals(iter2.metadata().cfId)
+ && iter1.partitionKey().equals(iter2.partitionKey())
+ && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
+ && iter1.isReverseOrder() == iter2.isReverseOrder()
+ && iter1.columns().equals(iter2.columns())
+ && iter1.staticRow().equals(iter2.staticRow());
+
+ class Extend implements MoreRows<UnfilteredRowIterator>
+ {
+ boolean returned = false;
+ public UnfilteredRowIterator moreContents()
+ {
+ if (returned)
+ return null;
+ returned = true;
+ return iter2;
+ }
+ }
+
+ return MoreRows.extend(iter1, new Extend());
+ }
+
++ /**
++ * Returns an iterator that concatenates the specified atom with the iterator.
++ */
++ public static UnfilteredRowIterator concat(final Unfiltered first, final UnfilteredRowIterator rest)
++ {
++ return new WrappingUnfilteredRowIterator(rest)
++ {
++ private boolean hasReturnedFirst;
++
++ @Override
++ public boolean hasNext()
++ {
++ return hasReturnedFirst ? super.hasNext() : true;
++ }
++
++ @Override
++ public Unfiltered next()
++ {
++ if (!hasReturnedFirst)
++ {
++ hasReturnedFirst = true;
++ return first;
++ }
++ return super.next();
++ }
++ };
++ }
++
+ public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
+ {
+ class Cloner extends Transformation
+ {
+ private final Row.Builder builder = allocator.cloningBTreeRowBuilder();
+
+ public Row applyToStatic(Row row)
+ {
+ return Rows.copy(row, builder).build();
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ return Rows.copy(row, builder).build();
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ return marker.copy(allocator);
+ }
+ }
+ return Transformation.apply(iterator, new Cloner());
+ }
+
+ /**
+ * Validate that the data of the provided iterator is valid, that is that the values
+ * it contains are valid for the type they represent, and more generally that the
+ * infos stored are sensible.
+ *
+ * This is mainly used by scrubber to detect problems in sstables.
+ *
+ * @param iterator the partition to check.
+ * @param filename the name of the file the data is coming from.
+ * @return an iterator that returns the same data as {@code iterator} but that
+ * checks said data and throws a {@code CorruptSSTableException} if it detects
+ * invalid data.
+ */
+ public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename)
+ {
+ class Validator extends Transformation
+ {
+ @Override
+ public Row applyToStatic(Row row)
+ {
+ validate(row);
+ return row;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ validate(row);
+ return row;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ validate(marker);
+ return marker;
+ }
+
+ private void validate(Unfiltered unfiltered)
+ {
+ try
+ {
+ unfiltered.validateData(iterator.metadata());
+ }
+ catch (MarshalException me)
+ {
+ throw new CorruptSSTableException(me, filename);
+ }
+ }
+ }
+ return Transformation.apply(iterator, new Validator());
+ }
+
+ /**
+ * Wraps the provided iterator so it logs the returned atoms for debugging purposes.
+ * <p>
+ * Note that this is only meant for debugging as this can log a very large amount of
+ * logging at INFO.
+ */
+ public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails)
+ {
+ CFMetaData metadata = iterator.metadata();
+ logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}",
+ id,
+ metadata.ksName,
+ metadata.cfName,
+ metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
+ iterator.isReverseOrder(),
+ iterator.partitionLevelDeletion().markedForDeleteAt());
+
+ class Logger extends Transformation
+ {
+ @Override
+ public Row applyToStatic(Row row)
+ {
+ if (!row.isEmpty())
+ logger.info("[{}] {}", id, row.toString(metadata, fullDetails));
+ return row;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ logger.info("[{}] {}", id, row.toString(metadata, fullDetails));
+ return row;
+ }
+
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+ {
+ logger.info("[{}] {}", id, marker.toString(metadata));
+ return marker;
+ }
+ }
+ return Transformation.apply(iterator, new Logger());
+ }
+
+ /**
+ * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface.
+ */
+ private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator
+ {
+ private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator;
+ private final MergeListener listener;
+
+ private UnfilteredRowMergeIterator(CFMetaData metadata,
+ List<UnfilteredRowIterator> iterators,
+ PartitionColumns columns,
+ DeletionTime partitionDeletion,
+ int nowInSec,
+ boolean reversed,
+ MergeListener listener)
+ {
+ super(metadata,
+ iterators.get(0).partitionKey(),
+ partitionDeletion,
+ columns,
+ mergeStaticRows(iterators, columns.statics, nowInSec, listener, partitionDeletion),
+ reversed,
+ mergeStats(iterators));
+
+ this.mergeIterator = MergeIterator.get(iterators,
+ reversed ? metadata.comparator.reversed() : metadata.comparator,
+ new MergeReducer(iterators.size(), reversed, nowInSec, listener));
+ this.listener = listener;
+ }
+
+ private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener)
+ {
+ try
+ {
+ checkForInvalidInput(iterators);
+ return new UnfilteredRowMergeIterator(iterators.get(0).metadata(),
+ iterators,
+ collectColumns(iterators),
+ collectPartitionLevelDeletion(iterators, listener),
+ nowInSec,
+ iterators.get(0).isReverseOrder(),
+ listener);
+ }
+ catch (RuntimeException | Error e)
+ {
+ try
+ {
+ FBUtilities.closeAll(iterators);
+ }
+ catch (Exception suppressed)
+ {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ }
+ }
+
+ @SuppressWarnings("resource") // We're not really creating any resource here
+ private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators)
+ {
+ if (iterators.isEmpty())
+ return;
+
+ UnfilteredRowIterator first = iterators.get(0);
+ for (int i = 1; i < iterators.size(); i++)
+ {
+ UnfilteredRowIterator iter = iterators.get(i);
+ assert first.metadata().cfId.equals(iter.metadata().cfId);
+ assert first.partitionKey().equals(iter.partitionKey());
+ assert first.isReverseOrder() == iter.isReverseOrder();
+ }
+ }
+
+ @SuppressWarnings("resource") // We're not really creating any resource here
+ private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener)
+ {
+ DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()];
+
+ DeletionTime delTime = DeletionTime.LIVE;
+ for (int i = 0; i < iterators.size(); i++)
+ {
+ UnfilteredRowIterator iter = iterators.get(i);
+ DeletionTime iterDeletion = iter.partitionLevelDeletion();
+ if (listener != null)
+ versions[i] = iterDeletion;
+ if (!delTime.supersedes(iterDeletion))
+ delTime = iterDeletion;
+ }
+ if (listener != null)
+ listener.onMergedPartitionLevelDeletion(delTime, versions);
+ return delTime;
+ }
+
+ private static Row mergeStaticRows(List<UnfilteredRowIterator> iterators,
+ Columns columns,
+ int nowInSec,
+ MergeListener listener,
+ DeletionTime partitionDeletion)
+ {
+ if (columns.isEmpty())
+ return Rows.EMPTY_STATIC_ROW;
+
+ if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty()))
+ return Rows.EMPTY_STATIC_ROW;
+
+ Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns.hasComplex());
+ for (int i = 0; i < iterators.size(); i++)
+ merger.add(i, iterators.get(i).staticRow());
+
+ Row merged = merger.merge(partitionDeletion);
+ if (merged == null)
+ merged = Rows.EMPTY_STATIC_ROW;
+ if (listener != null)
+ listener.onMergedRows(merged, merger.mergedRows());
+ return merged;
+ }
+
+ private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators)
+ {
+ PartitionColumns first = iterators.get(0).columns();
+ Columns statics = first.statics;
+ Columns regulars = first.regulars;
+ for (int i = 1; i < iterators.size(); i++)
+ {
+ PartitionColumns cols = iterators.get(i).columns();
+ statics = statics.mergeTo(cols.statics);
+ regulars = regulars.mergeTo(cols.regulars);
+ }
+ return statics == first.statics && regulars == first.regulars
+ ? first
+ : new PartitionColumns(statics, regulars);
+ }
+
+ private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators)
+ {
+ EncodingStats stats = EncodingStats.NO_STATS;
+ for (UnfilteredRowIterator iter : iterators)
+ stats = stats.mergeWith(iter.stats());
+ return stats;
+ }
+
+ protected Unfiltered computeNext()
+ {
+ while (mergeIterator.hasNext())
+ {
+ Unfiltered merged = mergeIterator.next();
+ if (merged != null)
+ return merged;
+ }
+ return endOfData();
+ }
+
+ public void close()
+ {
+ // This will close the input iterators
+ FileUtils.closeQuietly(mergeIterator);
+
+ if (listener != null)
+ listener.close();
+ }
+
+ private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered>
+ {
+ private final MergeListener listener;
+
+ private Unfiltered.Kind nextKind;
+
+ private final Row.Merger rowMerger;
+ private final RangeTombstoneMarker.Merger markerMerger;
+
+ private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener)
+ {
+ this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars.hasComplex());
+ this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed);
+ this.listener = listener;
+ }
+
+ @Override
+ public boolean trivialReduceIsTrivial()
+ {
+ // If we have a listener, we must signal it even when we have a single version
+ return listener == null;
+ }
+
+ public void reduce(int idx, Unfiltered current)
+ {
+ nextKind = current.kind();
+ if (nextKind == Unfiltered.Kind.ROW)
+ rowMerger.add(idx, (Row)current);
+ else
+ markerMerger.add(idx, (RangeTombstoneMarker)current);
+ }
+
+ protected Unfiltered getReduced()
+ {
+ if (nextKind == Unfiltered.Kind.ROW)
+ {
+ Row merged = rowMerger.merge(markerMerger.activeDeletion());
+ if (listener != null)
+ listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows());
+ return merged;
+ }
+ else
+ {
+ RangeTombstoneMarker merged = markerMerger.merge();
+ if (listener != null)
+ listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers());
+ return merged;
+ }
+ }
+
+ protected void onKeyChange()
+ {
+ if (nextKind == Unfiltered.Kind.ROW)
+ rowMerger.clear();
+ else
+ markerMerger.clear();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index e3dc220,98b8e23..c9c4631
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -366,21 -278,20 +366,39 @@@ public abstract class CQLTeste
return list.isEmpty() ? Collections.<String>emptyList() : new ArrayList<>(list);
}
+ public ColumnFamilyStore getCurrentColumnFamilyStore()
+ {
+ String currentTable = currentTable();
+ return currentTable == null
+ ? null
+ : Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable);
+ }
+
public void flush()
{
- try
- {
- String currentTable = currentTable();
- if (currentTable != null)
- Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable).forceFlush().get();
- }
- catch (InterruptedException | ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ ColumnFamilyStore store = getCurrentColumnFamilyStore();
+ if (store != null)
+ store.forceBlockingFlush();
+ }
+
++ @FunctionalInterface
++ public interface CheckedFunction {
++ void apply() throws Throwable;
++ }
++
++ /**
++ * Runs the given function before and after a flush of sstables. This is useful for checking that behavior is
++ * the same whether data is in memtables or sstables.
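++ * @param runnable the checks to run, once before the flush and once after it
++ * @throws Throwable if either invocation of {@code runnable} throws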
++ */
++ public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable
++ {
++ runnable.apply();
++ flush();
++ runnable.apply();
+ }
+
public void compact()
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index f9802d7,11d2462..0cf13bd
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@@ -17,47 -17,38 +17,49 @@@
*/
package org.apache.cassandra.cql3.validation.entities;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.statements.IndexTarget;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.index.IndexNotAvailableException;
-import org.apache.cassandra.db.index.PerRowSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.index.composites.CompositesSearcher;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder.Group;
-import org.apache.commons.lang3.StringUtils;
-import org.junit.Test;
-
+import org.apache.cassandra.index.IndexNotAvailableException;
+import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.index.internal.CustomCassandraIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MD5Digest;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.Util.throwAssert;
+ import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class SecondaryIndexTest extends CQLTester
{
@@@ -949,77 -684,190 +951,206 @@@
}
@Test
+ public void droppingIndexInvalidatesPreparedStatements() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
+ createIndex("CREATE INDEX c_idx ON %s(c)");
+ MD5Digest cqlId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", false).statementId;
+ Integer thriftId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", true).toThriftPreparedResult().getItemId();
+
+ assertNotNull(QueryProcessor.instance.getPrepared(cqlId));
+ assertNotNull(QueryProcessor.instance.getPreparedForThrift(thriftId));
+
+ dropIndex("DROP INDEX %s.c_idx");
+
+ assertNull(QueryProcessor.instance.getPrepared(cqlId));
+ assertNull(QueryProcessor.instance.getPreparedForThrift(thriftId));
+ }
+
+ // See CASSANDRA-11021
+ @Test
+ public void testIndexesOnNonStaticColumnsWhereSchemaIncludesStaticColumns() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a int, b int, c int static, d int, PRIMARY KEY (a, b))");
+ createIndex("CREATE INDEX b_idx on %s(b)");
+ createIndex("CREATE INDEX d_idx on %s(d)");
+
+ execute("INSERT INTO %s (a, b, c ,d) VALUES (0, 0, 0, 0)");
+ execute("INSERT INTO %s (a, b, c, d) VALUES (1, 1, 1, 1)");
+ assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 0, 0));
+ assertRows(execute("SELECT * FROM %s WHERE d = 1"), row(1, 1, 1, 1));
+
+ execute("UPDATE %s SET c = 2 WHERE a = 0");
+ execute("UPDATE %s SET c = 3, d = 4 WHERE a = 1 AND b = 1");
+ assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 2, 0));
+ assertRows(execute("SELECT * FROM %s WHERE d = 4"), row(1, 1, 3, 4));
+
+ execute("DELETE FROM %s WHERE a = 0");
+ execute("DELETE FROM %s WHERE a = 1 AND b = 1");
+ assertEmpty(execute("SELECT * FROM %s WHERE b = 0"));
+ assertEmpty(execute("SELECT * FROM %s WHERE d = 3"));
+ }
+
++ @Test
+ public void testWithEmptyRestrictionValueAndSecondaryIndex() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY ((pk), c))");
+ createIndex("CREATE INDEX on %s(c)");
+ createIndex("CREATE INDEX on %s(v)");
+
+ execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", bytes("foo123"), bytes("1"), bytes("1"));
+ execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", bytes("foo123"), bytes("2"), bytes("1"));
+
- for (boolean flush : new boolean[]{false, true})
- {
- if (flush)
- flush();
-
++ beforeAndAfterFlush(() -> {
+ // Test clustering columns restrictions
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));"));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"),
+ row(bytes("foo123"), bytes("1"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"),
+ row(bytes("foo123"), bytes("1"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) > (textAsBlob('')) AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
- }
++ });
+
+ execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)",
+ bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"));
+
- for (boolean flush : new boolean[]{false, true})
- {
- if (flush)
- flush();
++ beforeAndAfterFlush(() -> {
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+ row(bytes("foo123"), bytes("1"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+ row(bytes("foo123"), bytes("1"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")),
+ row(bytes("foo123"), bytes("1"), bytes("1")),
+ row(bytes("foo123"), bytes("2"), bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"),
+ row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"));
+
+ // Test restrictions on non-primary key value
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"));
- }
++ });
+
+ execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)",
+ bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER);
+
- for (boolean flush : new boolean[]{false, true})
- {
- if (flush)
- flush();
++ beforeAndAfterFlush(() -> {
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"),
+ row(bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER));
- }
++ });
+ }
+
+ @Test
+ public void testEmptyRestrictionValueWithSecondaryIndexAndCompactTables() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY ((pk), c)) WITH COMPACT STORAGE");
- assertInvalidMessage("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables",
++ assertInvalidMessage("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns",
+ "CREATE INDEX on %s(c)");
+
+ createTable("CREATE TABLE %s (pk blob PRIMARY KEY, v blob) WITH COMPACT STORAGE");
+ createIndex("CREATE INDEX on %s(v)");
+
+ execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo123"), bytes("1"));
+
+ // Test restrictions on non-primary key value
+ assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"));
+
+ execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo124"), EMPTY_BYTE_BUFFER);
+
+ assertRows(execute("SELECT * FROM %s WHERE v = textAsBlob('');"),
+ row(bytes("foo124"), EMPTY_BYTE_BUFFER));
+ }
+
- /**
- * Custom index used to test the behavior of the system when the index is not ready.
- * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
- * to avoid the check but return a <code>CompositesSearcher</code>.
- */
- public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
+ private ResultMessage.Prepared prepareStatement(String cql, boolean forThrift)
{
- private volatile CountDownLatch latch = new CountDownLatch(1);
-
- @Override
- public void index(ByteBuffer rowKey, ColumnFamily cf)
- {
- try
- {
- latch.await();
- }
- catch (InterruptedException e)
- {
- Thread.interrupted();
- }
- }
-
- @Override
- public void delete(DecoratedKey key, Group opGroup)
- {
- }
+ return QueryProcessor.prepare(String.format(cql, KEYSPACE, currentTable()),
+ ClientState.forInternalCalls(),
+ forThrift);
+ }
- @Override
- public void init()
- {
- }
+ private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
+ {
+ assertNotNull(cell);
+ assertEquals(0, def.type.compare(cell.value(), val));
+ assertEquals(timestamp, cell.timestamp());
+ }
- @Override
- public void reload()
- {
- }
+ private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
+ {
+ ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
+ AbstractType<?> type = col.type;
+ assertEquals(expected, type.compose(row.getCell(col).value()));
+ }
- @Override
- public void validateOptions() throws ConfigurationException
- {
- }
+ /**
+ * <code>CassandraIndex</code> that blocks during the initialization.
+ */
+ public static class IndexBlockingOnInitialization extends CustomCassandraIndex
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
- @Override
- public String getIndexName()
+ public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
{
- return "testIndex";
+ super(baseCfs, indexDef);
}
@Override