You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:44 UTC
[13/15] cassandra git commit: New 2i API and implementations for
built in indexes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index d63a832..1d5d477 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -22,20 +22,16 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.Lists;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -43,7 +39,6 @@ import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
@@ -261,14 +256,16 @@ public abstract class ReadCommand implements ReadQuery
: ReadResponse.createDataResponse(iterator, selection);
}
- protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs)
+ protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
{
- return cfs.indexManager.getBestIndexSearcherFor(this);
+ return cfs.indexManager.getBestIndexFor(this, includeInTrace);
}
/**
* Executes this command on the local host.
*
+ * @param orderGroup the operation group spanning this command
+ *
* @return an iterator over the result of executing this command locally.
*/
@SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
@@ -278,10 +275,12 @@ public abstract class ReadCommand implements ReadQuery
long startTimeNanos = System.nanoTime();
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
- SecondaryIndexSearcher searcher = getIndexSearcher(cfs);
+ Index index = getIndex(cfs, true);
+ Index.Searcher searcher = index == null ? null : index.searcherFor(this);
+
UnfilteredPartitionIterator resultIterator = searcher == null
? queryStorage(cfs, orderGroup)
- : searcher.search(this, orderGroup);
+ : searcher.search(orderGroup);
try
{
@@ -291,7 +290,7 @@ public abstract class ReadCommand implements ReadQuery
// no point in checking it again.
RowFilter updatedFilter = searcher == null
? rowFilter()
- : rowFilter().without(searcher.primaryClause(this));
+ : index.getPostIndexQueryFilter(rowFilter());
// TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
// we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 0a5bee8..44befa2 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -17,7 +17,7 @@
*/
package org.apache.cassandra.db;
-import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class ReadOrderGroup implements AutoCloseable
@@ -98,14 +98,8 @@ public class ReadOrderGroup implements AutoCloseable
private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
{
- SecondaryIndexSearcher searcher = command.getIndexSearcher(baseCfs);
- if (searcher == null)
- return null;
-
- SecondaryIndex index = searcher.highestSelectivityIndex(command.rowFilter());
- return index == null || !(index instanceof AbstractSimplePerColumnSecondaryIndex)
- ? null
- : ((AbstractSimplePerColumnSecondaryIndex)index).getIndexCfs();
+ Index index = baseCfs.indexManager.getBestIndexFor(command);
+ return index == null ? null : index.getBackingTable().orElse(null);
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 38cfed6..fb9eb48 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -17,18 +17,22 @@
*/
package org.apache.cassandra.db;
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
-import com.google.common.collect.*;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.SetMultimap;
import com.google.common.io.ByteStreams;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,25 +41,18 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.*;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.*;
-import org.apache.cassandra.schema.Tables;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
@@ -65,7 +62,6 @@ import org.apache.cassandra.utils.*;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
-
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
@@ -516,7 +512,14 @@ public final class SystemKeyspace
if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
return;
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
- executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
+ executeInternal(String.format(req, COMPACTION_HISTORY),
+ UUIDGen.getTimeUUID(),
+ ksname,
+ cfname,
+ ByteBufferUtil.bytes(compactedAt),
+ bytesIn,
+ bytesOut,
+ rowsMerged);
}
public static TabularData getCompactionHistory() throws OpenDataException
@@ -1030,6 +1033,16 @@ public final class SystemKeyspace
forceBlockingFlush(BUILT_INDEXES);
}
+ public static List<String> getBuiltIndexes(String keyspaceName, Set<String> indexNames)
+ {
+ List<String> names = new ArrayList<>(indexNames);
+ String req = "SELECT index_name from %s.\"%s\" WHERE table_name=? AND index_name IN ?";
+ UntypedResultSet results = executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, names);
+ return StreamSupport.stream(results.spliterator(), false)
+ .map(r -> r.getString("index_name"))
+ .collect(Collectors.toList());
+ }
+
/**
* Read the host ID from the system keyspace, creating (and storing) one if
* none exists.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index cab96fb..4aaa17a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -17,15 +17,19 @@
*/
package org.apache.cassandra.db.compaction;
-import java.util.UUID;
import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.PurgingPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.transactions.CompactionTransaction;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.metrics.CompactionMetrics;
@@ -47,6 +51,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
*/
public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
{
+ private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
private final OperationType type;
@@ -148,29 +153,33 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
return null;
- // If we have a 2ndary index, we must update it with deleted/shadowed cells.
- // TODO: this should probably be done asynchronously and batched.
- final SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec);
- final RowDiffListener diffListener = new RowDiffListener()
+ Columns statics = Columns.NONE;
+ Columns regulars = Columns.NONE;
+ for (UnfilteredRowIterator iter : versions)
{
- public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
- {
- }
-
- public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
- {
- }
-
- public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ if (iter != null)
{
+ statics = statics.mergeTo(iter.columns().statics);
+ regulars = regulars.mergeTo(iter.columns().regulars);
}
+ }
+ final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars);
- public void onCell(int i, Clustering clustering, Cell merged, Cell original)
- {
- if (original != null && (merged == null || !merged.isLive(nowInSec)))
- indexer.remove(clustering, original);
- }
- };
+ // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+ // we can reuse a single CleanupTransaction for the duration of a partition.
+ // Currently, it doesn't do any batching of row updates, so every merge event
+ // for a single partition results in a fresh cycle of:
+ // * Get new Indexer instances
+ // * Indexer::start
+ // * Indexer::onRowMerge (for every row being merged by the compaction)
+ // * Indexer::commit
+ // A new OpOrder.Group is opened in an ARM block wrapping the commits
+ // TODO: this should probably be done asynchronously and batched.
+ final CompactionTransaction indexTransaction =
+ controller.cfs.indexManager.newCompactionTransaction(partitionKey,
+ partitionColumns,
+ versions.size(),
+ nowInSec);
return new UnfilteredRowIterators.MergeListener()
{
@@ -180,7 +189,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
public void onMergedRows(Row merged, Columns columns, Row[] versions)
{
- Rows.diff(merged, columns, versions, diffListener);
+ indexTransaction.start();
+ indexTransaction.onRowMerge(columns, merged, versions);
+ indexTransaction.commit();
}
public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 66f9ed5..0890341 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -20,22 +20,8 @@ package org.apache.cassandra.db.compaction;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
@@ -56,33 +42,27 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndexBuilder;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.MaterializedViewBuilder;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.Refs;
import static java.util.Collections.singleton;
@@ -847,7 +827,7 @@ public class CompactionManager implements CompactionManagerMBean
}
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
- cfs.indexManager.flushIndexesBlocking();
+ cfs.indexManager.flushAllIndexesBlocking();
finished = writer.finish();
}
@@ -939,11 +919,7 @@ public class CompactionManager implements CompactionManagerMBean
cfs.invalidateCachedPartition(partition.partitionKey());
- // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
- try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
- {
- cfs.indexManager.deleteFromIndexes(partition, opGroup, nowInSec);
- }
+ cfs.indexManager.deletePartition(partition, nowInSec);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index a5212aa..bbec004 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -27,10 +27,9 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -38,7 +37,9 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
/**
* A filter on which rows a given query should include or exclude.
@@ -91,6 +92,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
expressions.add(new ThriftExpression(metadata, name, op, value));
}
+ public List<Expression> getExpressions()
+ {
+ return expressions;
+ }
+
/**
* Filters the provided iterator so that only the row satisfying the expression of this filter
* are included in the resulting iterator.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
deleted file mode 100644
index a631b9a..0000000
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Implements a secondary index for a column family using a second column family
- * in which the row keys are indexed values, and column names are base row keys.
- */
-public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSecondaryIndex
-{
- protected ColumnFamilyStore indexCfs;
-
- // SecondaryIndex "forces" a set of ColumnDefinition. However this class (and thus it's subclass)
- // only support one def per index. So inline it in a field for 1) convenience and 2) avoid creating
- // an iterator each time we need to access it.
- // TODO: we should fix SecondaryIndex API
- protected ColumnDefinition columnDef;
-
- public void init()
- {
- assert baseCfs != null && columnDefs != null && columnDefs.size() == 1;
-
- columnDef = columnDefs.iterator().next();
-
- CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, indexMetadata, getIndexKeyComparator());
- indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
- indexedCfMetadata.cfName,
- indexedCfMetadata,
- baseCfs.getTracker().loadsstables);
- }
-
- protected AbstractType<?> getIndexKeyComparator()
- {
- return columnDef.type;
- }
-
- public ColumnDefinition indexedColumn()
- {
- return columnDef;
- }
-
- @Override
- String indexTypeForGrouping()
- {
- return "_internal_";
- }
-
- protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, Cell cell)
- {
- return makeIndexClustering(rowKey, clustering, cell == null ? null : cell.path());
- }
-
- protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, CellPath path)
- {
- return buildIndexClusteringPrefix(rowKey, clustering, path).build();
- }
-
- protected Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
- {
- return buildIndexClusteringPrefix(rowKey, bound, null).buildBound(bound.isStart(), bound.isInclusive());
- }
-
- protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path);
-
- protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, Cell cell)
- {
- return cell == null
- ? getIndexedValue(rowKey, clustering, null, null)
- : getIndexedValue(rowKey, clustering, cell.value(), cell.path());
- }
-
- protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath cellPath);
-
- public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
- {
- deleteForCleanup(rowKey, clustering, cell, opGroup, nowInSec);
- }
-
- public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
- {
- delete(rowKey, clustering, cell.value(), cell.path(), new DeletionTime(cell.timestamp(), nowInSec), opGroup);
- }
-
- public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
- {
- DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
-
- Row row = BTreeRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
- PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
-
- indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
- if (logger.isDebugEnabled())
- logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
- }
-
- public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
- {
- insert(rowKey, clustering, cell, LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), opGroup);
- }
-
- public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
- {
- DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
-
- Row row = BTreeRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
- PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
-
- if (logger.isDebugEnabled())
- logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
-
- indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
- }
-
- public void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec)
- {
- // insert the new value before removing the old one, so we never have a period
- // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
- insert(rowKey, clustering, cell, opGroup);
- if (SecondaryIndexManager.shouldCleanupOldValue(oldCell, cell))
- delete(rowKey, clustering, oldCell, opGroup, nowInSec);
- }
-
- public boolean indexes(ColumnDefinition column)
- {
- return column.name.equals(columnDef.name);
- }
-
- public void removeIndex(ByteBuffer columnName)
- {
- // interrupt in-progress compactions
- Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
- CompactionManager.instance.interruptCompactionForCFs(cfss, true);
- CompactionManager.instance.waitForCessation(cfss);
-
- indexCfs.keyspace.writeOrder.awaitNewBarrier();
- indexCfs.forceBlockingFlush();
-
- indexCfs.readOrdering.awaitNewBarrier();
- indexCfs.invalidate();
- }
-
- public void forceBlockingFlush()
- {
- Future<?> wait;
- // we synchronise on the baseCfs to make sure we are ordered correctly with other flushes to the base CFS
- synchronized (baseCfs.getTracker())
- {
- wait = indexCfs.forceFlush();
- }
- FBUtilities.waitOnFuture(wait);
- }
-
- public void invalidate()
- {
- indexCfs.invalidate();
- }
-
- public void truncateBlocking(long truncatedAt)
- {
- indexCfs.discardSSTables(truncatedAt);
- }
-
- public ColumnFamilyStore getIndexCfs()
- {
- return indexCfs;
- }
-
- protected ClusteringComparator getIndexComparator()
- {
- assert indexCfs != null;
- return indexCfs.metadata.comparator;
- }
-
- public String getIndexName()
- {
- return indexCfs.name;
- }
-
- public void reload()
- {
- indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
- indexCfs.reload();
- }
-
- public long estimateResultRows()
- {
- return getIndexCfs().getMeanColumns();
- }
-
- public void validate(DecoratedKey partitionKey) throws InvalidRequestException
- {
- if (columnDef.kind == ColumnDefinition.Kind.PARTITION_KEY)
- validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null, null));
- }
-
- public void validate(Clustering clustering) throws InvalidRequestException
- {
- if (columnDef.kind == ColumnDefinition.Kind.CLUSTERING)
- validateIndexedValue(getIndexedValue(null, clustering, null, null));
- }
-
- public void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException
- {
- if (!columnDef.isPrimaryKeyColumn())
- validateIndexedValue(getIndexedValue(null, null, cellValue, path));
- }
-
- private void validateIndexedValue(ByteBuffer value)
- {
- if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
- throw new InvalidRequestException(String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
- value.remaining(), getIndexName(), baseKeyspace(), baseTable(), columnDef.name, FBUtilities.MAX_UNSIGNED_SHORT));
- }
-
- @Override
- public String toString()
- {
- return String.format("%s(%s)", baseTable(), columnDef.name);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
deleted file mode 100644
index 897aa9c..0000000
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Base class for Secondary indexes that implement a unique index per column
- *
- */
-public abstract class PerColumnSecondaryIndex extends SecondaryIndex
-{
- /**
- * Called when a column has been tombstoned or replaced.
- *
- * @param rowKey the underlying row key which is indexed
- * @param col all the column info
- */
- public abstract void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
- /**
- * Called when a column has been removed due to a cleanup operation.
- */
- public abstract void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
- /**
- * For indexes on the primary key, index the given PK.
- */
- public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
- {
- }
-
- /**
- * For indexes on the primary key, delete the given PK.
- */
- public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
- {
- }
-
- /**
- * insert a column to the index
- *
- * @param rowKey the underlying row key which is indexed
- * @param col all the column info
- */
- public abstract void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup);
-
- /**
- * update a column from the index
- *
- * @param rowKey the underlying row key which is indexed
- * @param oldCol the previous column info
- * @param col all the column info
- */
- public abstract void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
- protected boolean indexPrimaryKeyColumn()
- {
- return false;
- }
-
- public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
- {
- Clustering clustering = row.clustering();
- if (indexPrimaryKeyColumn())
- {
- // Same as in AtomicBTreePartition.maybeIndexPrimaryKeyColumn
- long timestamp = row.primaryKeyLivenessInfo().timestamp();
- int ttl = row.primaryKeyLivenessInfo().ttl();
-
- for (Cell cell : row.cells())
- {
- if (cell.isLive(nowInSec) && cell.timestamp() > timestamp)
- {
- timestamp = cell.timestamp();
- ttl = cell.ttl();
- }
- }
- maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
- }
-
- for (Cell cell : row.cells())
- {
- if (!indexes(cell.column()))
- continue;
-
- if (cell.isLive(nowInSec))
- insert(key.getKey(), clustering, cell, opGroup);
- }
- }
-
- public String getNameForSystemKeyspace(ByteBuffer column)
- {
- return getIndexName();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
deleted file mode 100644
index 502b213..0000000
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- * Base class for Secondary indexes that implement a unique index per row
- */
-public abstract class PerRowSecondaryIndex extends SecondaryIndex
-{
- /**
- * Index the given partition.
- */
- public abstract void index(ByteBuffer key, UnfilteredRowIterator atoms);
-
- /**
- * cleans up deleted columns from cassandra cleanup compaction
- *
- * @param key
- */
- public abstract void delete(ByteBuffer key, OpOrder.Group opGroup);
-
- public String getNameForSystemKeyspace(ByteBuffer columnName)
- {
- try
- {
- return getIndexName()+ByteBufferUtil.string(columnName);
- }
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
deleted file mode 100644
index 4302112..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-
-import com.google.common.base.Objects;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.index.composites.CompositesIndex;
-import org.apache.cassandra.db.index.keys.KeysIndex;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-/**
- * Abstract base class for different types of secondary indexes.
- *
- * Do not extend this directly, please pick from PerColumnSecondaryIndex or PerRowSecondaryIndex
- */
-public abstract class SecondaryIndex
-{
- protected static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
-
- public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
-
- /**
- * The name of the option used to specify that the index is on the collection keys.
- */
- public static final String INDEX_KEYS_OPTION_NAME = "index_keys";
-
- /**
- * The name of the option used to specify that the index is on the collection values.
- */
- public static final String INDEX_VALUES_OPTION_NAME = "index_values";
-
- /**
- * The name of the option used to specify that the index is on the collection (map) entries.
- */
- public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
-
- /**
- * Base CF that has many indexes
- */
- protected ColumnFamilyStore baseCfs;
-
-
- /**
- * The column definitions which this index is responsible for
- */
- protected final Set<ColumnDefinition> columnDefs = Collections.newSetFromMap(new ConcurrentHashMap<ColumnDefinition,Boolean>());
-
- protected IndexMetadata indexMetadata;
-
- /**
- * Perform any initialization work
- */
- public abstract void init();
-
- /**
- * Reload an existing index following a change to its configuration,
- * or that of the indexed column(s). Differs from init() in that we expect
- * expect new resources (such as CFS for a KEYS index) to be created by
- * init() but not here
- */
- public abstract void reload();
-
- /**
- * Validates the index_options passed in the IndexMetadata
- * @throws ConfigurationException
- */
- public abstract void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException;
-
- /**
- * @return The name of the index
- */
- abstract public String getIndexName();
-
- /**
- * All internal 2ndary indexes will return "_internal_" for this. Custom
- * 2ndary indexes will return their class name. This only matter for
- * SecondaryIndexManager.groupByIndexType.
- */
- String indexTypeForGrouping()
- {
- // Our internal indexes overwrite this
- return getClass().getCanonicalName();
- }
-
- /**
- * Return the unique name for this index and column
- * to be stored in the SystemKeyspace that tracks if each column is built
- *
- * @param columnName the name of the column
- * @return the unique name
- */
- abstract public String getNameForSystemKeyspace(ByteBuffer columnName);
-
- /**
- * Checks if the index for specified column is fully built
- *
- * @param columnName the column
- * @return true if the index is fully built
- */
- public boolean isIndexBuilt(ByteBuffer columnName)
- {
- return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
- }
-
- public void setIndexBuilt()
- {
- for (ColumnDefinition columnDef : columnDefs)
- SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
- }
-
- public void setIndexRemoved()
- {
- for (ColumnDefinition columnDef : columnDefs)
- SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
- }
-
- /**
- * Called at query time
- * Creates a implementation specific searcher instance for this index type
- * @param columns the list of columns which belong to this index type
- * @return the secondary index search impl
- */
- protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns);
-
- /**
- * Forces this indexes' in memory data to disk
- */
- public abstract void forceBlockingFlush();
-
- /**
- * Allow access to the underlying column family store if there is one
- * @return the underlying column family store or null
- */
- public abstract ColumnFamilyStore getIndexCfs();
-
-
- /**
- * Delete all files and references to this index
- * @param columnName the indexed column to remove
- */
- public abstract void removeIndex(ByteBuffer columnName);
-
- /**
- * Remove the index and unregisters this index's mbean if one exists
- */
- public abstract void invalidate();
-
- /**
- * Truncate all the data from the current index
- *
- * @param truncatedAt The truncation timestamp, all data before that timestamp should be rejected.
- */
- public abstract void truncateBlocking(long truncatedAt);
-
- /**
- * Builds the index using the data in the underlying CFS
- * Blocks till it's complete
- */
- protected void buildIndexBlocking()
- {
- logger.info(String.format("Submitting index build of %s for data in %s",
- getIndexName(), StringUtils.join(baseCfs.getSSTables(SSTableSet.CANONICAL), ", ")));
-
- try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)).refs)
- {
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
- Collections.singleton(getIndexName()),
- new ReducingKeyIterator(sstables));
- Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
- FBUtilities.waitOnFuture(future);
- forceBlockingFlush();
- setIndexBuilt();
- }
- logger.info("Index build of {} complete", getIndexName());
- }
-
-
- /**
- * Builds the index using the data in the underlying CF, non blocking
- *
- *
- * @return A future object which the caller can block on (optional)
- */
- public Future<?> buildIndexAsync()
- {
- // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
- boolean allAreBuilt = true;
- for (ColumnDefinition cdef : columnDefs)
- {
- if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name.bytes)))
- {
- allAreBuilt = false;
- break;
- }
- }
-
- if (allAreBuilt)
- return null;
-
- // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
- // we want to block for a long period. (actual build is serialized on CompactionManager.)
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- baseCfs.forceBlockingFlush();
- buildIndexBlocking();
- }
- };
- FutureTask<?> f = new FutureTask<Object>(runnable, null);
-
- new Thread(f, "Creating index: " + getIndexName()).start();
- return f;
- }
-
- public ColumnFamilyStore getBaseCfs()
- {
- return baseCfs;
- }
-
- private void setBaseCfs(ColumnFamilyStore baseCfs)
- {
- this.baseCfs = baseCfs;
- }
-
- public Set<ColumnDefinition> getColumnDefs()
- {
- return columnDefs;
- }
-
- void setIndexMetadata(IndexMetadata indexDef)
- {
- this.indexMetadata = indexDef;
- for (ColumnIdentifier col : indexDef.columns)
- this.columnDefs.add(baseCfs.metadata.getColumnDefinition(col));
- }
-
- void addColumnDef(ColumnDefinition columnDef)
- {
- columnDefs.add(columnDef);
- }
-
- void removeColumnDef(ByteBuffer name)
- {
- Iterator<ColumnDefinition> it = columnDefs.iterator();
- while (it.hasNext())
- {
- if (it.next().name.bytes.equals(name))
- it.remove();
- }
- }
-
- /** Returns true if the index supports lookups for the given operator, false otherwise. */
- public boolean supportsOperator(Operator operator)
- {
- return operator == Operator.EQ;
- }
-
- /**
- * Returns the decoratedKey for a column value. Assumes an index CFS is present.
- * @param value column value
- * @return decorated key
- */
- public DecoratedKey getIndexKeyFor(ByteBuffer value)
- {
- return getIndexCfs().decorateKey(value);
- }
-
- /**
- * Returns true if the provided column is indexed by this secondary index.
- *
- * The default implementation checks whether the name is one the columnDef name,
- * but this should be overriden but subclass if needed.
- */
- public abstract boolean indexes(ColumnDefinition column);
-
- /**
- * This is the primary way to create a secondary index instance for a CF column.
- * It will validate the index_options before initializing.
- *
- * @param baseCfs the source of data for the Index
- * @param indexDef the meta information about this index (index_type, index_options, name, etc...)
- *
- * @return The secondary index instance for this column
- * @throws ConfigurationException
- */
- public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs,
- IndexMetadata indexDef) throws ConfigurationException
- {
- SecondaryIndex index = uninitializedInstance(baseCfs.metadata, indexDef);
- index.validateOptions(baseCfs.metadata, indexDef);
- index.setBaseCfs(baseCfs);
- index.setIndexMetadata(indexDef);
-
- return index;
- }
-
- public static void validate(CFMetaData baseMetadata,
- IndexMetadata indexDef) throws ConfigurationException
- {
- SecondaryIndex index = uninitializedInstance(baseMetadata, indexDef);
- index.validateOptions(baseMetadata, indexDef);
- }
-
- private static SecondaryIndex uninitializedInstance(CFMetaData baseMetadata,
- IndexMetadata indexDef) throws ConfigurationException
- {
- if (indexDef.isKeys())
- {
- return new KeysIndex();
- }
- else if (indexDef.isComposites())
- {
- return CompositesIndex.create(indexDef, baseMetadata);
- }
- else if (indexDef.isCustom())
- {
- assert indexDef.options != null;
- String class_name = indexDef.options.get(CUSTOM_INDEX_OPTION_NAME);
- assert class_name != null;
- try
- {
- return (SecondaryIndex) Class.forName(class_name).newInstance();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- throw new AssertionError("Unknown index type: " + indexDef.name);
- }
-
- public abstract void validate(DecoratedKey partitionKey) throws InvalidRequestException;
- public abstract void validate(Clustering clustering) throws InvalidRequestException;
- public abstract void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException;
-
- public abstract long estimateResultRows();
-
- protected String baseKeyspace()
- {
- return baseCfs.metadata.ksName;
- }
-
- protected String baseTable()
- {
- return baseCfs.metadata.cfName;
- }
-
- /**
- * Create the index metadata for the index on a given column of a given table.
- */
- public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def)
- {
- return newIndexMetadata(baseMetadata, def, def.indexedColumn(baseMetadata).type);
- }
-
- /**
- * Create the index metadata for the index on a given column of a given table.
- */
- static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def, AbstractType<?> comparator)
- {
- assert !def.isCustom();
-
- CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
- .withId(baseMetadata.cfId)
- .withPartitioner(new LocalPartitioner(comparator))
- .addPartitionKey(def.indexedColumn(baseMetadata).name, comparator);
-
- if (def.isComposites())
- {
- CompositesIndex.addIndexClusteringColumns(builder, baseMetadata, def);
- }
- else
- {
- assert def.isKeys();
- KeysIndex.addIndexClusteringColumns(builder, baseMetadata);
- }
-
- return builder.build().reloadIndexMetadataProperties(baseMetadata);
- }
-
- @Override
- public String toString()
- {
- return Objects.toStringHelper(this).add("columnDefs", columnDefs).toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
deleted file mode 100644
index a117f6d..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * Manages building an entire index from column family data. Runs on to compaction manager.
- */
-public class SecondaryIndexBuilder extends CompactionInfo.Holder
-{
- private final ColumnFamilyStore cfs;
- private final Set<String> idxNames;
- private final ReducingKeyIterator iter;
- private final UUID compactionId;
-
- public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
- {
- this.cfs = cfs;
- this.idxNames = idxNames;
- this.iter = iter;
- this.compactionId = UUIDGen.getTimeUUID();
- }
-
- public CompactionInfo getCompactionInfo()
- {
- return new CompactionInfo(cfs.metadata,
- OperationType.INDEX_BUILD,
- iter.getBytesRead(),
- iter.getTotalBytes(),
- compactionId);
- }
-
- public void build()
- {
- try
- {
- while (iter.hasNext())
- {
- if (isStopRequested())
- throw new CompactionInterruptedException(getCompactionInfo());
- DecoratedKey key = iter.next();
- Keyspace.indexPartition(key, cfs, idxNames);
- }
- }
- finally
- {
- try
- {
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
deleted file mode 100644
index 996c730..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Manages all the indexes associated with a given CFS
- * Different types of indexes can be created across the same CF
- */
-public class SecondaryIndexManager
-{
- private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
-
- public static final Updater nullUpdater = new Updater()
- {
- public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion) {}
- public void insert(Clustering clustering, Cell cell) {}
- public void update(Clustering clustering, Cell oldCell, Cell cell) {}
- public void remove(Clustering clustering, Cell current) {}
-
- public void updateRowLevelIndexes() {}
- };
-
- /**
- * Organizes the indexes by column name
- */
- private final ConcurrentNavigableMap<ByteBuffer, SecondaryIndex> indexesByColumn;
-
-
- /**
- * Keeps a single instance of a SecondaryIndex for many columns when the index type
- * has isRowLevelIndex() == true
- *
- * This allows updates to happen to an entire row at once
- */
- private final ConcurrentMap<Class<? extends SecondaryIndex>, SecondaryIndex> rowLevelIndexMap;
-
-
- /**
- * Keeps all secondary index instances, either per-column or per-row
- */
- private final Set<SecondaryIndex> allIndexes;
-
-
- /**
- * The underlying column family containing the source data for these indexes
- */
- public final ColumnFamilyStore baseCfs;
-
- public SecondaryIndexManager(ColumnFamilyStore baseCfs)
- {
- indexesByColumn = new ConcurrentSkipListMap<>();
- rowLevelIndexMap = new ConcurrentHashMap<>();
- allIndexes = Collections.newSetFromMap(new ConcurrentHashMap<SecondaryIndex, Boolean>());
-
- this.baseCfs = baseCfs;
- }
-
- /**
- * Drops and adds new indexes associated with the underlying CF
- */
- public void reload()
- {
- // figure out what needs to be added and dropped.
- // future: if/when we have modifiable settings for secondary indexes,
- // they'll need to be handled here.
- Collection<ByteBuffer> indexedColumnNames = indexesByColumn.keySet();
- for (ByteBuffer indexedColumn : indexedColumnNames)
- {
- ColumnDefinition def = baseCfs.metadata.getColumnDefinition(indexedColumn);
- if (def == null || !baseCfs.metadata.getIndexes().get(def).isPresent())
- removeIndexedColumn(indexedColumn);
- }
-
- // TODO: allow all ColumnDefinition type
- for (IndexMetadata indexDef : baseCfs.metadata.getIndexes())
- {
- if (!indexedColumnNames.contains(indexDef.indexedColumn(baseCfs.metadata).name.bytes))
- addIndexedColumn(indexDef);
- }
-
- for (SecondaryIndex index : allIndexes)
- index.reload();
- }
-
- public Set<String> allIndexesNames()
- {
- Set<String> names = new HashSet<>(allIndexes.size());
- for (SecondaryIndex index : allIndexes)
- names.add(index.getIndexName());
- return names;
- }
-
- public Set<PerColumnSecondaryIndex> perColumnIndexes()
- {
- Set<PerColumnSecondaryIndex> s = new HashSet<>();
- for (SecondaryIndex index : allIndexes)
- if (index instanceof PerColumnSecondaryIndex)
- s.add((PerColumnSecondaryIndex)index);
- return s;
- }
-
- public Set<PerRowSecondaryIndex> perRowIndexes()
- {
- Set<PerRowSecondaryIndex> s = new HashSet<>();
- for (SecondaryIndex index : allIndexes)
- if (index instanceof PerRowSecondaryIndex)
- s.add((PerRowSecondaryIndex)index);
- return s;
- }
-
- /**
- * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
- * Does nothing if columns is empty.
- *
- * Caller must acquire and release references to the sstables used here.
- *
- * @param sstables the data to build from
- * @param idxNames the list of columns to index, ordered by comparator
- */
- public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, Set<String> idxNames)
- {
- idxNames = filterByColumn(idxNames);
- if (idxNames.isEmpty())
- return;
-
- logger.info(String.format("Submitting index build of %s for data in %s",
- idxNames, StringUtils.join(sstables, ", ")));
-
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables));
- Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
- FBUtilities.waitOnFuture(future);
-
- flushIndexesBlocking();
-
- logger.info("Index build of {} complete", idxNames);
- }
-
- public boolean indexes(ColumnDefinition column)
- {
- for (SecondaryIndex index : allIndexes)
- if (index.indexes(column))
- return true;
- return false;
- }
-
- private Set<SecondaryIndex> indexFor(ColumnDefinition column)
- {
- Set<SecondaryIndex> matching = null;
- for (SecondaryIndex index : allIndexes)
- {
- if (index.indexes(column))
- {
- if (matching == null)
- matching = new HashSet<>();
- matching.add(index);
- }
- }
- return matching == null ? Collections.<SecondaryIndex>emptySet() : matching;
- }
-
- /**
- * Removes a existing index
- * @param column the indexed column to remove
- */
- public void removeIndexedColumn(ByteBuffer column)
- {
- SecondaryIndex index = indexesByColumn.remove(column);
-
- if (index == null)
- return;
-
- // Remove this column from from row level index map as well as all indexes set
- if (index instanceof PerRowSecondaryIndex)
- {
- index.removeColumnDef(column);
-
- // If no columns left remove from row level lookup as well as all indexes set
- if (index.getColumnDefs().isEmpty())
- {
- allIndexes.remove(index);
- rowLevelIndexMap.remove(index.getClass());
- }
- }
- else
- {
- allIndexes.remove(index);
- }
-
- index.removeIndex(column);
- SystemKeyspace.setIndexRemoved(baseCfs.metadata.ksName, index.getNameForSystemKeyspace(column));
- }
-
- /**
- * Adds and builds a index for a column
- * @param indexDef the index metadata
- * @return a future which the caller can optionally block on signaling the index is built
- */
- public synchronized Future<?> addIndexedColumn(IndexMetadata indexDef)
- {
- ColumnDefinition cdef = indexDef.indexedColumn(baseCfs.metadata);
- if (indexesByColumn.containsKey(cdef.name.bytes))
- return null;
-
- SecondaryIndex index = SecondaryIndex.createInstance(baseCfs, indexDef);
-
- // Keep a single instance of the index per-cf for row level indexes
- // since we want all columns to be under the index
- if (index instanceof PerRowSecondaryIndex)
- {
- SecondaryIndex currentIndex = rowLevelIndexMap.get(index.getClass());
-
- if (currentIndex == null)
- {
- rowLevelIndexMap.put(index.getClass(), index);
- index.init();
- }
- else
- {
- index = currentIndex;
- index.setIndexMetadata(indexDef);
- logger.info("Creating new index : {}",indexDef.name);
- }
- }
- else
- {
- // TODO: We sould do better than throw a RuntimeException
- if (indexDef.isCustom() && index instanceof AbstractSimplePerColumnSecondaryIndex)
- throw new RuntimeException("Cannot use a subclass of AbstractSimplePerColumnSecondaryIndex as a CUSTOM index, as they assume they are CFS backed");
- index.init();
- }
-
- // link in indexedColumns. this means that writes will add new data to
- // the index immediately,
- // so we don't have to lock everything while we do the build. it's up to
- // the operator to wait
- // until the index is actually built before using in queries.
- indexesByColumn.put(cdef.name.bytes, index);
-
- // Add to all indexes set:
- allIndexes.add(index);
-
- // if we're just linking in the index to indexedColumns on an
- // already-built index post-restart, we're done
- if (index.isIndexBuilt(cdef.name.bytes))
- return null;
-
- return index.buildIndexAsync();
- }
-
- /**
- *
- * @param column the name of indexes column
- * @return the index
- */
- public SecondaryIndex getIndexForColumn(ColumnDefinition column)
- {
- return indexesByColumn.get(column.name.bytes);
- }
-
- /**
- * Remove the index
- */
- public void invalidate()
- {
- for (SecondaryIndex index : allIndexes)
- index.invalidate();
- }
-
- /**
- * Flush all indexes to disk
- */
- public void flushIndexesBlocking()
- {
- // despatch flushes for all CFS backed indexes
- List<Future<?>> wait = new ArrayList<>();
- synchronized (baseCfs.getTracker())
- {
- for (SecondaryIndex index : allIndexes)
- if (index.getIndexCfs() != null)
- wait.add(index.getIndexCfs().forceFlush());
- }
-
- // blockingFlush any non-CFS-backed indexes
- for (SecondaryIndex index : allIndexes)
- if (index.getIndexCfs() == null)
- index.forceBlockingFlush();
-
- // wait for the CFS-backed index flushes to complete
- FBUtilities.waitOnFutures(wait);
- }
-
- /**
- * @return all built indexes (ready to use)
- */
- public List<String> getBuiltIndexes()
- {
- List<String> indexList = new ArrayList<>();
-
- for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
- {
- SecondaryIndex index = entry.getValue();
-
- if (index.isIndexBuilt(entry.getKey()))
- indexList.add(entry.getValue().getIndexName());
- }
-
- return indexList;
- }
-
- /**
- * @return all CFS from indexes which use a backing CFS internally (KEYS)
- */
- public Set<ColumnFamilyStore> getIndexesBackedByCfs()
- {
- Set<ColumnFamilyStore> cfsList = new HashSet<>();
-
- for (SecondaryIndex index: allIndexes)
- {
- ColumnFamilyStore cfs = index.getIndexCfs();
- if (cfs != null)
- cfsList.add(cfs);
- }
-
- return cfsList;
- }
-
- /**
- * @return all indexes which do *not* use a backing CFS internally
- */
- public Set<SecondaryIndex> getIndexesNotBackedByCfs()
- {
- // we use identity map because per row indexes use same instance across many columns
- Set<SecondaryIndex> indexes = Collections.newSetFromMap(new IdentityHashMap<SecondaryIndex, Boolean>());
- for (SecondaryIndex index: allIndexes)
- if (index.getIndexCfs() == null)
- indexes.add(index);
- return indexes;
- }
-
- /**
- * @return all of the secondary indexes without distinction to the (non-)backed by secondary ColumnFamilyStore.
- */
- public Set<SecondaryIndex> getIndexes()
- {
- return allIndexes;
- }
-
- /**
- * @return if there are ANY indexes for this table..
- */
- public boolean hasIndexes()
- {
- return !indexesByColumn.isEmpty();
- }
-
- /**
- * When building an index against existing data, add the given partition to the index
- */
- public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<SecondaryIndex> allIndexes, int nowInSec)
- {
- Set<PerRowSecondaryIndex> perRowIndexes = perRowIndexes();
- Set<PerColumnSecondaryIndex> perColumnIndexes = perColumnIndexes();
-
- if (!perRowIndexes.isEmpty())
- {
- // TODO: This is passing the same partition iterator to all perRow index, which means this only
- // work if there is only one of them. We should change the API so it doesn't work directly on the
- // partition, but rather on individual rows, so we can do a single iteration on the partition in this
- // method and pass the rows to index to all indexes.
-
- // Update entire partition only once per row level index
- Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<>();
- for (PerRowSecondaryIndex index : perRowIndexes)
- {
- if (appliedRowLevelIndexes.add(index.getClass()))
- ((PerRowSecondaryIndex)index).index(partition.partitionKey().getKey(), partition);
- }
- }
-
- if (!perColumnIndexes.isEmpty())
- {
- DecoratedKey key = partition.partitionKey();
-
- if (!partition.staticRow().isEmpty())
- {
- for (PerColumnSecondaryIndex index : perColumnIndexes)
- index.indexRow(key, partition.staticRow(), opGroup, nowInSec);
- }
-
- try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
- {
- while (filtered.hasNext())
- {
- Row row = filtered.next();
- for (PerColumnSecondaryIndex index : perColumnIndexes)
- index.indexRow(key, row, opGroup, nowInSec);
- }
- }
- }
- }
-
- /**
- * Delete all data from all indexes for this partition. For when cleanup rips a partition out entirely.
- */
- public void deleteFromIndexes(UnfilteredRowIterator partition, OpOrder.Group opGroup, int nowInSec)
- {
- ByteBuffer key = partition.partitionKey().getKey();
-
- for (PerRowSecondaryIndex index : perRowIndexes())
- index.delete(key, opGroup);
-
- Set<PerColumnSecondaryIndex> indexes = perColumnIndexes();
-
- while (partition.hasNext())
- {
- Unfiltered unfiltered = partition.next();
- if (unfiltered.kind() != Unfiltered.Kind.ROW)
- continue;
-
- Row row = (Row) unfiltered;
- Clustering clustering = row.clustering();
- if (!row.deletion().isLive())
- for (PerColumnSecondaryIndex index : indexes)
- index.maybeDelete(key, clustering, row.deletion(), opGroup);
-
- for (Cell cell : row.cells())
- {
- for (PerColumnSecondaryIndex index : indexes)
- {
- if (!index.indexes(cell.column()))
- continue;
-
- ((PerColumnSecondaryIndex) index).deleteForCleanup(key, clustering, cell, opGroup, nowInSec);
- }
- }
- }
- }
-
- /**
- * This helper acts as a closure around the indexManager and updated data
- * to ensure that down in Memtable's ColumnFamily implementation, the index
- * can get updated.
- */
- public Updater updaterFor(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
- {
- return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
- ? nullUpdater
- : new StandardUpdater(update, opGroup, nowInSec);
- }
-
- /**
- * Updated closure with only the modified row key.
- */
- public Updater gcUpdaterFor(DecoratedKey key, int nowInSec)
- {
- return new GCUpdater(key, nowInSec);
- }
-
- /**
- * Get a list of IndexSearchers from the union of expression index types
- * @param command the query
- * @return the searchers needed to query the index
- */
- public List<SecondaryIndexSearcher> getIndexSearchersFor(ReadCommand command)
- {
- Map<String, Set<ColumnDefinition>> groupByIndexType = new HashMap<>();
-
- //Group columns by type
- for (RowFilter.Expression e : command.rowFilter())
- {
- SecondaryIndex index = getIndexForColumn(e.column());
-
- if (index == null || !index.supportsOperator(e.operator()))
- continue;
-
- Set<ColumnDefinition> columns = groupByIndexType.get(index.indexTypeForGrouping());
-
- if (columns == null)
- {
- columns = new HashSet<>();
- groupByIndexType.put(index.indexTypeForGrouping(), columns);
- }
-
- columns.add(e.column());
- }
-
- List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size());
-
- //create searcher per type
- for (Set<ColumnDefinition> column : groupByIndexType.values())
- indexSearchers.add(getIndexForColumn(column.iterator().next()).createSecondaryIndexSearcher(column));
-
- return indexSearchers;
- }
-
- public SecondaryIndexSearcher getBestIndexSearcherFor(ReadCommand command)
- {
- List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersFor(command);
-
- if (indexSearchers.isEmpty())
- return null;
-
- SecondaryIndexSearcher mostSelective = null;
- long bestEstimate = Long.MAX_VALUE;
- for (SecondaryIndexSearcher searcher : indexSearchers)
- {
- SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter());
- long estimate = highestSelectivityIndex.estimateResultRows();
- if (estimate <= bestEstimate)
- {
- bestEstimate = estimate;
- mostSelective = searcher;
- }
- }
- return mostSelective;
- }
-
- /**
- * Validates an union of expression index types. It will throw an {@link InvalidRequestException} if
- * any of the expressions in the provided clause is not valid for its index implementation.
- * @param filter the filter to check
- * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
- */
- public void validateFilter(RowFilter filter) throws InvalidRequestException
- {
- for (RowFilter.Expression expression : filter)
- {
- SecondaryIndex index = getIndexForColumn(expression.column());
- if (index != null && index.supportsOperator(expression.operator()))
- expression.validateForIndexing();
- }
- }
-
- public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
- {
- Set<SecondaryIndex> result = new HashSet<>();
- for (SecondaryIndex index : allIndexes)
- if (idxNames.contains(index.getIndexName()))
- result.add(index);
- return result;
- }
-
- public SecondaryIndex getIndexByName(String idxName)
- {
- for (SecondaryIndex index : allIndexes)
- if (idxName.equals(index.getIndexName()))
- return index;
-
- return null;
- }
-
- public void setIndexBuilt(Set<String> idxNames)
- {
- for (SecondaryIndex index : getIndexesByNames(idxNames))
- index.setIndexBuilt();
- }
-
- public void setIndexRemoved(Set<String> idxNames)
- {
- for (SecondaryIndex index : getIndexesByNames(idxNames))
- index.setIndexRemoved();
- }
-
- public void validate(DecoratedKey partitionKey) throws InvalidRequestException
- {
- for (SecondaryIndex index : perColumnIndexes())
- index.validate(partitionKey);
- }
-
- public void validate(Clustering clustering) throws InvalidRequestException
- {
- for (SecondaryIndex index : perColumnIndexes())
- index.validate(clustering);
- }
-
- public void validate(ColumnDefinition column, ByteBuffer value, CellPath path) throws InvalidRequestException
- {
- for (SecondaryIndex index : indexFor(column))
- index.validate(value, path);
- }
-
- static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
- {
- // If either the value or timestamp is different, then we
- // should delete from the index. If not, then we can infer that
- // at least one of the cells is an ExpiringColumn and that the
- // difference is in the expiry time. In this case, we don't want to
- // delete the old value from the index as the tombstone we insert
- // will just hide the inserted value.
- // Completely identical cells (including expiring columns with
- // identical ttl & localExpirationTime) will not get this far due
- // to the oldCell.equals(newCell) in StandardUpdater.update
- return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
- }
-
- private Set<String> filterByColumn(Set<String> idxNames)
- {
- Set<SecondaryIndex> indexes = getIndexesByNames(idxNames);
- Set<String> filtered = new HashSet<>(idxNames.size());
- for (SecondaryIndex candidate : indexes)
- {
- for (ColumnDefinition column : baseCfs.metadata.allColumns())
- {
- if (candidate.indexes(column))
- {
- filtered.add(candidate.getIndexName());
- break;
- }
- }
- }
- return filtered;
- }
-
- public static interface Updater
- {
- /** Called when a row with the provided clustering and row infos is inserted */
- public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion);
-
- /** called when constructing the index against pre-existing data */
- public void insert(Clustering clustering, Cell cell);
-
- /** called when updating the index from a memtable */
- public void update(Clustering clustering, Cell oldCell, Cell cell);
-
- /** called when lazy-updating the index during compaction (CASSANDRA-2897) */
- public void remove(Clustering clustering, Cell current);
-
- /** called after memtable updates are complete (CASSANDRA-5397) */
- public void updateRowLevelIndexes();
- }
-
- private final class GCUpdater implements Updater
- {
- private final DecoratedKey key;
- private final int nowInSec;
-
- public GCUpdater(DecoratedKey key, int nowInSec)
- {
- this.key = key;
- this.nowInSec = nowInSec;
- }
-
- public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
- {
- throw new UnsupportedOperationException();
- }
-
- public void insert(Clustering clustering, Cell cell)
- {
- throw new UnsupportedOperationException();
- }
-
- public void update(Clustering clustering, Cell oldCell, Cell newCell)
- {
- throw new UnsupportedOperationException();
- }
-
- public void remove(Clustering clustering, Cell cell)
- {
- for (SecondaryIndex index : indexFor(cell.column()))
- {
- if (index instanceof PerColumnSecondaryIndex)
- {
- try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
- {
- ((PerColumnSecondaryIndex) index).delete(key.getKey(), clustering, cell, opGroup, nowInSec);
- }
- }
- }
- }
-
- public void updateRowLevelIndexes()
- {
- for (SecondaryIndex index : rowLevelIndexMap.values())
- ((PerRowSecondaryIndex) index).index(key.getKey(), null);
- }
- }
-
- private final class StandardUpdater implements Updater
- {
- private final PartitionUpdate update;
- private final OpOrder.Group opGroup;
- private final int nowInSec;
-
- public StandardUpdater(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
- {
- this.update = update;
- this.opGroup = opGroup;
- this.nowInSec = nowInSec;
- }
-
- public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
- {
- for (PerColumnSecondaryIndex index : perColumnIndexes())
- {
- if (timestamp != LivenessInfo.NO_TIMESTAMP)
- index.maybeIndex(update.partitionKey().getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
- if (!deletion.isLive())
- index.maybeDelete(update.partitionKey().getKey(), clustering, deletion, opGroup);
- }
- }
-
- public void insert(Clustering clustering, Cell cell)
- {
- if (!cell.isLive(nowInSec))
- return;
-
- for (SecondaryIndex index : indexFor(cell.column()))
- if (index instanceof PerColumnSecondaryIndex)
- ((PerColumnSecondaryIndex) index).insert(update.partitionKey().getKey(), clustering, cell, opGroup);
- }
-
- public void update(Clustering clustering, Cell oldCell, Cell cell)
- {
- if (oldCell.equals(cell))
- return;
-
- for (SecondaryIndex index : indexFor(cell.column()))
- {
- if (index instanceof PerColumnSecondaryIndex)
- {
- if (cell.isLive(nowInSec))
- {
- ((PerColumnSecondaryIndex) index).update(update.partitionKey().getKey(), clustering, oldCell, cell, opGroup, nowInSec);
- }
- else
- {
- // Usually we want to delete the old value from the index, except when
- // name/value/timestamp are all equal, but the columns themselves
- // are not (as is the case when overwriting expiring columns with
- // identical values and ttl) Then, we don't want to delete as the
- // tombstone will hide the new value we just inserted; see CASSANDRA-7268
- if (shouldCleanupOldValue(oldCell, cell))
- ((PerColumnSecondaryIndex) index).delete(update.partitionKey().getKey(), clustering, oldCell, opGroup, nowInSec);
- }
- }
- }
- }
-
- public void remove(Clustering clustering, Cell cell)
- {
- throw new UnsupportedOperationException();
- }
-
- public void updateRowLevelIndexes()
- {
- for (SecondaryIndex index : rowLevelIndexMap.values())
- ((PerRowSecondaryIndex) index).index(update.partitionKey().getKey(), update.unfilteredIterator());
- }
-
- }
-}