You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:40 UTC
[09/15] cassandra git commit: New 2i API and implementations for
built in indexes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
new file mode 100644
index 0000000..f1dc3af
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/RegularColumnIndex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on a REGULAR column definition on a composite type.
+ *
+ * A cell indexed by this index will have the general form:
+ * ck_0 ... ck_n c_name : v
+ * where ck_i are the cluster keys, c_name the last component of the cell
+ * composite name (or second to last if collections are in use, but this
+ * has no impact) and v the cell value.
+ *
+ * Such a cell is indexed if c_name == columnDef.name, and it will generate
+ * (makeIndexColumnName()) an index entry whose:
+ * - row key will be the value v (getIndexedValue()).
+ * - cell name will be
+ * rk ck_0 ... ck_n
+ * where rk is the row key of the initial cell. I.e. the index entry stores
+ * all the information required to locate the indexed cell.
+ */
+public class RegularColumnIndex extends CassandraIndex
+{
+ public RegularColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path,
+ ByteBuffer cellValue)
+ {
+ return cellValue;
+ }
+
+ public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path)
+ {
+ CBuilder builder = CBuilder.create(getIndexComparator());
+ builder.add(partitionKey);
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+
+ return builder;
+ }
+
+ public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+ {
+ Clustering clustering = indexEntry.clustering();
+ ClusteringComparator baseComparator = baseCfs.getComparator();
+ CBuilder builder = CBuilder.create(baseComparator);
+ for (int i = 0; i < baseComparator.size(); i++)
+ builder.add(clustering.get(i + 1));
+
+ return new IndexEntry(indexedValue,
+ clustering,
+ indexEntry.primaryKeyLivenessInfo().timestamp(),
+ clustering.get(0),
+ builder.build());
+ }
+
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+ {
+ Cell cell = data.getCell(indexedColumn);
+ return cell == null
+ || !cell.isLive(nowInSec)
+ || indexedColumn.type.compare(indexValue, cell.value()) != 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
new file mode 100644
index 0000000..53ecd01
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
@@ -0,0 +1,62 @@
+package org.apache.cassandra.index.internal.keys;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+public class KeysIndex extends CassandraIndex
+{
+ public KeysIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+ CFMetaData baseMetadata,
+ ColumnDefinition cfDef)
+ {
+ // no additional clustering columns required
+ return builder;
+ }
+
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path)
+ {
+ CBuilder builder = CBuilder.create(getIndexComparator());
+ builder.add(partitionKey);
+ return builder;
+ }
+
+ protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path, ByteBuffer cellValue)
+ {
+ return cellValue;
+ }
+
+ public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+ {
+ throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
+ }
+
+ public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
+ {
+ if (row == null)
+ return true;
+
+ Cell cell = row.getCell(indexedColumn);
+
+ return (cell == null
+ || !cell.isLive(nowInSec)
+ || indexedColumn.type.compare(indexValue, cell.value()) != 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
new file mode 100644
index 0000000..b60d2d9
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.keys;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.CassandraIndexSearcher;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class KeysSearcher extends CassandraIndexSearcher
+{
+ private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
+
+ public KeysSearcher(ReadCommand command,
+ RowFilter.Expression expression,
+ CassandraIndex indexer)
+ {
+ super(command, expression, indexer);
+ }
+
+ protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
+ final RowIterator indexHits,
+ final ReadCommand command,
+ final ReadOrderGroup orderGroup)
+ {
+ assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+ return new UnfilteredPartitionIterator()
+ {
+ private UnfilteredRowIterator next;
+
+ public boolean isForThrift()
+ {
+ return command.isForThrift();
+ }
+
+ public CFMetaData metadata()
+ {
+ return command.metadata();
+ }
+
+ public boolean hasNext()
+ {
+ return prepareNext();
+ }
+
+ public UnfilteredRowIterator next()
+ {
+ if (next == null)
+ prepareNext();
+
+ UnfilteredRowIterator toReturn = next;
+ next = null;
+ return toReturn;
+ }
+
+ private boolean prepareNext()
+ {
+ while (next == null && indexHits.hasNext())
+ {
+ Row hit = indexHits.next();
+ DecoratedKey key = index.baseCfs.decorateKey(hit.clustering().get(0));
+
+ SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+ index.baseCfs.metadata,
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ DataLimits.NONE,
+ key,
+ command.clusteringIndexFilter(key));
+
+ @SuppressWarnings("resource") // filterIfStale closes its iterator if it either materializes it or returns null.
+ // Otherwise, we close right away if empty, and if it's assigned to next it will be closed either
+ // by the next caller of next, or through closing this iterator if that comes first.
+ UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(index.baseCfs,
+ orderGroup.baseReadOpOrderGroup()),
+ hit,
+ indexKey.getKey(),
+ orderGroup.writeOpOrderGroup(),
+ isForThrift(),
+ command.nowInSec());
+
+ if (dataIter != null)
+ {
+ if (dataIter.isEmpty())
+ dataIter.close();
+ else
+ next = dataIter;
+ }
+ }
+ return next != null;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close()
+ {
+ indexHits.close();
+ if (next != null)
+ next.close();
+ }
+ };
+ }
+
+ private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
+ Row indexHit,
+ ByteBuffer indexedValue,
+ OpOrder.Group writeOp,
+ boolean isForThrift,
+ int nowInSec)
+ {
+ if (isForThrift)
+ {
+ // The data we got has gone through ThriftResultsMerger, so we're looking for the row whose clustering
+ // is the indexed name. And so we need to materialize the partition.
+ ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
+ iterator.close();
+ Row data = result.getRow(new Clustering(index.getIndexedColumn().name.bytes));
+
+ // for thrift tables, we need to compare the index entry against the compact value column,
+ // not the column actually designated as the indexed column so we don't use the index function
+ // lib for the staleness check like we do in every other case
+ Cell baseData = data.getCell(index.baseCfs.metadata.compactValueColumn());
+ if (baseData == null || !baseData.isLive(nowInSec) || index.getIndexedColumn().type.compare(indexedValue, baseData.value()) != 0)
+ {
+ // Index is stale, remove the index entry and ignore
+ index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+ new Clustering(index.getIndexedColumn().name.bytes),
+ new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+ writeOp);
+ return null;
+ }
+ else
+ {
+ return result.unfilteredIterator();
+ }
+ }
+ else
+ {
+ assert iterator.metadata().isCompactTable();
+ Row data = iterator.staticRow();
+ if (index.isStale(data, indexedValue, nowInSec))
+ {
+ // Index is stale, remove the index entry and ignore
+ index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
+ makeIndexClustering(iterator.partitionKey().getKey(), Clustering.EMPTY),
+ new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
+ writeOp);
+ iterator.close();
+ return null;
+ }
+ else
+ {
+ return iterator;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java b/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
new file mode 100644
index 0000000..1d6ba56
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/CleanupTransaction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Performs garbage collection of index entries during a cleanup.
+ *
+ * Notifies registered indexers of each partition being removed and of the rows within those partitions.
+ *
+ * Compaction & Cleanup are somewhat simpler than dealing with incoming writes,
+ * being only concerned with cleaning up stale index entries.
+ *
+ * When multiple versions of a row are compacted, the CleanupTransaction is
+ * notified of the versions being merged, which it diffs against the merge result
+ * and forwards to the registered Index.Indexer instances on commit.
+ *
+ * Instances are currently scoped to a single row within a partition, but this could be improved to batch process
+ * multiple rows within a single partition.
+ */
+public interface CleanupTransaction extends IndexTransaction
+{
+
+ void onPartitionDeletion(DeletionTime deletionTime);
+ void onRowDelete(Row row);
+
+ CleanupTransaction NO_OP = new CleanupTransaction()
+ {
+ public void start(){}
+ public void onPartitionDeletion(DeletionTime deletionTime){}
+ public void onRowDelete(Row row){}
+ public void commit(){}
+ };
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java b/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
new file mode 100644
index 0000000..a9fbf41
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/CompactionTransaction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Performs garbage collection of stale index entries during a regular compaction.
+ *
+ * A CompactionTransaction is concerned with cleaning up stale index entries.
+ * When multiple versions of a row are compacted, the CompactionTransaction is
+ * notified of the versions being merged, which it diffs against the merge result.
+ *
+ * Instances are currently scoped to a single row within a partition, but this could be improved to batch process
+ * multiple rows within a single partition.
+ */
+public interface CompactionTransaction extends IndexTransaction
+{
+ void onRowMerge(Columns columns, Row merged, Row...versions);
+
+ CompactionTransaction NO_OP = new CompactionTransaction()
+ {
+ public void start(){}
+ public void onRowMerge(Columns columns, Row merged, Row...versions){}
+ public void commit(){}
+ };
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java b/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
new file mode 100644
index 0000000..3fb8235
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/IndexTransaction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+/**
+ * Base interface for the handling of index updates.
+ * There are 3 types of transaction where indexes are updated to stay in sync with the base table, each represented by
+ * a subinterface:
+ * * {@code UpdateTransaction}
+ * Used on the regular write path and when indexing newly acquired SSTables from streaming or sideloading. This type
+ * of transaction may include both row inserts and updates to rows previously existing in the base Memtable. Instances
+ * are scoped to a single partition update and are obtained from the factory method
+ * {@code SecondaryIndexManager#newUpdateTransaction}
+ *
+ * * {@code CompactionTransaction}
+ * Used during compaction when stale entries which have been superseded are cleaned up from the index. As rows in a
+ * partition are merged during the compaction, index entries for any purged rows are cleaned from the index to
+ * compensate for the fact that they may not have been removed at write time if the data in the base table had been
+ * already flushed to disk (and so was processed as an insert, not an update by the UpdateTransaction). These
+ * transactions are currently scoped to a single row within a partition, but this could be improved to batch process
+ * multiple rows within a single partition.
+ *
+ * * {@code CleanupTransaction}
+ * During cleanup no merging is required, the only thing to do is to notify indexes of the partitions being removed,
+ * along with the rows within those partitions. Like with compaction, these transactions are currently scoped to a
+ * single row within a partition, but this could be improved with batching.
+ */
+public interface IndexTransaction
+{
+ /**
+ * Used to differentiate between type of index transaction when obtaining
+ * a handler from Index implementations.
+ */
+ public enum Type
+ {
+ UPDATE, COMPACTION, CLEANUP
+ }
+
+ void start();
+ void commit();
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java b/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
new file mode 100644
index 0000000..c78304a
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/transactions/UpdateTransaction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.transactions;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.rows.Row;
+
+/**
+ * Handling of index updates on the write path.
+ *
+ * Instances of an UpdateTransaction are scoped to a single partition update.
+ * A new instance is used for every write, obtained from the
+ * newUpdateTransaction(PartitionUpdate) method. Likewise, a single
+ * CleanupTransaction instance is used for each partition processed during a
+ * compaction or cleanup.
+ *
+ * We make certain guarantees about the lifecycle of each UpdateTransaction
+ * instance. Namely that start() will be called before any other method, and
+ * commit() will be called at the end of the update.
+ * Each instance is initialized with 1..many Index.Indexer instances, one per
+ * registered Index. As with the transaction itself, these are scoped to a
+ * specific partition update, so implementations can be assured that all indexing
+ * events they receive relate to the same logical operation.
+ *
+ * onPartitionDeletion(), onRangeTombstone(), onInserted() and onUpdated()
+ * calls may arrive in any order, but this should have no impact for the
+ * Indexers being notified as any events delivered to a single instance
+ * necessarily relate to a single partition.
+ *
+ * The typical sequence of events during a Memtable update would be:
+ * start() -- no-op, used to notify Indexers of the start of the transaction
+ * onPartitionDeletion(dt) -- if the PartitionUpdate implies one
+ * onRangeTombstone(rt)* -- for each in the PartitionUpdate, if any
+ *
+ * then:
+ * onInserted(row)* -- called for each Row not already present in the Memtable
+ * onUpdated(existing, updated)* -- called for any Row in the update for which a version was already present
+ * in the Memtable. It's important to note here that existing is the previous
+ * row from the Memtable & updated is the final version replacing it. It is
+ * *not* the incoming row, but the result of merging the incoming and existing
+ * rows.
+ * commit() -- finally, commit is called when the new Partition is swapped into the Memtable
+ */
+public interface UpdateTransaction extends IndexTransaction
+{
+ void onPartitionDeletion(DeletionTime deletionTime);
+ void onRangeTombstone(RangeTombstone rangeTombstone);
+ void onInserted(Row row);
+ void onUpdated(Row existing, Row updated);
+
+ UpdateTransaction NO_OP = new UpdateTransaction()
+ {
+ public void start(){}
+ public void onPartitionDeletion(DeletionTime deletionTime){}
+ public void onRangeTombstone(RangeTombstone rangeTombstone){}
+ public void onInserted(Row row){}
+ public void onUpdated(Row existing, Row updated){}
+ public void commit(){}
+ };
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5908594..9f68cea 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -40,14 +40,18 @@ import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.TransactionLog;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.rows.SliceableUnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.sstable.*;
@@ -340,8 +344,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
IndexMetadata def = parent.getIndexes()
.get(indexName)
- .orElseThrow(() -> new AssertionError("Could not find index metadata for index cf " + i));
- metadata = SecondaryIndex.newIndexMetadata(parent, def);
+ .orElseThrow(() -> new AssertionError(
+ "Could not find index metadata for index cf " + i));
+ metadata = CassandraIndex.indexCfsMetadata(parent, def);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index af07cee..40a75c6 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.schema;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -25,20 +26,26 @@ import java.util.Set;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.cql3.statements.IndexTarget;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.FBUtilities;
/**
* An immutable representation of secondary index metadata.
*/
public final class IndexMetadata
{
+ private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
+
public enum IndexType
{
KEYS, CUSTOM, COMPOSITES
@@ -68,20 +75,20 @@ public final class IndexMetadata
this.columns = columns == null ? ImmutableSet.of() : ImmutableSet.copyOf(columns);
}
- public static IndexMetadata legacyIndex(ColumnIdentifier column,
- String name,
- IndexType type,
- Map<String, String> options)
+ public static IndexMetadata singleColumnIndex(ColumnIdentifier column,
+ String name,
+ IndexType type,
+ Map<String, String> options)
{
return new IndexMetadata(name, options, type, TargetType.COLUMN, Collections.singleton(column));
}
- public static IndexMetadata legacyIndex(ColumnDefinition column,
- String name,
- IndexType type,
- Map<String, String> options)
+ public static IndexMetadata singleColumnIndex(ColumnDefinition column,
+ String name,
+ IndexType type,
+ Map<String, String> options)
{
- return legacyIndex(column.name, name, type, options);
+ return singleColumnIndex(column.name, name, type, options);
}
public static boolean isNameValid(String name)
@@ -107,11 +114,54 @@ public final class IndexMetadata
throw new ConfigurationException("Target type is null for index " + name);
if (indexType == IndexMetadata.IndexType.CUSTOM)
- if (options == null || !options.containsKey(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME))
+ {
+ if (options == null || !options.containsKey(IndexTarget.CUSTOM_INDEX_OPTION_NAME))
throw new ConfigurationException(String.format("Required option missing for index %s : %s",
- name, SecondaryIndex.CUSTOM_INDEX_OPTION_NAME));
+ name, IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+ String className = options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+ Class<Index> indexerClass = FBUtilities.classForName(className, "custom indexer");
+ if(!Index.class.isAssignableFrom(indexerClass))
+ throw new ConfigurationException(String.format("Specified Indexer class (%s) does not implement the Indexer interface", className));
+ validateCustomIndexOptions(indexerClass, options);
+ }
+ }
+
+ private void validateCustomIndexOptions(Class<? extends Index> indexerClass, Map<String, String> options) throws ConfigurationException
+ {
+ try
+ {
+ Map<String, String> filteredOptions =
+ Maps.filterKeys(options,key -> !key.equals(IndexTarget.CUSTOM_INDEX_OPTION_NAME));
+
+ if (filteredOptions.isEmpty())
+ return;
+
+ Map<?,?> unknownOptions = (Map) indexerClass.getMethod("validateOptions", Map.class).invoke(null, filteredOptions);
+ if (!unknownOptions.isEmpty())
+ throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), indexerClass.getSimpleName()));
+ }
+ catch (NoSuchMethodException e)
+ {
+ logger.info("Indexer {} does not have a static validateOptions method. Validation ignored",
+ indexerClass.getName());
+ }
+ catch (InvocationTargetException e)
+ {
+ if (e.getTargetException() instanceof ConfigurationException)
+ throw (ConfigurationException) e.getTargetException();
+ throw new ConfigurationException("Failed to validate custom indexer options: " + options);
+ }
+ catch (ConfigurationException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new ConfigurationException("Failed to validate custom indexer options: " + options);
+ }
}
+ // to be removed in CASSANDRA-10124 with multi-target & row based indexes
public ColumnDefinition indexedColumn(CFMetaData cfm)
{
return cfm.getColumnDefinition(columns.iterator().next());
@@ -132,11 +182,29 @@ public final class IndexMetadata
return indexType == IndexType.COMPOSITES;
}
+ public boolean isRowIndex()
+ {
+ return targetType == TargetType.ROW;
+ }
+
+ public boolean isColumnIndex()
+ {
+ return targetType == TargetType.COLUMN;
+ }
+
public int hashCode()
{
return Objects.hashCode(name, indexType, targetType, options, columns);
}
+ public boolean equalsWithoutName(IndexMetadata other)
+ {
+ return Objects.equal(indexType, other.indexType)
+ && Objects.equal(targetType, other.targetType)
+ && Objects.equal(columns, other.columns)
+ && Objects.equal(options, other.options);
+ }
+
public boolean equals(Object obj)
{
if (obj == this)
@@ -147,11 +215,7 @@ public final class IndexMetadata
IndexMetadata other = (IndexMetadata)obj;
- return Objects.equal(name, other.name)
- && Objects.equal(indexType, other.indexType)
- && Objects.equal(targetType, other.targetType)
- && Objects.equal(options, other.options)
- && Objects.equal(columns, other.columns);
+ return Objects.equal(name, other.name) && equalsWithoutName(other);
}
public String toString()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 7c930b3..6227e0b 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.schema;
import java.util.*;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
@@ -39,17 +40,13 @@ import static com.google.common.collect.Iterables.filter;
*/
public class Indexes implements Iterable<IndexMetadata>
{
- // lookup for index by target column
- private final ImmutableMap<ColumnIdentifier, IndexMetadata> indexes;
+ private final ImmutableMap<String, IndexMetadata> indexes;
+ private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
private Indexes(Builder builder)
{
- ImmutableMap.Builder<ColumnIdentifier, IndexMetadata> internalBuilder = ImmutableMap.builder();
- builder.indexes.build()
- .values()
- .stream()
- .forEach(def -> internalBuilder.put(def.columns.iterator().next(), def));
- indexes = internalBuilder.build();
+ indexes = builder.indexes.build();
+ indexesByColumn = builder.indexesByColumn.build();
}
public static Builder builder()
@@ -105,9 +102,9 @@ public class Indexes implements Iterable<IndexMetadata>
* @param column a column definition for which an {@link IndexMetadata} is being sought
* @return an empty {@link Optional} if the named index is not found; a non-empty optional of {@link IndexMetadata} otherwise
*/
- public Optional<IndexMetadata> get(ColumnDefinition column)
+ public Collection<IndexMetadata> get(ColumnDefinition column)
{
- return Optional.ofNullable(indexes.get(column.name));
+ return indexesByColumn.get(column.name);
}
/**
@@ -117,7 +114,7 @@ public class Indexes implements Iterable<IndexMetadata>
*/
public boolean hasIndexFor(ColumnDefinition column)
{
- return indexes.get(column.name) != null;
+ return !indexesByColumn.get(column.name).isEmpty();
}
/**
@@ -183,6 +180,7 @@ public class Indexes implements Iterable<IndexMetadata>
public static final class Builder
{
final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+ final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
private Builder()
{
@@ -196,6 +194,13 @@ public class Indexes implements Iterable<IndexMetadata>
public Builder add(IndexMetadata index)
{
indexes.put(index.name, index);
+ // All indexes are column indexes at the moment
+ if (index.isColumnIndex())
+ {
+ for (ColumnIdentifier target : index.columns)
+ indexesByColumn.put(target, index);
+
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index c8b566c..7c0eadf 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.*;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.functions.FunctionName;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
@@ -628,7 +630,7 @@ public final class LegacySchemaMigrator
isStaticCompactTable,
needsUpgrade);
- indexes.add(IndexMetadata.legacyIndex(column, indexName, indexType, indexOptions));
+ indexes.add(IndexMetadata.singleColumnIndex(column, indexName, indexType, indexOptions));
}
return indexes.build();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index d35ef82..acd9e93 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.*;
@@ -1427,7 +1426,7 @@ public final class SchemaKeyspace
findColumnIdentifierWithName(targetColumnName, cfm.allColumns()).ifPresent(targetColumns::add);
});
}
- return IndexMetadata.legacyIndex(targetColumns.iterator().next(), name, type, options);
+ return IndexMetadata.singleColumnIndex(targetColumns.iterator().next(), name, type, options);
}
private static Optional<ColumnIdentifier> findColumnIdentifierWithName(String name,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 3dc323e..cd69ef3 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -26,14 +26,12 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
@@ -237,7 +235,7 @@ public class DataResolver extends ResponseResolver
if (merged.isEmpty())
return;
- Rows.diff(merged, columns, versions, diffListener);
+ Rows.diff(diffListener, merged, columns, versions);
for (int i = 0; i < currentRows.length; i++)
{
if (currentRows[i] != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 12c2c24..9d999ee 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,11 +31,6 @@ import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.metrics.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,31 +41,31 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.Bounds;
-import org.apache.cassandra.dht.RingPosition;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.view.MaterializedViewManager;
+import org.apache.cassandra.db.view.MaterializedViewUtils;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.LocalStrategy;
-import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.locator.*;
+import org.apache.cassandra.metrics.*;
import org.apache.cassandra.net.*;
-import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareCallback;
+import org.apache.cassandra.service.paxos.ProposeCallback;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.*;
public class StorageProxy implements StorageProxyMBean
{
@@ -1698,11 +1693,10 @@ public class StorageProxy implements StorageProxyMBean
private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
- SecondaryIndexSearcher searcher = cfs.indexManager.getBestIndexSearcherFor(command);
-
- float maxExpectedResults = searcher == null
+ Index index = cfs.indexManager.getBestIndexFor(command);
+ float maxExpectedResults = index == null
? command.limits().estimateTotalResults(cfs)
- : searcher.highestSelectivityIndex(command.rowFilter()).estimateResultRows();
+ : index.getEstimatedResultRows();
// adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
@@ -1962,7 +1956,9 @@ public class StorageProxy implements StorageProxyMBean
}
Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
- return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), command.limits(), command.nowInSec());
+ // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
+ // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
+ return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), DataLimits.NONE, command.nowInSec());
}
public void close()
@@ -2003,7 +1999,8 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
// Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
- return command.postReconciliationProcessing(command.limits().filter(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel), command.nowInSec()));
+
+ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
}
public Map<String, List<String>> getSchemaVersions()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 2bcbbc1..52c8884 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,10 +17,7 @@
*/
package org.apache.cassandra.streaming;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,21 +28,17 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.RowIterators;
-import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
-
import org.apache.cassandra.utils.concurrent.Refs;
/**
@@ -176,7 +169,7 @@ public class StreamReceiveTask extends StreamTask
// add sstables and build secondary indexes
cfs.addSSTables(readers);
- cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
+ cfs.indexManager.buildAllIndexesBlocking(readers);
}
}
catch (Throwable t)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 7a2126c..fd391aa 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -38,12 +38,12 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.statements.ParsedStatement;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TimeUUIDType;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.*;
@@ -826,6 +826,9 @@ public class CassandraServer implements Cassandra.Iface
Cell cell = cellFromColumn(metadata, name, column);
PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
+ // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
+ Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);
+
mutation = new org.apache.cassandra.db.Mutation(update);
}
catch (MarshalException|UnknownColumnException e)
@@ -916,6 +919,8 @@ public class CassandraServer implements Cassandra.Iface
int nowInSec = FBUtilities.nowInSeconds();
PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+ // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
+ Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates);
FilteredPartition partitionExpected = null;
if (!expected.isEmpty())
@@ -1121,6 +1126,9 @@ public class CassandraServer implements Cassandra.Iface
DecoratedKey dk = metadata.decorateKey(key);
PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()));
+ // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
+ Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);
+
org.apache.cassandra.db.Mutation mutation;
if (metadata.isCounter())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index a74bcea..4c17a4b 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -23,21 +23,17 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
-import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.CompactTables;
import org.apache.cassandra.db.LegacyLayout;
import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.schema.*;
@@ -560,10 +556,10 @@ public class ThriftConversion
Map<String, String> indexOptions = def.getIndex_options();
IndexMetadata.IndexType indexType = IndexMetadata.IndexType.valueOf(def.index_type.name());
- indexes.add(IndexMetadata.legacyIndex(column,
- indexName,
- indexType,
- indexOptions));
+ indexes.add(IndexMetadata.singleColumnIndex(column,
+ indexName,
+ indexType,
+ indexOptions));
}
}
return indexes.build();
@@ -576,13 +572,22 @@ public class ThriftConversion
cd.setName(ByteBufferUtil.clone(column.name.bytes));
cd.setValidation_class(column.type.toString());
- Optional<IndexMetadata> index = cfMetaData.getIndexes().get(column);
- index.ifPresent(def -> {
- cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(def.indexType.name()));
- cd.setIndex_name(def.name);
- cd.setIndex_options(def.options == null || def.options.isEmpty() ? null : Maps.newHashMap(def.options));
- });
-
+ Collection<IndexMetadata> indexes = cfMetaData.getIndexes().get(column);
+ // we include the index in the ColumnDef iff
+ // * it is the only index on the column
+ // * it is the only target column for the index
+ if (indexes.size() == 1)
+ {
+ IndexMetadata index = indexes.iterator().next();
+ if (index.columns.size() == 1)
+ {
+ cd.setIndex_type(org.apache.cassandra.thrift.IndexType.valueOf(index.indexType.name()));
+ cd.setIndex_name(index.name);
+ cd.setIndex_options(index.options == null || index.options.isEmpty()
+ ? null
+ : Maps.newHashMap(index.options));
+ }
+ }
return cd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index 054b466..71aa335 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -18,18 +18,24 @@
package org.apache.cassandra.thrift;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -449,8 +455,6 @@ public class ThriftValidation
LegacyLayout.LegacyCellName cn = LegacyLayout.decodeCellName(metadata, scName, column.name);
cn.column.validateCellValue(column.value);
- // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details
- Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(cn.column, column.value, null);
}
catch (UnknownColumnException e)
{
@@ -609,7 +613,8 @@ public class ThriftValidation
me.getMessage()));
}
- isIndexed |= (expression.op == IndexOperator.EQ) && idxManager.indexes(def);
+ for(Index index : idxManager.listIndexes())
+ isIndexed |= index.supportsExpression(def, Operator.valueOf(expression.op.name()));
}
return isIndexed;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 1c0dd76..8675d7f 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -27,15 +27,15 @@ import org.junit.BeforeClass;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.index.PerRowSecondaryIndexTest;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.schema.*;
+import org.apache.cassandra.index.StubIndex;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -218,10 +218,6 @@ public class SchemaLoader
schema.add(KeyspaceMetadata.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of(
standardCFMD(ks_nocommit, "Standard1"))));
- // PerRowSecondaryIndexTest
- schema.add(KeyspaceMetadata.create(ks_prsi, KeyspaceParams.simple(1), Tables.of(
- perRowIndexedCFMD(ks_prsi, "Indexed1"))));
-
// CQLKeyspace
schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), Tables.of(
@@ -291,8 +287,8 @@ public class SchemaLoader
public static CFMetaData perRowIndexedCFMD(String ksName, String cfName)
{
final Map<String, String> indexOptions = Collections.singletonMap(
- SecondaryIndex.CUSTOM_INDEX_OPTION_NAME,
- PerRowSecondaryIndexTest.TestIndex.class.getName());
+ IndexTarget.CUSTOM_INDEX_OPTION_NAME,
+ StubIndex.class.getName());
CFMetaData cfm = CFMetaData.Builder.create(ksName, cfName)
.addPartitionKey("key", AsciiType.instance)
@@ -303,10 +299,10 @@ public class SchemaLoader
cfm.indexes(
cfm.getIndexes()
- .with(IndexMetadata.legacyIndex(indexedColumn,
- "indexe1",
- IndexMetadata.IndexType.CUSTOM,
- indexOptions)));
+ .with(IndexMetadata.singleColumnIndex(indexedColumn,
+ "indexe1",
+ IndexMetadata.IndexType.CUSTOM,
+ indexOptions)));
return cfm;
}
@@ -414,10 +410,10 @@ public class SchemaLoader
if (withIndex)
cfm.indexes(
cfm.getIndexes()
- .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
- "birthdate_key_index",
- IndexMetadata.IndexType.COMPOSITES,
- Collections.EMPTY_MAP)));
+ .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+ "birthdate_key_index",
+ IndexMetadata.IndexType.COMPOSITES,
+ Collections.EMPTY_MAP)));
return cfm.compression(getCompressionParameters());
}
@@ -434,10 +430,10 @@ public class SchemaLoader
if (withIndex)
cfm.indexes(
cfm.getIndexes()
- .with(IndexMetadata.legacyIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
- "birthdate_composite_index",
- IndexMetadata.IndexType.KEYS,
- Collections.EMPTY_MAP)));
+ .with(IndexMetadata.singleColumnIndex(cfm.getColumnDefinition(new ColumnIdentifier("birthdate", true)),
+ "birthdate_composite_index",
+ IndexMetadata.IndexType.KEYS,
+ Collections.EMPTY_MAP)));
return cfm.compression(getCompressionParameters());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e8451e0..5e19d5e 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -32,7 +32,6 @@ import java.util.function.Supplier;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.CFMetaData;
@@ -40,13 +39,15 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
@@ -531,4 +532,10 @@ public class Util
{
thread.join(10000);
}
+
+ // for use with Optional in tests, can be used as an argument to orElseThrow
+ public static Supplier<AssertionError> throwAssert(final String message)
+ {
+ return () -> new AssertionError(message);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index be3568a..02b2abd 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@ -18,20 +18,24 @@
package org.apache.cassandra.cql3.validation.entities;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import org.apache.commons.lang3.StringUtils;
-
import org.junit.Test;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.index.StubIndex;
+import static org.apache.cassandra.Util.throwAssert;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -70,13 +74,23 @@ public class SecondaryIndexTest extends CQLTester
private void testCreateAndDropIndex(String indexName, boolean addKeyspaceOnDrop) throws Throwable
{
execute("USE system");
- assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found", "DROP INDEX " + indexName + ";");
+ assertInvalidMessage(String.format("Index '%s' could not be found",
+ removeQuotes(indexName.toLowerCase(Locale.US))),
+ "DROP INDEX " + indexName + ";");
createTable("CREATE TABLE %s (a int primary key, b int);");
createIndex("CREATE INDEX " + indexName + " ON %s(b);");
createIndex("CREATE INDEX IF NOT EXISTS " + indexName + " ON %s(b);");
- assertInvalidMessage("Index already exists", "CREATE INDEX " + indexName + " ON %s(b)");
+ assertInvalidMessage(String.format("Index %s already exists",
+ removeQuotes(indexName.toLowerCase(Locale.US))),
+ "CREATE INDEX " + indexName + " ON %s(b)");
+
+ String otherIndexName = "index_" + System.nanoTime();
+ assertInvalidMessage(String.format("Index %s is a duplicate of existing index %s",
+ removeQuotes(otherIndexName.toLowerCase(Locale.US)),
+ removeQuotes(indexName.toLowerCase(Locale.US))),
+ "CREATE INDEX " + otherIndexName + " ON %s(b)");
execute("INSERT INTO %s (a, b) values (?, ?);", 0, 0);
execute("INSERT INTO %s (a, b) values (?, ?);", 1, 1);
@@ -84,7 +98,8 @@ public class SecondaryIndexTest extends CQLTester
execute("INSERT INTO %s (a, b) values (?, ?);", 3, 1);
assertRows(execute("SELECT * FROM %s where b = ?", 1), row(1, 1), row(3, 1));
- assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found in any of the tables of keyspace 'system'",
+ assertInvalidMessage(String.format("Index '%s' could not be found in any of the tables of keyspace 'system'",
+ removeQuotes(indexName.toLowerCase(Locale.US))),
"DROP INDEX " + indexName);
if (addKeyspaceOnDrop)
@@ -100,7 +115,9 @@ public class SecondaryIndexTest extends CQLTester
assertInvalidMessage("No supported secondary index found for the non primary key columns restrictions",
"SELECT * FROM %s where b = ?", 1);
dropIndex("DROP INDEX IF EXISTS " + indexName);
- assertInvalidMessage("Index '" + removeQuotes(indexName.toLowerCase(Locale.US)) + "' could not be found", "DROP INDEX " + indexName);
+ assertInvalidMessage(String.format("Index '%s' could not be found",
+ removeQuotes(indexName.toLowerCase(Locale.US))),
+ "DROP INDEX " + indexName);
}
/**
@@ -189,12 +206,10 @@ public class SecondaryIndexTest extends CQLTester
public void testUnknownCompressionOptions() throws Throwable
{
String tableName = createTableName();
- assertInvalidThrow(SyntaxException.class, String.format(
- "CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
+ assertInvalidThrow(SyntaxException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression_parameters:sstable_compressor = 'DeflateCompressor'", tableName));
-
- assertInvalidThrow(ConfigurationException.class, String.format(
- "CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }", tableName));
+ assertInvalidThrow(ConfigurationException.class, String.format("CREATE TABLE %s (key varchar PRIMARY KEY, password varchar, gender varchar) WITH compression = { 'sstable_compressor': 'DeflateCompressor' }",
+ tableName));
}
/**
@@ -400,9 +415,6 @@ public class SecondaryIndexTest extends CQLTester
assertRows(execute("SELECT k, v FROM %s WHERE k = 0 AND m CONTAINS KEY 'a'"), row(0, 0), row(0, 1));
assertRows(execute("SELECT k, v FROM %s WHERE m CONTAINS KEY 'c'"), row(0, 2));
assertEmpty(execute("SELECT k, v FROM %s WHERE m CONTAINS KEY 'd'"));
-
- // we're not allowed to create a value index if we already have a key one
- assertInvalid("CREATE INDEX ON %s(m)");
}
/**
@@ -412,7 +424,7 @@ public class SecondaryIndexTest extends CQLTester
@Test
public void testIndexOnKeyWithReverseClustering() throws Throwable
{
- createTable(" CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2), v) ) WITH CLUSTERING ORDER BY (v DESC)");
+ createTable("CREATE TABLE %s (k1 int, k2 int, v int, PRIMARY KEY ((k1, k2), v) ) WITH CLUSTERING ORDER BY (v DESC)");
createIndex("CREATE INDEX ON %s (k2)");
@@ -591,4 +603,51 @@ public class SecondaryIndexTest extends CQLTester
assertInvalid("CREATE INDEX ON %s (c)");
}
+ /**
+ * Checks that two custom indexes declared on the same column both record
+ * the writes made to the table.
+ *
+ * Two StubIndex instances are created on column c (with differing options),
+ * three INSERTs are executed, and the rows each stub collected in
+ * rowsInserted and rowsUpdated are compared against the expected c values.
+ */
+ @Test
+ public void testMultipleIndexesOnOneColumn() throws Throwable
+ {
+ String indexClassName = StubIndex.class.getName();
+ createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))");
+ // uses different options, otherwise the two indexes are considered duplicates.
+ // %%s is escaped so that String.format leaves a literal %s in the statement
+ // (presumably substituted with the table name by createIndex - confirm in CQLTester)
+ createIndex(String.format("CREATE CUSTOM INDEX c_idx_1 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'a'}", indexClassName));
+ createIndex(String.format("CREATE CUSTOM INDEX c_idx_2 ON %%s(c) USING '%s' WITH OPTIONS = {'foo':'b'}", indexClassName));
+
+ // resolve both index instances by name; orElseThrow(throwAssert(...)) raises an
+ // AssertionError("index not found") if the lookup by name comes back empty
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ CFMetaData cfm = cfs.metadata;
+ StubIndex index1 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+ .get("c_idx_1")
+ .orElseThrow(throwAssert("index not found")));
+ StubIndex index2 = (StubIndex)cfs.indexManager.getIndex(cfm.getIndexes()
+ .get("c_idx_2")
+ .orElseThrow(throwAssert("index not found")));
+ // row1a and row1b share the same primary key (a = 0, b = 0) and differ only in c;
+ // row2 uses a different primary key
+ Object[] row1a = row(0, 0, 0);
+ Object[] row1b = row(0, 0, 1);
+ Object[] row2 = row(2, 2, 2);
+ execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1a);
+ execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row1b);
+ execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", row2);
+
+ // each index is expected to have recorded two inserted rows, with c = 0 and c = 2
+ assertEquals(2, index1.rowsInserted.size());
+ assertColumnValue(0, "c", index1.rowsInserted.get(0), cfm);
+ assertColumnValue(2, "c", index1.rowsInserted.get(1), cfm);
+
+ assertEquals(2, index2.rowsInserted.size());
+ assertColumnValue(0, "c", index2.rowsInserted.get(0), cfm);
+ assertColumnValue(2, "c", index2.rowsInserted.get(1), cfm);
+
+ // and one update each, whose pair holds c = 0 on the left and c = 1 on the right
+ // (presumably the previous and new versions of the row written by row1a / row1b)
+ assertEquals(1, index1.rowsUpdated.size());
+ assertColumnValue(0, "c", index1.rowsUpdated.get(0).left, cfm);
+ assertColumnValue(1, "c", index1.rowsUpdated.get(0).right, cfm);
+
+ assertEquals(1, index2.rowsUpdated.size());
+ assertColumnValue(0, "c", index2.rowsUpdated.get(0).left, cfm);
+ assertColumnValue(1, "c", index2.rowsUpdated.get(0).right, cfm);
+ }
+
+ /**
+ * Asserts that the cell of the named column in the given row holds the expected int.
+ *
+ * @param expected value the column is expected to contain
+ * @param name name of the column to check
+ * @param row row to read the cell from
+ * @param cfm table metadata used to resolve the column definition
+ */
+ private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
+ {
+ ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
+ AbstractType<?> type = col.type;
+ // compose presumably converts the cell's raw value into a Java object of the column's type;
+ // no null check is made on the cell, so a missing cell would fail with a NullPointerException
+ assertEquals(expected, type.compose(row.getCell(col).value()));
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
index 8e1f438..e0879d2 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/CrcCheckChanceTest.java
@@ -21,6 +21,8 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.junit.Test;
+
import junit.framework.Assert;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
@@ -30,8 +32,6 @@ import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.utils.FBUtilities;
-import org.junit.Test;
-
public class CrcCheckChanceTest extends CQLTester
{
@@ -49,7 +49,7 @@ public class CrcCheckChanceTest extends CQLTester
ColumnFamilyStore cfs = Keyspace.open(CQLTester.KEYSPACE).getColumnFamilyStore(currentTable());
- ColumnFamilyStore indexCfs = cfs.indexManager.getIndexesBackedByCfs().iterator().next();
+ ColumnFamilyStore indexCfs = cfs.indexManager.getAllIndexColumnFamilyStores().iterator().next();
cfs.forceBlockingFlush();
Assert.assertEquals(0.99, cfs.metadata.params.compression.getCrcCheckChance());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 792aaa7..7b03640 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -27,17 +27,15 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.RowFilter;
-
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -115,12 +113,14 @@ public class CleanupTest
fillCF(cfs, "birthdate", LOOPS);
assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());
- SecondaryIndex index = cfs.indexManager.getIndexForColumn(cfs.metadata.getColumnDefinition(COLUMN));
+ ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
+ String indexName = cfs.metadata.getIndexes()
+ .get(cdef)
+ .iterator().next().name;
long start = System.nanoTime();
- while (!index.isIndexBuilt(COLUMN) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
+ while (!cfs.getBuiltIndexes().contains(indexName) && System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10))
Thread.sleep(10);
- ColumnDefinition cdef = cfs.metadata.getColumnDefinition(COLUMN);
RowFilter cf = RowFilter.create();
cf.add(cdef, Operator.EQ, VALUE);
assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).filterOn("birthdate", Operator.EQ, VALUE).build()).size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index cb38e37..40093ea 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -17,33 +17,39 @@
*/
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Config.DiskFailurePolicy;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Directories.DataDirectory;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.utils.Pair;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class DirectoriesTest
{
@@ -161,12 +167,13 @@ public class DirectoriesTest
.addPartitionKey("thekey", UTF8Type.instance)
.addClusteringColumn("col", UTF8Type.instance)
.build();
- IndexMetadata indexDef = IndexMetadata.legacyIndex(PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col")),
- "idx",
- IndexMetadata.IndexType.KEYS,
- Collections.emptyMap());
+ ColumnDefinition col = PARENT_CFM.getColumnDefinition(ByteBufferUtil.bytes("col"));
+ IndexMetadata indexDef = IndexMetadata.singleColumnIndex(col,
+ "idx",
+ IndexMetadata.IndexType.KEYS,
+ Collections.emptyMap());
PARENT_CFM.indexes(PARENT_CFM.getIndexes().with(indexDef));
- CFMetaData INDEX_CFM = SecondaryIndex.newIndexMetadata(PARENT_CFM, indexDef);
+ CFMetaData INDEX_CFM = CassandraIndex.indexCfsMetadata(PARENT_CFM, indexDef);
Directories parentDirectories = new Directories(PARENT_CFM);
Directories indexDirectories = new Directories(INDEX_CFM);
// secondary index has its own directory