You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:13:17 UTC
[6/9] cassandra git commit: Merge branch cassandra-2.2 into
cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 1f0191a,0000000..1dadf20
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@@ -1,265 -1,0 +1,265 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.File;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.service.StorageService;
+
+/**
+ * A multi-threaded (by default) executor for dispatching hints.
+ *
+ * Most of dispatch is triggered by {@link HintsDispatchTrigger} running every ~10 seconds.
+ */
+final class HintsDispatchExecutor
+{
+ private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
+
+ private final File hintsDirectory;
+ private final ExecutorService executor;
+ private final AtomicBoolean isPaused;
+ private final Map<UUID, Future> scheduledDispatches;
+
+ HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
+ {
+ this.hintsDirectory = hintsDirectory;
+ this.isPaused = isPaused;
+
+ scheduledDispatches = new ConcurrentHashMap<>();
+ executor = new JMXEnabledThreadPoolExecutor(1,
+ maxThreads,
+ 1,
+ TimeUnit.MINUTES,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
+ "internal");
+ }
+
+ /*
+ * It's safe to terminate dispatch in process and to deschedule dispatch.
+ */
+ void shutdownBlocking()
+ {
+ scheduledDispatches.clear();
+ executor.shutdownNow();
+ }
+
+ boolean isScheduled(HintsStore store)
+ {
+ return scheduledDispatches.containsKey(store.hostId);
+ }
+
+ Future dispatch(HintsStore store)
+ {
+ return dispatch(store, store.hostId);
+ }
+
+ Future dispatch(HintsStore store, UUID hostId)
+ {
+ /*
+ * It is safe to perform dispatch for the same host id concurrently in two or more threads,
+ * however there is nothing to win from it - so we don't.
+ *
+ * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
+ * rate limit, without having to share a ratelimiter between threads.
+ *
+ * It also simplifies reasoning about dispatch sessions.
+ */
+ return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
+ }
+
+ Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+ {
+ return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
+ }
+
+ void completeDispatchBlockingly(HintsStore store)
+ {
+ Future future = scheduledDispatches.get(store.hostId);
+ try
+ {
+ if (future != null)
+ future.get();
+ }
+ catch (ExecutionException | InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private final class TransferHintsTask implements Runnable
+ {
+ private final HintsCatalog catalog;
+
+ /*
+ * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
+ * We use a supplier here to be able to get a new host if the current one dies during streaming.
+ */
+ private final Supplier<UUID> hostIdSupplier;
+
+ private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
+ {
+ this.catalog = catalog;
+ this.hostIdSupplier = hostIdSupplier;
+ }
+
+ @Override
+ public void run()
+ {
+ UUID hostId = hostIdSupplier.get();
+ logger.info("Transferring all hints to {}", hostId);
+ if (transfer(hostId))
+ return;
+
+ logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, 10);
+
+ try
+ {
+ TimeUnit.SECONDS.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ hostId = hostIdSupplier.get();
+ logger.info("Transferring all hints to {}", hostId);
+ if (!transfer(hostId))
+ {
+ logger.error("Failed to transfer all hints to {}", hostId);
+ throw new RuntimeException("Failed to transfer all hints to " + hostId);
+ }
+ }
+
+ private boolean transfer(UUID hostId)
+ {
+ catalog.stores()
+ .map(store -> new DispatchHintsTask(store, hostId))
+ .forEach(Runnable::run);
+
+ return !catalog.hasFiles();
+ }
+ }
+
+ private final class DispatchHintsTask implements Runnable
+ {
+ private final HintsStore store;
+ private final UUID hostId;
+ private final RateLimiter rateLimiter;
+
+ DispatchHintsTask(HintsStore store, UUID hostId)
+ {
+ this.store = store;
+ this.hostId = hostId;
+
+ // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
+ // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
+ // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
+ // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
+ // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
+ int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
+ int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
+ this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+ }
+
+ public void run()
+ {
+ try
+ {
+ dispatch();
+ }
+ finally
+ {
+ scheduledDispatches.remove(hostId);
+ }
+ }
+
+ private void dispatch()
+ {
+ while (true)
+ {
+ if (isPaused.get())
+ break;
+
+ HintsDescriptor descriptor = store.poll();
+ if (descriptor == null)
+ break;
+
+ try
+ {
+ if (!dispatch(descriptor))
+ break;
+ }
+ catch (FSReadError e)
+ {
+ logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
+ store.cleanUp(descriptor);
+ store.blacklist(descriptor);
+ throw e;
+ }
+ }
+ }
+
+ /*
+ * Will return true if dispatch was successful, false if we hit a failure (destination node went down, for example).
+ */
+ private boolean dispatch(HintsDescriptor descriptor)
+ {
- logger.debug("Dispatching hints file {}", descriptor.fileName());
++ logger.trace("Dispatching hints file {}", descriptor.fileName());
+
+ File file = new File(hintsDirectory, descriptor.fileName());
+ Long offset = store.getDispatchOffset(descriptor).orElse(null);
+
+ try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
+ {
+ if (offset != null)
+ dispatcher.seek(offset);
+
+ if (dispatcher.dispatch())
+ {
+ if (!file.delete())
+ logger.error("Failed to delete hints file {}", descriptor.fileName());
+ store.cleanUp(descriptor);
+ logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), hostId);
+ return true;
+ }
+ else
+ {
+ store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
+ store.offerFirst(descriptor);
+ logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
+ return false;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 47364f6,0000000..3daf147
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1002 -1,0 +1,1002 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index;
+
+import java.lang.reflect.Constructor;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.transactions.*;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
+ * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
+ * and so on.
+ *
+ * The Index interface defines a number of methods which return Callable<?>. These are primarily the
+ * management tasks for an index implementation. Most of them are currently executed in a blocking
+ * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
+ * much all cases, as tasks like flushing an index need to be executed synchronously to avoid potentially
+ * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?>-returning methods on Index could
+ * then be defined as void and called directly from SIM (rather than being run via the executor service).
+ * Separating the task definition from execution gives us greater flexibility though, so that in future, for example,
+ * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
+ *
+ * The primary exception to the above is the Callable returned from Index#getInitializationTask. This may
+ * involve a significant effort, building a new index over any existing data. We perform this task asynchronously,
+ * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
+ * indexes is performed on the CompactionManager.
+ *
+ * This class also provides instances of processors which listen to updates to the base table and forward to
+ * registered Indexes the info required to keep those indexes up to date.
+ * There are three variants of these processors, each with a factory method provided by SIM:
+ * UpdateTransaction: deals with updates generated on the regular write path.
+ * CompactionTransaction: used when rows are merged during compaction.
+ * CleanupTransaction: used when partitions are removed during cleanup operations.
+ * Further details on their usage and lifecycles can be found in their interface definitions.
+ *
+ * Finally, the getBestIndexFor method is used at query time to identify the most selective index of those able
+ * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns the selected Index, through
+ * which the ReadCommand accesses the appropriate functions of the index at various stages in its lifecycle.
+ * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
+ * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
+ * a target replica.
+ */
+public class SecondaryIndexManager implements IndexRegistry
+{
+    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
+
+    // registered indexes, keyed by index name. A concurrent map because it is read by unsynchronized
+    // methods while addIndex/removeIndex (synchronized) and registerIndex/unregisterIndex modify it.
+    private Map<String, Index> indexes = Maps.newConcurrentMap();
+
+    // executes the tasks returned by Index#getInitializationTask (see createIndex), which may require
+    // index(es) to be built over existing data and are therefore run asynchronously
+    private static final ExecutorService asyncExecutor =
+        new JMXEnabledThreadPoolExecutor(1,
+                                         StageManager.KEEPALIVE,
+                                         TimeUnit.SECONDS,
+                                         new LinkedBlockingQueue<>(),
+                                         new NamedThreadFactory("SecondaryIndexManagement"),
+                                         "internal");
+
+    // executes blocking tasks produced by indexes (e.g. the metadata reload task in reloadIndex).
+    // Being a Guava direct executor, submitted tasks run on the calling thread before submit() returns.
+    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
+
+    /**
+     * The underlying column family containing the source data for these indexes
+     */
+    public final ColumnFamilyStore baseCfs;
+
+    /**
+     * @param baseCfs the base table whose indexes this instance manages
+     */
+    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
+    {
+        this.baseCfs = baseCfs;
+    }
+
+
+    /**
+     * Reconciles the registered indexes with the index definitions currently in the table metadata.
+     * Registered indexes that are no longer defined are removed; every current definition is then passed
+     * to addIndex, which creates it if it is missing or reloads it if it is already registered.
+     */
+    public void reload()
+    {
+        Indexes tableIndexes = baseCfs.metadata.getIndexes();
+
+        // drop anything registered here that the schema no longer knows about
+        indexes.keySet()
+               .stream()
+               .filter(registeredName -> !tableIndexes.has(registeredName))
+               .forEach(this::removeIndex);
+
+        // add/refresh every definition: some may only exist in the schema so far
+        tableIndexes.forEach(this::addIndex);
+    }
+
+    /**
+     * Reloads an already-registered index if its metadata differs from {@code indexDef}.
+     * The reload task is run via blockingExecutor. If the index supplies no reload task (null), nothing
+     * is submitted - mirroring how createIndex treats a null initialization task - instead of failing
+     * inside ExecutorService#submit.
+     *
+     * @param indexDef the (possibly updated) definition of an index that is already registered
+     * @return a future for the reload work, or an already-completed future if there was nothing to do
+     */
+    private Future<?> reloadIndex(IndexMetadata indexDef)
+    {
+        IndexMetadata registered = indexes.get(indexDef.name).getIndexMetadata();
+        if (registered.equals(indexDef))
+            return Futures.immediateFuture(null); // unchanged metadata: nothing to do
+
+        // the index metadata has changed, reload the index
+        Index index = indexes.remove(registered.name);
+        index.register(this);
+        Callable<?> reloadTask = index.getMetadataReloadTask(indexDef);
+        return reloadTask == null
+               ? Futures.immediateFuture(null)
+               : blockingExecutor.submit(reloadTask);
+    }
+
+    /**
+     * Instantiates and registers a new index, then submits its initialization task (if it has one)
+     * to asyncExecutor.
+     *
+     * @param indexDef the definition of the index to create
+     * @return a future for the initialization, or an already-completed future if there is none
+     */
+    private Future<?> createIndex(IndexMetadata indexDef)
+    {
+        Index newIndex = createInstance(indexDef);
+        newIndex.register(this);
+
+        final Callable<?> initTask = newIndex.getInitializationTask();
+        if (initTask == null)
+            return Futures.immediateFuture(null);
+
+        // building over existing data may take a while, so it must not block the caller
+        return asyncExecutor.submit(initTask);
+    }
+
+ /**
+ * Adds and builds a index
+ * @param indexDef the IndexMetadata describing the index
+ */
+ public synchronized Future<?> addIndex(IndexMetadata indexDef)
+ {
+ if (indexes.containsKey(indexDef.name))
+ return reloadIndex(indexDef);
+ else
+ return createIndex(indexDef);
+ }
+
+    /**
+     * Removes the named index if it is registered: its invalidate task is run via executeBlocking and it
+     * is then passed to unregisterIndex. Does nothing if no index with that name is registered.
+     *
+     * @param indexName name of the index to remove
+     */
+    public synchronized void removeIndex(String indexName)
+    {
+        Index removed = indexes.remove(indexName);
+        if (removed == null)
+            return;
+
+        executeBlocking(removed.getInvalidateTask());
+        // NOTE(review): the map entry is already gone at this point, so unregisterIndex's own
+        // remove() finds nothing and it logs that the index "was not registered".
+        unregisterIndex(removed);
+    }
+
+
+    /**
+     * Returns the metadata of every registered index whose dependsOn(column) is true.
+     *
+     * @param column the column being checked
+     * @return the dependent indexes' metadata; an immutable empty set when no indexes are registered
+     */
+    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
+    {
+        if (indexes.isEmpty())
+            return Collections.emptySet();
+
+        return indexes.values()
+                      .stream()
+                      .filter(candidate -> candidate.dependsOn(column))
+                      .map(Index::getIndexMetadata)
+                      .collect(Collectors.toCollection(HashSet::new));
+    }
+
+
+    /**
+     * Called when dropping a Table: marks every index currently reported by getBuiltIndexNames as removed.
+     */
+    public void markAllIndexesRemoved()
+    {
+        for (String builtIndexName : getBuiltIndexNames())
+            markIndexRemoved(builtIndexName);
+    }
+
+    /**
+     * Does a full, blocking rebuild of the named indexes from the sstables.
+     * Caller must acquire and release references to the sstables used here.
+     * Note also that only this method of (re)building indexes:
+     *   a) takes a set of index *names* rather than Index instances
+     *   b) marks existing indexes removed prior to rebuilding
+     * Only named indexes whose shouldBuildBlocking() is true are rebuilt; if none match, this logs and returns.
+     *
+     * @param sstables the data to build from
+     * @param indexNames the names of the indexes to be rebuilt
+     */
+    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
+    {
+        Set<Index> toRebuild = new HashSet<>();
+        for (Index candidate : indexes.values())
+        {
+            if (indexNames.contains(candidate.getIndexName()) && candidate.shouldBuildBlocking())
+                toRebuild.add(candidate);
+        }
+
+        if (toRebuild.isEmpty())
+        {
+            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
+            return;
+        }
+
+        for (Index index : toRebuild)
+            markIndexRemoved(index.getIndexName());
+
+        buildIndexesBlocking(sstables, toRebuild);
+
+        for (Index index : toRebuild)
+            markIndexBuilt(index.getIndexName());
+    }
+
+    /**
+     * Builds, from the given sstables, every registered index whose shouldBuildBlocking() is true.
+     *
+     * @param sstables the data to build from
+     */
+    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
+    {
+        Set<Index> blockingBuildable = indexes.values()
+                                              .stream()
+                                              .filter(Index::shouldBuildBlocking)
+                                              .collect(Collectors.toSet());
+        buildIndexesBlocking(sstables, blockingBuildable);
+    }
+
+    /**
+     * Builds a single index over the table's CANONICAL sstable set and then marks it built.
+     * Does nothing if the index's shouldBuildBlocking() is false.
+     * For convenience, may be called directly from Index impls.
+     *
+     * @param index the index to build
+     */
+    public void buildIndexBlocking(Index index)
+    {
+        if (!index.shouldBuildBlocking())
+            return;
+
+        // the sstable refs are released when the try block exits
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+             Refs<SSTableReader> sstables = viewFragment.refs)
+        {
+            buildIndexesBlocking(sstables, Collections.singleton(index));
+            markIndexBuilt(index.getIndexName());
+        }
+    }
+
+    /**
+     * Checks whether the specified {@link ColumnFamilyStore} is a secondary index, i.e. whether its
+     * name contains {@link Directories#SECONDARY_INDEX_NAME_SEPARATOR}.
+     *
+     * @param cfs the <code>ColumnFamilyStore</code> to check.
+     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
+     * <code>false</code> otherwise.
+     */
+    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
+    {
+        return cfs.name.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
+    }
+
+    /**
+     * Checks whether a table name denotes a secondary index table, i.e. whether it contains
+     * {@link Directories#SECONDARY_INDEX_NAME_SEPARATOR}.
+     *
+     * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
+     * @return <code>true</code> if the name denotes a secondary index,
+     * <code>false</code> otherwise.
+     */
+    public static boolean isIndexColumnFamily(String cfName)
+    {
+        String separator = Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+        return cfName.contains(separator);
+    }
+
+    /**
+     * Returns the base (parent) table of the specified secondary index {@link ColumnFamilyStore},
+     * looked up in the same keyspace by the name returned from getParentCfsName.
+     *
+     * @param cfs the secondary index <code>ColumnFamilyStore</code>
+     * @return the parent of the specified <code>ColumnFamilyStore</code>
+     */
+    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
+    {
+        return cfs.keyspace.getColumnFamilyStore(getParentCfsName(cfs.name));
+    }
+
+    /**
+     * Returns the parent table name encoded in a secondary index table name: the part before
+     * {@link Directories#SECONDARY_INDEX_NAME_SEPARATOR}. Asserts that the name denotes an index table.
+     *
+     * @param cfName the <code>ColumnFamilyStore</code> name
+     * @return the parent name of the specified <code>ColumnFamilyStore</code>
+     */
+    public static String getParentCfsName(String cfName)
+    {
+        assert isIndexColumnFamily(cfName);
+        String separator = Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+        return StringUtils.substringBefore(cfName, separator);
+    }
+
+    /**
+     * Returns the index name encoded in the name of a secondary index {@link ColumnFamilyStore}.
+     *
+     * @param cfs the secondary index <code>ColumnFamilyStore</code>
+     * @return the index name
+     */
+    public static String getIndexName(ColumnFamilyStore cfs)
+    {
+        String tableName = cfs.name;
+        return getIndexName(tableName);
+    }
+
+    /**
+     * Returns the index name encoded in a secondary index table name: the part after
+     * {@link Directories#SECONDARY_INDEX_NAME_SEPARATOR}. Asserts that the name denotes an index table.
+     *
+     * @param cfName the <code>ColumnFamilyStore</code> name
+     * @return the index name
+     */
+    public static String getIndexName(String cfName)
+    {
+        assert isIndexColumnFamily(cfName);
+        String separator = Directories.SECONDARY_INDEX_NAME_SEPARATOR;
+        return StringUtils.substringAfter(cfName, separator);
+    }
+
+    /**
+     * Builds the given indexes over the supplied sstables by submitting a SecondaryIndexBuilder to the
+     * CompactionManager and waiting for it, then flushes those indexes. Does nothing if {@code indexes}
+     * is empty.
+     */
+    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        String indexNames = indexes.stream().map(Index::getIndexName).collect(Collectors.joining(","));
+        String sstableNames = sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(","));
+        logger.info("Submitting index build of {} for data in {}", indexNames, sstableNames);
+
+        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, indexes, new ReducingKeyIterator(sstables));
+        FBUtilities.waitOnFuture(CompactionManager.instance.submitIndexBuild(builder));
+
+        flushIndexesBlocking(indexes);
+        logger.info("Index build of {} complete", indexNames);
+    }
+
+    /**
+     * Records the index as built in the system keyspace.
+     * getBuiltIndexNames queries SystemKeyspace.getBuiltIndexes with baseCfs.keyspace.getName(), so the
+     * same keyspace-name key must be used here (previously the table name was passed, and the write was
+     * never visible to that lookup).
+     */
+    private void markIndexBuilt(String indexName)
+    {
+        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
+    }
+
+    /**
+     * Records the index as no longer built in the system keyspace, keyed by keyspace name for the same
+     * reason as markIndexBuilt.
+     */
+    private void markIndexRemoved(String indexName)
+    {
+        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
+    }
+
+
+    /**
+     * @param indexName the name the index is registered under
+     * @return the registered index with that name, or null if there is none
+     */
+    public Index getIndexByName(String indexName)
+    {
+        Index registered = indexes.get(indexName);
+        return registered;
+    }
+
+    /**
+     * Instantiates (without registering) the Index implementation described by {@code indexDef}.
+     * Non-custom indexes come from CassandraIndex.newIndex. Custom indexes are created reflectively from the
+     * class named by the IndexTarget.CUSTOM_INDEX_OPTION_NAME option, through its
+     * (ColumnFamilyStore, IndexMetadata) constructor; any failure there is wrapped in a RuntimeException.
+     */
+    private Index createInstance(IndexMetadata indexDef)
+    {
+        if (!indexDef.isCustom())
+            return CassandraIndex.newIndex(baseCfs, indexDef);
+
+        assert indexDef.options != null;
+        String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
+        assert ! Strings.isNullOrEmpty(className);
+        try
+        {
+            Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
+            Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
+            return ctor.newInstance(baseCfs, indexDef);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Truncate all indexes: passes every registered index's truncate task for {@code truncatedAt}
+     * to executeAllBlocking.
+     */
+    public void truncateAllIndexesBlocking(final long truncatedAt)
+    {
+        executeAllBlocking(indexes.values().stream(), idx -> idx.getTruncateTask(truncatedAt));
+    }
+
+    /**
+     * Remove all indexes: passes every registered index's invalidate task to executeAllBlocking.
+     */
+    public void invalidateAllIndexesBlocking()
+    {
+        Stream<Index> all = indexes.values().stream();
+        executeAllBlocking(all, Index::getInvalidateTask);
+    }
+
+    /**
+     * Perform a blocking flush of all registered indexes (a snapshot of the registry taken at call time).
+     */
+    public void flushAllIndexesBlocking()
+    {
+        Set<Index> snapshot = ImmutableSet.copyOf(indexes.values());
+        flushIndexesBlocking(snapshot);
+    }
+
+    /**
+     * Perform a blocking flush of selected indexes.
+     * For each index backed by a table (ColumnFamilyStore), a flush is submitted while holding the base
+     * table's Tracker monitor; the remaining (non-CFS) indexes then have their blocking flush tasks run via
+     * executeAllBlocking, after which this waits for all of the submitted table flushes.
+     *
+     * @param indexes the indexes to flush; does nothing if empty
+     */
+    public void flushIndexesBlocking(Set<Index> indexes)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        List<Future<?>> wait = new ArrayList<>();
+        List<Index> nonCfsIndexes = new ArrayList<>();
+
+        // for each CFS backed index, submit a flush task which we'll wait on for completion
+        // for the non-CFS backed indexes, we'll flush those while we wait.
+        // (explicit presence check rather than Optional.map/orElseGet used purely for side effects)
+        synchronized (baseCfs.getTracker())
+        {
+            for (Index index : indexes)
+            {
+                Optional<ColumnFamilyStore> backingTable = index.getBackingTable();
+                if (backingTable.isPresent())
+                    wait.add(backingTable.get().forceFlush());
+                else
+                    nonCfsIndexes.add(index);
+            }
+        }
+
+        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
+        FBUtilities.waitOnFutures(wait);
+    }
+
+    /**
+     * Performs a blocking flush of all custom indexes, i.e. those without a backing table.
+     */
+    public void flushAllNonCFSBackedIndexesBlocking()
+    {
+        Set<Index> withoutBackingTable = new HashSet<>();
+        for (Index index : indexes.values())
+        {
+            if (!index.getBackingTable().isPresent())
+                withoutBackingTable.add(index);
+        }
+        flushIndexesBlocking(withoutBackingTable);
+    }
+
+    /**
+     * Returns the names of the registered indexes that SystemKeyspace reports as built for this
+     * table's keyspace.
+     *
+     * @return all indexes which are marked as built and ready to use
+     */
+    public List<String> getBuiltIndexNames()
+    {
+        Set<String> registeredNames = indexes.values()
+                                             .stream()
+                                             .map(Index::getIndexName)
+                                             .collect(Collectors.toCollection(HashSet::new));
+        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), registeredNames);
+    }
+
+    /**
+     * @return the backing tables of all registered indexes that have one
+     */
+    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
+    {
+        Set<ColumnFamilyStore> tables = new HashSet<>();
+        for (Index index : indexes.values())
+            index.getBackingTable().ifPresent(tables::add);
+        return tables;
+    }
+
+ /**
+ * @return if there are ANY indexes registered for this table
+ */
+ public boolean hasIndexes()
+ {
+ return !indexes.isEmpty();
+ }
+
+    /**
+     * When building an index against existing data in sstables, add the given partition to the given indexes.
+     * An UPDATE Indexer is obtained from each index; all are begun, given the filtered partition's static row
+     * (when non-empty) and then each row, and finally finished. Does nothing if {@code indexes} is empty.
+     */
+    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+    {
+        if (indexes.isEmpty())
+            return;
+
+        DecoratedKey key = partition.partitionKey();
+        Set<Index.Indexer> indexers = new HashSet<>();
+        for (Index index : indexes)
+            indexers.add(index.indexerFor(key, nowInSec, opGroup, IndexTransaction.Type.UPDATE));
+
+        for (Index.Indexer indexer : indexers)
+            indexer.begin();
+
+        try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+        {
+            Row staticRow = filtered.staticRow();
+            if (!staticRow.isEmpty())
+            {
+                for (Index.Indexer indexer : indexers)
+                    indexer.insertRow(staticRow);
+            }
+
+            while (filtered.hasNext())
+            {
+                Row row = filtered.next();
+                for (Index.Indexer indexer : indexers)
+                    indexer.insertRow(row);
+            }
+        }
+
+        for (Index.Indexer indexer : indexers)
+            indexer.finish();
+    }
+
+    /**
+     * Delete all data from all indexes for this partition.
+     * For when cleanup rips a partition out entirely: one cleanup transaction records the partition-level
+     * deletion, then a separate transaction is started and committed for each ROW unfiltered
+     * (non-row unfiltereds are skipped).
+     *
+     * TODO : improve cleanup transaction to batch updates & perform them async
+     */
+    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
+    {
+        // we need to acquire memtable lock because secondary index deletion may
+        // cause a race (see CASSANDRA-3712). This is done internally by the
+        // index transaction when it commits
+        CleanupTransaction partitionTransaction = newCleanupTransaction(partition.partitionKey(),
+                                                                        partition.columns(),
+                                                                        nowInSec);
+        partitionTransaction.start();
+        partitionTransaction.onPartitionDeletion(partition.partitionLevelDeletion());
+        partitionTransaction.commit();
+
+        while (partition.hasNext())
+        {
+            Unfiltered unfiltered = partition.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+            {
+                CleanupTransaction rowTransaction = newCleanupTransaction(partition.partitionKey(),
+                                                                          partition.columns(),
+                                                                          nowInSec);
+                rowTransaction.start();
+                rowTransaction.onRowDelete((Row) unfiltered);
+                rowTransaction.commit();
+            }
+        }
+    }
+
+    /**
+     * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
+     *
+     * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces
+     * the search space the most.
+     *
+     * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
+     * specify are automatically included. Following that, the registered indexes are filtered to include only those
+     * which support the standard expressions in the RowFilter.
+     *
+     * The most selective candidate is then chosen, i.e. the one whose getEstimatedResultRows() is the lowest
+     * (the fewest expected rows to scan).
+     *
+     * Implementation specific validation of the target expression, either custom or standard, by the selected
+     * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
+     * the validity of the expression.
+     *
+     * This method is only called once during the lifecycle of a ReadCommand and the result is
+     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+     * ReadOrderGroup, or an estimate of the result size from an average index query.
+     *
+     * @param command ReadCommand to be executed
+     * @return an Index instance, ready to use during execution of the command, or null if none
+     * of the registered indexes can support the command.
+     */
+    public Index getBestIndexFor(ReadCommand command)
+    {
+        if (indexes.isEmpty() || command.rowFilter().isEmpty())
+            return null;
+
+        List<Index> searchableIndexes = new ArrayList<>();
+        for (RowFilter.Expression expression : command.rowFilter())
+        {
+            if (expression.isCustom())
+            {
+                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
+                searchableIndexes.add(indexes.get(customExpression.getTargetIndex().name));
+            }
+            else
+            {
+                indexes.values().stream()
+                       .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
+                       .forEach(searchableIndexes::add);
+            }
+        }
+
+        if (searchableIndexes.isEmpty())
+        {
-            logger.debug("No applicable indexes found");
++            logger.trace("No applicable indexes found");
+            Tracing.trace("No applicable indexes found");
+            return null;
+        }
+
+        // most selective == fewest estimated result rows, hence min (max would pick the least selective index)
+        Index selected = searchableIndexes.size() == 1
+                         ? searchableIndexes.get(0)
+                         : searchableIndexes.stream()
+                                            .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
+                                                                         b.getEstimatedResultRows()))
+                                            .orElseThrow(() -> new AssertionError("Could not select most selective index"));
+
+        // pay for an additional threadlocal get() rather than build the strings unnecessarily
+        if (Tracing.isTracing())
+        {
+            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
+                          searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
+                                           .collect(Collectors.joining(",")),
+                          selected.getIndexName());
+        }
+        return selected;
+    }
+
+    /**
+     * Called at write time to ensure that values present in the update
+     * are valid according to the rules of all registered indexes which
+     * will process it: every index whose indexes(update.columns()) is true
+     * gets to validate the update, and may reject it by throwing.
+     * The partition key as well as the clustering and cell values for each
+     * row in the update may be checked by index implementations.
+     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+     * @throws InvalidRequestException
+     */
+    public void validate(PartitionUpdate update) throws InvalidRequestException
+    {
+        for (Index index : indexes.values())
+        {
+            if (index.indexes(update.columns()))
+                index.validate(update);
+        }
+    }
+
+ /**
+ * IndexRegistry methods
+ */
+    /**
+     * Adds the index to this manager's registry under its metadata name; Map.put semantics mean any
+     * index already registered under that name is replaced.
+     */
+    public void registerIndex(Index index)
+    {
+        indexes.put(index.getIndexMetadata().name, index);
-        logger.debug("Registered index {}", index.getIndexMetadata().name);
++        logger.trace("Registered index {}", index.getIndexMetadata().name);
+    }
+
+    /**
+     * Removes whatever index is registered under the given index's metadata name, logging whether
+     * anything was actually registered under that name.
+     */
+    public void unregisterIndex(Index index)
+    {
+        Index removed = indexes.remove(index.getIndexMetadata().name);
-        logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
++        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
+                     index.getIndexMetadata().name);
+    }
+
+ /**
+ * Looks up a registered index by the name recorded in the supplied metadata.
+ * @param metadata metadata whose name identifies the index
+ * @return the index registered under that name, or null when there is none
+ */
+ public Index getIndex(IndexMetadata metadata)
+ {
+ Index found = indexes.get(metadata.name);
+ return found;
+ }
+
+ /**
+ * Returns an immutable copy of the currently registered indexes, taken at call time;
+ * later registrations or removals are not reflected in the returned collection.
+ */
+ public Collection<Index> listIndexes()
+ {
+ ImmutableSet<Index> snapshot = ImmutableSet.copyOf(indexes.values());
+ return snapshot;
+ }
+
+ /*
+ * Handling of index updates.
+ * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
+ * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
+ */
+
+ /**
+ * Creates the transaction that keeps indexes in step with a partition update on the write path.
+ * Only indexes whose {@code indexes(...)} check accepts the update's columns contribute an indexer.
+ * @param update the partition update being applied
+ * @param opGroup operation group handed to each indexer
+ * @param nowInSec current time in seconds, handed to each indexer
+ * @return UpdateTransaction.NO_OP when no index is interested, otherwise a WriteTimeTransaction
+ */
+ public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
+ {
+ if (!hasIndexes())
+ return UpdateTransaction.NO_OP;
+
+ // todo : optimize lookup, we can probably cache quite a bit of stuff, rather than doing
+ // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out
+ // exactly how indexes are to be identified & associated with a given partition update
+ List<Index.Indexer> interested = new ArrayList<>();
+ for (Index index : indexes.values())
+ {
+ if (!index.indexes(update.columns()))
+ continue;
+ interested.add(index.indexerFor(update.partitionKey(),
+ nowInSec,
+ opGroup,
+ IndexTransaction.Type.UPDATE));
+ }
+
+ if (interested.isEmpty())
+ return UpdateTransaction.NO_OP;
+
+ return new WriteTimeTransaction(interested.toArray(new Index.Indexer[interested.size()]));
+ }
+
+ /**
+ * Creates the transaction used when merging rows of one partition during compaction.
+ * Unlike the other factories this does not test {@code hasIndexes()} itself (see note below).
+ * @param key partition being compacted
+ * @param partitionColumns columns present in the partition; selects the interested indexes
+ * @param versions number of source versions being merged
+ * @param nowInSec current time in seconds
+ * @return CompactionTransaction.NO_OP when no index is interested, otherwise an IndexGCTransaction
+ */
+ public CompactionTransaction newCompactionTransaction(DecoratedKey key,
+ PartitionColumns partitionColumns,
+ int versions,
+ int nowInSec)
+ {
+ // the check for whether there are any registered indexes is already done in CompactionIterator
+ List<Index> interested = new ArrayList<>();
+ for (Index index : indexes.values())
+ {
+ if (index.indexes(partitionColumns))
+ interested.add(index);
+ }
+
+ if (interested.isEmpty())
+ return CompactionTransaction.NO_OP;
+
+ return new IndexGCTransaction(key, versions, nowInSec, interested.toArray(new Index[interested.size()]));
+ }
+
+ /**
+ * Creates the transaction used when partitions/rows are removed during cleanup.
+ * @param key partition being cleaned up
+ * @param partitionColumns columns present in the partition; selects the interested indexes
+ * @param nowInSec current time in seconds
+ * @return CleanupTransaction.NO_OP when there are no indexes or none is interested,
+ * otherwise a CleanupGCTransaction
+ */
+ public CleanupTransaction newCleanupTransaction(DecoratedKey key,
+ PartitionColumns partitionColumns,
+ int nowInSec)
+ {
+ // nothing is registered at all, so no index can need cleaning up
+ if (!hasIndexes())
+ return CleanupTransaction.NO_OP;
+
+ List<Index> interested = new ArrayList<>();
+ for (Index index : indexes.values())
+ {
+ if (index.indexes(partitionColumns))
+ interested.add(index);
+ }
+
+ return interested.isEmpty()
+ ? CleanupTransaction.NO_OP
+ : new CleanupGCTransaction(key, nowInSec, interested.toArray(new Index[interested.size()]));
+ }
+
+ /**
+ * A single use transaction for processing a partition update on the regular write path.
+ * Every event is forwarded to each of the indexers supplied at construction, in array order.
+ */
+ private static final class WriteTimeTransaction implements UpdateTransaction
+ {
+ private final Index.Indexer[] indexers;
+
+ private WriteTimeTransaction(Index.Indexer...indexers)
+ {
+ // don't allow null indexers; when no indexer is needed callers use UpdateTransaction.NO_OP instead
+ for (Index.Indexer indexer : indexers) assert indexer != null;
+ this.indexers = indexers;
+ }
+
+ public void start()
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.begin();
+ }
+
+ public void onPartitionDeletion(DeletionTime deletionTime)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.partitionDelete(deletionTime);
+ }
+
+ public void onRangeTombstone(RangeTombstone tombstone)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.rangeTombstone(tombstone);
+ }
+
+ public void onInserted(Row row)
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.insertRow(row);
+ }
+
+ /**
+ * Diffs the updated row against the existing one and passes each indexer two delta rows:
+ * one holding what is new in {@code updated}, and one holding the cells of {@code existing}
+ * that were removed or superseded.
+ */
+ public void onUpdated(Row existing, Row updated)
+ {
+ final Row.Builder toRemove = BTreeRow.sortedBuilder();
+ toRemove.newRow(existing.clustering());
+ final Row.Builder toInsert = BTreeRow.sortedBuilder();
+ toInsert.newRow(updated.clustering());
+ // diff listener collates the columns to be added & removed from the indexes
+ RowDiffListener diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ // compare by value: an equal liveness info is not re-indexed just because it is a distinct instance
+ if (merged != null && !merged.equals(original))
+ toInsert.addPrimaryKeyLivenessInfo(merged);
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ {
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ // compare by value so that an unchanged cell is not written to the index a second time
+ if (merged != null && !merged.equals(original))
+ toInsert.addCell(merged);
+
+ // a cell that disappeared, or whose value/timestamp changed, is removed from the index
+ if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
+ toRemove.addCell(original);
+
+ }
+ };
+ Rows.diff(diffListener, updated, existing);
+ Row oldRow = toRemove.build();
+ Row newRow = toInsert.build();
+ for (Index.Indexer indexer : indexers)
+ indexer.updateRow(oldRow, newRow);
+ }
+
+ public void commit()
+ {
+ for (Index.Indexer indexer : indexers)
+ indexer.finish();
+ }
+
+ private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
+ {
+ // The old index entry is removed only if the value or the timestamp changed.
+ // Cells that agree on both (e.g. cells differing only in expiry metadata) keep
+ // their existing entry; the original author's rationale was that deleting it
+ // would just hide the value indexed for the new cell.
+ return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
+ }
+ }
+
+ /**
+ * A single-use transaction for updating indexes for a single partition during compaction where the only
+ * operation is to merge rows. For each source version it records a delta row holding the cells that
+ * did not survive the merge; on commit those delta rows are removed from every interested index.
+ * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+ * a single partition
+ */
+ private static final class IndexGCTransaction implements CompactionTransaction
+ {
+ private final DecoratedKey key;
+ private final int versions;
+ private final int nowInSec;
+ private final Index[] indexes;
+
+ private Row[] rows;
+
+ private IndexGCTransaction(DecoratedKey key,
+ int versions,
+ int nowInSec,
+ Index...indexes)
+ {
+ // reject null indexes; when none are interested a NO_OP transaction is used instead
+ for (Index index : indexes) assert index != null;
+
+ this.key = key;
+ this.versions = versions;
+ this.indexes = indexes;
+ this.nowInSec = nowInSec;
+ }
+
+ public void start()
+ {
+ // one slot per source version; left unallocated when there are no versions
+ if (versions > 0)
+ rows = new Row[versions];
+ }
+
+ public void onRowMerge(Row merged, Row...sources)
+ {
+ // One lazily created builder per source version. Each collects that version's cells
+ // which are missing from the merged row; the resulting rows are removed from the indexes.
+ final Row.Builder[] deltas = new Row.Builder[sources.length];
+ RowDiffListener diffListener = new RowDiffListener()
+ {
+ public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+ {
+ }
+
+ public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
+ {
+ }
+
+ public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+ {
+ }
+
+ public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+ {
+ // only cells present in a source but absent from the merge result matter here
+ if (original == null || merged != null)
+ return;
+
+ Row.Builder delta = deltas[i];
+ if (delta == null)
+ {
+ delta = BTreeRow.sortedBuilder();
+ deltas[i] = delta;
+ delta.newRow(clustering);
+ }
+ delta.addCell(original);
+ }
+ };
+
+ Rows.diff(diffListener, merged, sources);
+
+ int idx = 0;
+ for (Row.Builder delta : deltas)
+ {
+ if (delta != null)
+ rows[idx] = delta.build();
+ idx++;
+ }
+ }
+
+ public void commit()
+ {
+ // rows is only allocated by start() when there was at least one version
+ if (rows == null)
+ return;
+
+ try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+ {
+ for (Index index : indexes)
+ {
+ Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.COMPACTION);
+ indexer.begin();
+ for (Row row : rows)
+ {
+ if (row == null)
+ continue;
+ indexer.removeRow(row);
+ }
+ indexer.finish();
+ }
+ }
+ }
+ }
+
+ /**
+ * A single-use transaction for updating indexes for a single partition during cleanup, where
+ * partitions and rows are only removed. Only the most recently reported row and partition
+ * deletion are kept; on commit both are forwarded to every interested index.
+ * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
+ * a single partition
+ */
+ private static final class CleanupGCTransaction implements CleanupTransaction
+ {
+ private final DecoratedKey key;
+ private final int nowInSec;
+ private final Index[] indexes;
+
+ private Row row;
+ private DeletionTime partitionDelete;
+
+ private CleanupGCTransaction(DecoratedKey key,
+ int nowInSec,
+ Index...indexes)
+ {
+ // don't allow null indexers, if we don't have any, use a noop transaction
+ for (Index index : indexes) assert index != null;
+
+ this.key = key;
+ this.indexes = indexes;
+ this.nowInSec = nowInSec;
+ }
+
+ public void start()
+ {
+ }
+
+ public void onPartitionDeletion(DeletionTime deletionTime)
+ {
+ partitionDelete = deletionTime;
+ }
+
+ public void onRowDelete(Row row)
+ {
+ this.row = row;
+ }
+
+ public void commit()
+ {
+ if (row == null && partitionDelete == null)
+ return;
+
+ try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
+ {
+ for (Index index : indexes)
+ {
+ Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.CLEANUP);
+ indexer.begin();
+ // a recorded partition deletion must reach the indexers too, not only the deleted row
+ if (partitionDelete != null)
+ indexer.partitionDelete(partitionDelete);
+ if (row != null)
+ indexer.removeRow(row);
+ indexer.finish();
+ }
+ }
+ }
+ }
+
+ /**
+ * Submits the task to the blocking executor and blocks on the returned future via
+ * FBUtilities.waitOnFuture. A null task means there is nothing to do.
+ */
+ private static void executeBlocking(Callable<?> task)
+ {
+ if (task == null)
+ return;
+
+ FBUtilities.waitOnFuture(blockingExecutor.submit(task));
+ }
+
+ /**
+ * Maps each index to a task with {@code function} and submits every non-null task to the blocking
+ * executor. The tasks are submitted before any of them is waited on, so they may run concurrently.
+ * Then blocks on all of the resulting futures via FBUtilities.waitOnFutures.
+ */
+ private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
+ {
+ List<Future<?>> waitFor = indexers.map(function)
+ .filter(task -> task != null)
+ .<Future<?>>map(blockingExecutor::submit)
+ .collect(Collectors.toList());
+ FBUtilities.waitOnFutures(waitFor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index f6a10e5,0000000..93f5d61
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,826 -1,0 +1,826 @@@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.index.internal.composites.CompositesSearcher;
+import org.apache.cassandra.index.internal.keys.KeysSearcher;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Index implementation which indexes the values for a single column in the base
+ * table and which stores its index data in a local, hidden table.
+ */
+public abstract class CassandraIndex implements Index
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
+
+ public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
+
+ public final ColumnFamilyStore baseCfs;
+ protected IndexMetadata metadata;
+ protected ColumnFamilyStore indexCfs;
+ protected ColumnDefinition indexedColumn;
+ protected CassandraIndexFunctions functions;
+
+ protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ this.baseCfs = baseCfs;
+ setMetadata(indexDef);
+ }
+
+ /**
+ * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value].
+ * This default implementation supports only {@code Operator.EQ}.
+ * @param indexedColumn the column the predicate applies to
+ * @param operator the predicate's operator
+ * @return true if the operator is supported
+ */
+ protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+ {
+ return operator == Operator.EQ;
+ }
+
+ /**
+ * Used to construct the clustering for an entry in the index table based on values from the base data.
+ * The clustering columns in the index table encode the values required to retrieve the correct data from the base
+ * table and varies depending on the kind of the indexed column. See indexCfsMetadata for more details
+ * Used whenever a row in the index table is written or deleted.
+ * @param partitionKey from the base data being indexed
+ * @param prefix from the base data being indexed
+ * @param path from the base data being indexed
+ * @return a clustering prefix to be used to insert into the index table
+ */
+ protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path);
+
+ /**
+ * Used at search time to convert a row in the index table into a simple struct containing the values required
+ * to retrieve the corresponding row from the base table.
+ * @param indexedValue the partition key of the indexed table (i.e. the value that was indexed)
+ * @param indexEntry a row from the index table
+ * @return the decoded entry describing where the base table row lives
+ */
+ public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
+ Row indexEntry);
+
+ /**
+ * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
+ * Used at read time to identify out of date index entries so that they can be excluded from search results and
+ * repaired
+ * @param row the current row from the primary data table
+ * @param indexValue the value we retrieved from the index
+ * @param nowInSec current time in seconds
+ * @return true if the index is out of date and the entry should be dropped
+ */
+ public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
+
+ /**
+ * Extract the value to be inserted into the index from the components of the base data
+ * @param partitionKey from the primary data
+ * @param clustering from the primary data
+ * @param path from the primary data
+ * @param cellValue from the primary data
+ * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
+ * key in the index table
+ */
+ protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path,
+ ByteBuffer cellValue);
+
+ public ColumnDefinition getIndexedColumn()
+ {
+ return indexedColumn;
+ }
+
+ public ClusteringComparator getIndexComparator()
+ {
+ return indexCfs.metadata.comparator;
+ }
+
+ public ColumnFamilyStore getIndexCfs()
+ {
+ return indexCfs;
+ }
+
+ public void register(IndexRegistry registry)
+ {
+ registry.registerIndex(this);
+ }
+
+ public Callable<?> getInitializationTask()
+ {
+ // if we're just linking in the index on an already-built index post-restart
+ // we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
+ return isBuilt() ? null : getBuildIndexTask();
+ }
+
+ public IndexMetadata getIndexMetadata()
+ {
+ return metadata;
+ }
+
+ public String getIndexName()
+ {
+ return metadata.name;
+ }
+
+ public Optional<ColumnFamilyStore> getBackingTable()
+ {
+ return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
+ }
+
+ public Callable<Void> getBlockingFlushTask()
+ {
+ return () -> {
+ indexCfs.forceBlockingFlush();
+ return null;
+ };
+ }
+
+ public Callable<?> getInvalidateTask()
+ {
+ return () -> {
+ markRemoved();
+ invalidate();
+ return null;
+ };
+ }
+
+ public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
+ {
+ // Only refresh the metadata reference. Calling setMetadata() here would run
+ // ColumnFamilyStore.createColumnFamilyStore again and create a second store for the
+ // same index table; the existing indexCfs just needs its properties reloaded.
+ // NOTE(review): assumes a metadata reload never changes the index target; confirm with callers.
+ metadata = indexDef;
+ return () -> {
+ indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
+ indexCfs.reload();
+ return null;
+ };
+ }
+
+ private void setMetadata(IndexMetadata indexDef)
+ {
+ metadata = indexDef;
+ Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+ functions = getFunctions(indexDef, target);
+ CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+ indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
+ cfm.cfName,
+ cfm,
+ baseCfs.getTracker().loadsstables);
+ indexedColumn = target.left;
+ }
+
+ public Callable<?> getTruncateTask(final long truncatedAt)
+ {
+ return () -> {
+ indexCfs.discardSSTables(truncatedAt);
+ return null;
+ };
+ }
+
+ public boolean shouldBuildBlocking()
+ {
+ // built-in indexes are always included in builds initiated from SecondaryIndexManager
+ return true;
+ }
+
+ public boolean indexes(PartitionColumns columns)
+ {
+ // if we have indexes on the partition key or clustering columns, return true
+ return isPrimaryKeyIndex() || columns.contains(indexedColumn);
+ }
+
+ public boolean dependsOn(ColumnDefinition column)
+ {
+ return indexedColumn.name.equals(column.name);
+ }
+
+ public boolean supportsExpression(ColumnDefinition column, Operator operator)
+ {
+ return indexedColumn.name.equals(column.name)
+ && supportsOperator(indexedColumn, operator);
+ }
+
+ private boolean supportsExpression(RowFilter.Expression expression)
+ {
+ return supportsExpression(expression.column(), expression.operator());
+ }
+
+ public AbstractType<?> customExpressionValueType()
+ {
+ return null;
+ }
+
+ public long getEstimatedResultRows()
+ {
+ return indexCfs.getMeanColumns();
+ }
+
+ /**
+ * No post processing of query results, just return them unchanged
+ */
+ public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+ {
+ return (partitionIterator, readCommand) -> partitionIterator;
+ }
+
+ public RowFilter getPostIndexQueryFilter(RowFilter filter)
+ {
+ return getTargetExpression(filter.getExpressions()).map(filter::without)
+ .orElse(filter);
+ }
+
+ private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
+ {
+ return expressions.stream().filter(this::supportsExpression).findFirst();
+ }
+
+ public Index.Searcher searcherFor(ReadCommand command)
+ {
+ Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+
+ if (target.isPresent())
+ {
+ target.get().validateForIndexing();
+ switch (getIndexMetadata().kind)
+ {
+ case COMPOSITES:
+ return new CompositesSearcher(command, target.get(), this);
+ case KEYS:
+ return new KeysSearcher(command, target.get(), this);
+ default:
+ throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
+ metadata.kind,
+ metadata.name,
+ indexedColumn.name.toString()));
+ }
+ }
+
+ return null;
+ }
+
+ public void validate(PartitionUpdate update) throws InvalidRequestException
+ {
+ switch (indexedColumn.kind)
+ {
+ case PARTITION_KEY:
+ validatePartitionKey(update.partitionKey());
+ break;
+ case CLUSTERING:
+ validateClusterings(update);
+ break;
+ case REGULAR:
+ validateRows(update);
+ break;
+ case STATIC:
+ validateRows(Collections.singleton(update.staticRow()));
+ break;
+ }
+ }
+
+ public Indexer indexerFor(final DecoratedKey key,
+ final int nowInSec,
+ final OpOrder.Group opGroup,
+ final IndexTransaction.Type transactionType)
+ {
+ return new Indexer()
+ {
+ public void begin()
+ {
+ }
+
+ public void partitionDelete(DeletionTime deletionTime)
+ {
+ }
+
+ public void rangeTombstone(RangeTombstone tombstone)
+ {
+ }
+
+ public void insertRow(Row row)
+ {
+ if (isPrimaryKeyIndex())
+ {
+ indexPrimaryKey(row.clustering(),
+ getPrimaryKeyIndexLiveness(row),
+ row.deletion());
+ }
+ else
+ {
+ if (indexedColumn.isComplex())
+ indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+ else
+ indexCell(row.clustering(), row.getCell(indexedColumn));
+ }
+ }
+
+ public void removeRow(Row row)
+ {
+ // Primary key indexes are not maintained on removal. Delegating to indexPrimaryKey
+ // here would (re)insert an index entry whenever the removed row carries primary key
+ // liveness info, e.g. for the full rows handed over during cleanup.
+ if (isPrimaryKeyIndex())
+ return;
+
+ if (indexedColumn.isComplex())
+ removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+ else
+ removeCell(row.clustering(), row.getCell(indexedColumn));
+ }
+
+
+ public void updateRow(Row oldRow, Row newRow)
+ {
+ if (isPrimaryKeyIndex())
+ indexPrimaryKey(newRow.clustering(),
+ newRow.primaryKeyLivenessInfo(),
+ newRow.deletion());
+
+ if (indexedColumn.isComplex())
+ {
+ indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
+ removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
+ }
+ else
+ {
+ indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
+ removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
+ }
+ }
+
+ public void finish()
+ {
+ }
+
+ private void indexCells(Clustering clustering, Iterable<Cell> cells)
+ {
+ if (cells == null)
+ return;
+
+ for (Cell cell : cells)
+ indexCell(clustering, cell);
+ }
+
+ private void indexCell(Clustering clustering, Cell cell)
+ {
+ if (cell == null || !cell.isLive(nowInSec))
+ return;
+
+ insert(key.getKey(),
+ clustering,
+ cell,
+ LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
+ opGroup);
+ }
+
+ private void removeCells(Clustering clustering, Iterable<Cell> cells)
+ {
+ if (cells == null)
+ return;
+
+ for (Cell cell : cells)
+ removeCell(clustering, cell);
+ }
+
+ private void removeCell(Clustering clustering, Cell cell)
+ {
+ if (cell == null || !cell.isLive(nowInSec))
+ return;
+
+ delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+ }
+
+ private void indexPrimaryKey(final Clustering clustering,
+ final LivenessInfo liveness,
+ final Row.Deletion deletion)
+ {
+ if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
+ insert(key.getKey(), clustering, null, liveness, opGroup);
+
+ if (!deletion.isLive())
+ delete(key.getKey(), clustering, deletion.time(), opGroup);
+ }
+
+ // Liveness for a primary key index entry: the row's own liveness, or that of its newest live cell if later
+ private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
+ {
+ long timestamp = row.primaryKeyLivenessInfo().timestamp();
+ int ttl = row.primaryKeyLivenessInfo().ttl();
+ for (Cell cell : row.cells())
+ {
+ long cellTimestamp = cell.timestamp();
+ if (cell.isLive(nowInSec))
+ {
+ if (cellTimestamp > timestamp)
+ {
+ timestamp = cellTimestamp;
+ ttl = cell.ttl();
+ }
+ }
+ }
+ return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
+ }
+ };
+ }
+
+ /**
+ * Specific to internal indexes, this is called by a
+ * searcher when it encounters a stale entry in the index
+ * @param indexKey the partition key in the index table
+ * @param indexClustering the clustering in the index table
+ * @param deletion deletion timestamp etc
+ * @param opGroup the operation under which to perform the deletion
+ */
+ public void deleteStaleEntry(DecoratedKey indexKey,
+ Clustering indexClustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ doDelete(indexKey, indexClustering, deletion, opGroup);
- logger.debug("Removed index entry for stale value {}", indexKey);
++ logger.trace("Removed index entry for stale value {}", indexKey);
+ }
+
+ /**
+ * Called when adding a new entry to the index
+ */
+ private void insert(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell,
+ LivenessInfo info,
+ OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ cell));
+ Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
+ PartitionUpdate upd = partitionUpdate(valueKey, row);
+ indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
- logger.debug("Inserted entry into index for value {}", valueKey);
++ logger.trace("Inserted entry into index for value {}", valueKey);
+ }
+
+ /**
+ * Called when deleting entries on non-primary key columns
+ */
+ private void delete(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell,
+ OpOrder.Group opGroup,
+ int nowInSec)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ cell));
+ doDelete(valueKey,
+ buildIndexClustering(rowKey, clustering, cell),
+ new DeletionTime(cell.timestamp(), nowInSec),
+ opGroup);
+ }
+
+ /**
+ * Called when deleting entries from indexes on primary key columns
+ */
+ private void delete(ByteBuffer rowKey,
+ Clustering clustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ null));
+ doDelete(valueKey,
+ buildIndexClustering(rowKey, clustering, null),
+ deletion,
+ opGroup);
+ }
+
+ private void doDelete(DecoratedKey indexKey,
+ Clustering indexClustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
+ PartitionUpdate upd = partitionUpdate(indexKey, row);
+ indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
- logger.debug("Removed index entry for value {}", indexKey);
++ logger.trace("Removed index entry for value {}", indexKey);
+ }
+
+ private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
+ {
+ assert indexedColumn.isPartitionKey();
+ validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
+ }
+
+ private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
+ {
+ assert indexedColumn.isClusteringColumn();
+ for (Row row : update)
+ validateIndexedValue(getIndexedValue(null, row.clustering(), null));
+ }
+
+ private void validateRows(Iterable<Row> rows)
+ {
+ assert !indexedColumn.isPrimaryKeyColumn();
+ for (Row row : rows)
+ {
+ if (indexedColumn.isComplex())
+ {
+ ComplexColumnData data = row.getComplexColumnData(indexedColumn);
+ if (data != null)
+ {
+ for (Cell cell : data)
+ {
+ validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
+ }
+ }
+ }
+ else
+ {
+ validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
+ }
+ }
+ }
+
+ // Rejects values that are too large to be stored as an index partition key
+ private void validateIndexedValue(ByteBuffer value)
+ {
+ if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+ throw new InvalidRequestException(String.format(
+ "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+ value.remaining(),
+ getIndexName(),
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ indexedColumn.name.toString(),
+ FBUtilities.MAX_UNSIGNED_SHORT));
+ }
+
+ private ByteBuffer getIndexedValue(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell)
+ {
+ return getIndexedValue(rowKey,
+ clustering,
+ cell == null ? null : cell.path(),
+ cell == null ? null : cell.value()
+ );
+ }
+
+ private Clustering buildIndexClustering(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell)
+ {
+ return buildIndexClusteringPrefix(rowKey,
+ clustering,
+ cell == null ? null : cell.path()).build();
+ }
+
+ private DecoratedKey getIndexKeyFor(ByteBuffer value)
+ {
+ return indexCfs.decorateKey(value);
+ }
+
+ private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
+ {
+ return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+ }
+
+ private void invalidate()
+ {
+ // interrupt in-progress compactions
+ Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
+ CompactionManager.instance.interruptCompactionForCFs(cfss, true);
+ CompactionManager.instance.waitForCessation(cfss);
+ Keyspace.writeOrder.awaitNewBarrier();
+ indexCfs.forceBlockingFlush();
+ indexCfs.readOrdering.awaitNewBarrier();
+ indexCfs.invalidate();
+ }
+
+ private boolean isBuilt()
+ {
+ return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+ }
+
+ private void markBuilt()
+ {
+ SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+ }
+
+ private void markRemoved()
+ {
+ SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName());
+ }
+
+ private boolean isPrimaryKeyIndex()
+ {
+ return indexedColumn.isPrimaryKeyColumn();
+ }
+
+ private Callable<?> getBuildIndexTask()
+ {
+ return () -> {
+ buildBlocking();
+ return null;
+ };
+ }
+
+ private void buildBlocking()
+ {
+ baseCfs.forceBlockingFlush();
+
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+ Refs<SSTableReader> sstables = viewFragment.refs)
+ {
+ if (sstables.isEmpty())
+ {
+ logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ getIndexName());
+ markBuilt();
+ return;
+ }
+
+ logger.info("Submitting index build of {} for data in {}",
+ getIndexName(),
+ getSSTableNames(sstables));
+
+ SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+ Collections.singleton(this),
+ new ReducingKeyIterator(sstables));
+ Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+ FBUtilities.waitOnFuture(future);
+ indexCfs.forceBlockingFlush();
+ markBuilt();
+ }
+ logger.info("Index build of {} complete", getIndexName());
+ }
+
+ private static String getSSTableNames(Collection<SSTableReader> sstables)
+ {
+ return StreamSupport.stream(sstables.spliterator(), false)
+ .map(SSTableReader::toString)
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Construct the CFMetadata for an index table, the clustering columns in the index table
+ * vary dependent on the kind of the indexed value.
+ * @param baseCfsMetadata metadata of the base (indexed) table
+ * @param indexMetadata definition of the index
+ * @return metadata for the hidden table that stores the index entries
+ */
+ public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+ {
+ Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
+ CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
+ ColumnDefinition indexedColumn = target.left;
+ AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
+ CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
+ baseCfsMetadata.indexColumnFamilyName(indexMetadata))
+ .withId(baseCfsMetadata.cfId)
+ .withPartitioner(new LocalPartitioner(indexedValueType))
+ .addPartitionKey(indexedColumn.name, indexedColumn.type);
+
+ builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+ builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+ return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+ }
+
+ /**
+ * Factory method for new CassandraIndex instances
+ * @param baseCfs the base table being indexed
+ * @param indexMetadata definition of the index
+ * @return a new index instance created by the CassandraIndexFunctions matching the index target
+ */
+ public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+ {
+ return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
+ }
+
+ // Public because it's also used to convert index metadata into a thrift-compatible format
+ public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
+ IndexMetadata indexDef)
+ {
+ String target = indexDef.options.get("target");
+ assert target != null : String.format("No target definition found for index %s", indexDef.name);
+
+ // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
+ // if not, then it must be a simple column name and implicitly its type is VALUES
+ Matcher matcher = TARGET_REGEX.matcher(target);
+ String columnName;
+ IndexTarget.Type targetType;
+ if (matcher.matches())
+ {
+ targetType = IndexTarget.Type.fromString(matcher.group(1));
+ columnName = matcher.group(2);
+ }
+ else
+ {
+ columnName = target;
+ targetType = IndexTarget.Type.VALUES;
+ }
+
+ // in the case of a quoted column name the name in the target string
+ // will be enclosed in quotes, which we need to unwrap. It may also
+ // include quote characters internally, escaped like so:
+ // abc"def -> abc""def.
+ // Because the target string is stored in a CQL compatible form, we
+ // need to un-escape any such quotes to get the actual column name
+ if (columnName.startsWith("\""))
+ {
+ columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
+ columnName = columnName.replaceAll("\"\"", "\"");
+ }
+
+ // if it's not a CQL table, we can't assume that the column name is utf8, so
+ // in that case we have to do a linear scan of the cfm's columns to get the matching one
+ if (cfm.isCQLTable())
+ return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
+ else
+ for (ColumnDefinition column : cfm.allColumns())
+ if (column.name.toString().equals(columnName))
+ return Pair.create(column, targetType);
+
+ throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
+ }
+
+ static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
+ Pair<ColumnDefinition, IndexTarget.Type> target)
+ {
+ if (indexDef.isKeys())
+ return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
+
+ ColumnDefinition indexedColumn = target.left;
+ if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
+ {
+ switch (((CollectionType)indexedColumn.type).kind)
+ {
+ case LIST:
+ return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+ case SET:
+ return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+ case MAP:
+ switch (target.right)
+ {
+ case KEYS:
+ return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+ case KEYS_AND_VALUES:
+ return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+ case VALUES:
+ return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+ }
+ throw new AssertionError();
+ }
+ }
+
+ switch (indexedColumn.kind)
+ {
+ case CLUSTERING:
+ return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
+ case REGULAR:
+ return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
+ case PARTITION_KEY:
+ return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
+ //case COMPACT_VALUE:
+ // return new CompositesIndexOnCompactValue();
+ }
+ throw new AssertionError();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTable.java
index d66638e,b0aa89e..923ef82
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@@ -111,11 -113,9 +111,11 @@@ public abstract class SSTabl
FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
- FileUtils.delete(desc.filenameFor(Component.SUMMARY));
+
+ if (components.contains(Component.SUMMARY))
+ FileUtils.delete(desc.filenameFor(Component.SUMMARY));
- logger.debug("Deleted {}", desc);
+ logger.trace("Deleted {}", desc);
return true;
}