Posted to commits@cassandra.apache.org by bl...@apache.org on 2015/09/22 22:13:17 UTC

[6/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
index 1f0191a,0000000..1dadf20
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchExecutor.java
@@@ -1,265 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.function.Supplier;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.service.StorageService;
 +
 +/**
 + * A multi-threaded (by default) executor for dispatching hints.
 + *
 + * Most of the dispatch is triggered by {@link HintsDispatchTrigger}, which runs every ~10 seconds.
 + */
 +final class HintsDispatchExecutor
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsDispatchExecutor.class);
 +
 +    private final File hintsDirectory;
 +    private final ExecutorService executor;
 +    private final AtomicBoolean isPaused;
 +    private final Map<UUID, Future> scheduledDispatches;
 +
 +    HintsDispatchExecutor(File hintsDirectory, int maxThreads, AtomicBoolean isPaused)
 +    {
 +        this.hintsDirectory = hintsDirectory;
 +        this.isPaused = isPaused;
 +
 +        scheduledDispatches = new ConcurrentHashMap<>();
 +        executor = new JMXEnabledThreadPoolExecutor(1,
 +                                                    maxThreads,
 +                                                    1,
 +                                                    TimeUnit.MINUTES,
 +                                                    new LinkedBlockingQueue<>(),
 +                                                    new NamedThreadFactory("HintsDispatcher", Thread.MIN_PRIORITY),
 +                                                    "internal");
 +    }
 +
 +    /*
 +     * It's safe to terminate a dispatch in progress and to deschedule dispatch.
 +     */
 +    void shutdownBlocking()
 +    {
 +        scheduledDispatches.clear();
 +        executor.shutdownNow();
 +    }
 +
 +    boolean isScheduled(HintsStore store)
 +    {
 +        return scheduledDispatches.containsKey(store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store)
 +    {
 +        return dispatch(store, store.hostId);
 +    }
 +
 +    Future dispatch(HintsStore store, UUID hostId)
 +    {
 +        /*
 +         * It is safe to perform dispatch for the same host id concurrently in two or more threads,
 +         * but there is nothing to gain from it - so we don't.
 +         *
 +         * Additionally, having just one dispatch task per host id ensures that we'll never violate our per-destination
 +         * rate limit, without having to share a rate limiter between threads.
 +         *
 +         * It also simplifies reasoning about dispatch sessions.
 +         */
 +        return scheduledDispatches.computeIfAbsent(hostId, uuid -> executor.submit(new DispatchHintsTask(store, hostId)));
 +    }
 +
 +    Future transfer(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +    {
 +        return executor.submit(new TransferHintsTask(catalog, hostIdSupplier));
 +    }
 +
 +    void completeDispatchBlockingly(HintsStore store)
 +    {
 +        Future future = scheduledDispatches.get(store.hostId);
 +        try
 +        {
 +            if (future != null)
 +                future.get();
 +        }
 +        catch (ExecutionException | InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +
 +    private final class TransferHintsTask implements Runnable
 +    {
 +        private final HintsCatalog catalog;
 +
 +        /*
 +         * Supplies target hosts to stream to. Generally returns the one the DynamicSnitch thinks is closest.
 +         * We use a supplier here to be able to get a new host if the current one dies during streaming.
 +         */
 +        private final Supplier<UUID> hostIdSupplier;
 +
 +        private TransferHintsTask(HintsCatalog catalog, Supplier<UUID> hostIdSupplier)
 +        {
 +            this.catalog = catalog;
 +            this.hostIdSupplier = hostIdSupplier;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            UUID hostId = hostIdSupplier.get();
 +            logger.info("Transferring all hints to {}", hostId);
 +            if (transfer(hostId))
 +                return;
 +
 +            logger.warn("Failed to transfer all hints to {}; will retry in {} seconds", hostId, 10);
 +
 +            try
 +            {
 +                TimeUnit.SECONDS.sleep(10);
 +            }
 +            catch (InterruptedException e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +
 +            hostId = hostIdSupplier.get();
 +            logger.info("Transferring all hints to {}", hostId);
 +            if (!transfer(hostId))
 +            {
 +                logger.error("Failed to transfer all hints to {}", hostId);
 +                throw new RuntimeException("Failed to transfer all hints to " + hostId);
 +            }
 +        }
 +
 +        private boolean transfer(UUID hostId)
 +        {
 +            catalog.stores()
 +                   .map(store -> new DispatchHintsTask(store, hostId))
 +                   .forEach(Runnable::run);
 +
 +            return !catalog.hasFiles();
 +        }
 +    }
 +
 +    private final class DispatchHintsTask implements Runnable
 +    {
 +        private final HintsStore store;
 +        private final UUID hostId;
 +        private final RateLimiter rateLimiter;
 +
 +        DispatchHintsTask(HintsStore store, UUID hostId)
 +        {
 +            this.store = store;
 +            this.hostId = hostId;
 +
 +            // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +            // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
 +            // the goal is to bound maximum hints traffic going towards a particular node from the rest of the cluster,
 +            // not total outgoing hints traffic from this node - this is why the rate limiter is not shared between
 +            // all the dispatch tasks (as there will be at most one dispatch task for a particular host id at a time).
 +            int nodesCount = Math.max(1, StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
 +            int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB() / nodesCount;
 +            this.rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 +        }
 +
 +        public void run()
 +        {
 +            try
 +            {
 +                dispatch();
 +            }
 +            finally
 +            {
 +                scheduledDispatches.remove(hostId);
 +            }
 +        }
 +
 +        private void dispatch()
 +        {
 +            while (true)
 +            {
 +                if (isPaused.get())
 +                    break;
 +
 +                HintsDescriptor descriptor = store.poll();
 +                if (descriptor == null)
 +                    break;
 +
 +                try
 +                {
 +                    if (!dispatch(descriptor))
 +                        break;
 +                }
 +                catch (FSReadError e)
 +                {
 +                    logger.error("Failed to dispatch hints file {}: file is corrupted ({})", descriptor.fileName(), e);
 +                    store.cleanUp(descriptor);
 +                    store.blacklist(descriptor);
 +                    throw e;
 +                }
 +            }
 +        }
 +
 +        /*
 +         * Returns true if dispatch was successful, false if we hit a failure (e.g. the destination node went down).
 +         */
 +        private boolean dispatch(HintsDescriptor descriptor)
 +        {
-             logger.debug("Dispatching hints file {}", descriptor.fileName());
++            logger.trace("Dispatching hints file {}", descriptor.fileName());
 +
 +            File file = new File(hintsDirectory, descriptor.fileName());
 +            Long offset = store.getDispatchOffset(descriptor).orElse(null);
 +
 +            try (HintsDispatcher dispatcher = HintsDispatcher.create(file, rateLimiter, hostId, descriptor.hostId, isPaused))
 +            {
 +                if (offset != null)
 +                    dispatcher.seek(offset);
 +
 +                if (dispatcher.dispatch())
 +                {
 +                    if (!file.delete())
 +                        logger.error("Failed to delete hints file {}", descriptor.fileName());
 +                    store.cleanUp(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to endpoint {}", descriptor.fileName(), hostId);
 +                    return true;
 +                }
 +                else
 +                {
 +                    store.markDispatchOffset(descriptor, dispatcher.dispatchOffset());
 +                    store.offerFirst(descriptor);
 +                    logger.info("Finished hinted handoff of file {} to endpoint {}, partially", descriptor.fileName(), hostId);
 +                    return false;
 +                }
 +            }
 +        }
 +    }
 +}

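As a quick illustration of the rate-limiter arithmetic in DispatchHintsTask above, here is a standalone sketch (not part of this commit; the configuration values are made up). With hinted_handoff_throttle_in_kb = 1024 and a four-node cluster, each of the three other nodes may dispatch hints to the same destination, so this node takes a 1024 / 3 = 341 KB/s share:

import com.google.common.util.concurrent.RateLimiter;

public class HintThrottleSketch
{
    public static void main(String[] args)
    {
        int hintedHandoffThrottleInKB = 1024; // hinted_handoff_throttle_in_kb in cassandra.yaml
        int clusterSize = 4;                  // all endpoints, including this node

        // Scale by the number of *other* nodes (CASSANDRA-5272), as each of them
        // may be dispatching hints to the same destination concurrently.
        int nodesCount = Math.max(1, clusterSize - 1);
        int throttleInKB = hintedHandoffThrottleInKB / nodesCount; // 341

        // 0 means throttling is disabled, which maps to an effectively unlimited rate.
        RateLimiter limiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE
                                                                   : throttleInKB * 1024);
        System.out.println("bytes/second: " + limiter.getRate()); // 349184.0
    }
}
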
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 47364f6,0000000..3daf147
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1002 -1,0 +1,1002 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index;
 +
 +import java.lang.reflect.Constructor;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Strings;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Maps;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.*;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.Indexes;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +/**
 + * Handles the core maintenance functionality associated with indexes: adding them to or removing them
 + * from a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
 + * and so on.
 + *
 + * The Index interface defines a number of methods which return Callable<?>. These are primarily the
 + * management tasks for an index implementation. Most of them are currently executed in a blocking
 + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
 + * much all cases, as tasks like flushing an index need to be executed synchronously to avoid potentially
 + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
 + * then be defined as void and called directly from SIM (rather than being run via the executor service).
 + * Separating the task definition from execution gives us greater flexibility though, leaving open the
 + * possibility of, for example, executing more of these tasks asynchronously in future if the flush process allows it.
 + *
 + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
 + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously,
 + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
 + * indexes is performed on the CompactionManager.
 + *
 + * This class also provides instances of processors which listen to updates to the base table and forward to
 + * registered Indexes the info required to keep those indexes up to date.
 + * There are two variants of these processors, each with a factory method provided by SIM:
 + *      IndexTransaction: deals with updates generated on the regular write path.
 + *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
 + * Further details on their usage and lifecycles can be found in the interface definitions below.
 + *
 + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
 + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
 + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
 + * e.g. getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
 + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
 + * a target replica.
 + */
 +public class SecondaryIndexManager implements IndexRegistry
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 +
 +    private Map<String, Index> indexes = Maps.newConcurrentMap();
 +
 +    // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
 +    private static final ExecutorService asyncExecutor =
 +        new JMXEnabledThreadPoolExecutor(1,
 +                                         StageManager.KEEPALIVE,
 +                                         TimeUnit.SECONDS,
 +                                         new LinkedBlockingQueue<>(),
 +                                         new NamedThreadFactory("SecondaryIndexManagement"),
 +                                         "internal");
 +
 +    // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
 +    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
 +
 +    /**
 +     * The underlying column family containing the source data for these indexes
 +     */
 +    public final ColumnFamilyStore baseCfs;
 +
 +    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
 +    {
 +        this.baseCfs = baseCfs;
 +    }
 +
 +
 +    /**
 +     * Drops and adds new indexes associated with the underlying CF
 +     */
 +    public void reload()
 +    {
 +        // figure out what needs to be added and dropped.
 +        Indexes tableIndexes = baseCfs.metadata.getIndexes();
 +        indexes.keySet()
 +               .stream()
 +               .filter(indexName -> !tableIndexes.has(indexName))
 +               .forEach(this::removeIndex);
 +
 +        // we call add for every index definition in the collection as
 +        // some may not have been created here yet, only added to schema
 +        for (IndexMetadata tableIndex : tableIndexes)
 +            addIndex(tableIndex);
 +    }
 +
 +    private Future<?> reloadIndex(IndexMetadata indexDef)
 +    {
 +        // if the index metadata has changed, reload the index
 +        IndexMetadata registered = indexes.get(indexDef.name).getIndexMetadata();
 +        if (!registered.equals(indexDef))
 +        {
 +            Index index = indexes.remove(registered.name);
 +            index.register(this);
 +            return blockingExecutor.submit(index.getMetadataReloadTask(indexDef));
 +        }
 +
 +        // otherwise, nothing to do
 +        return Futures.immediateFuture(null);
 +    }
 +
 +    private Future<?> createIndex(IndexMetadata indexDef)
 +    {
 +        Index index = createInstance(indexDef);
 +        index.register(this);
 +        final Callable<?> initialBuildTask = index.getInitializationTask();
 +        return initialBuildTask == null
 +               ? Futures.immediateFuture(null)
 +               : asyncExecutor.submit(initialBuildTask);
 +    }
 +
 +    /**
 +     * Adds and builds an index
 +     * @param indexDef the IndexMetadata describing the index
 +     */
 +    public synchronized Future<?> addIndex(IndexMetadata indexDef)
 +    {
 +        if (indexes.containsKey(indexDef.name))
 +            return reloadIndex(indexDef);
 +        else
 +            return createIndex(indexDef);
 +    }
 +
 +    public synchronized void removeIndex(String indexName)
 +    {
 +        Index index = indexes.remove(indexName);
 +        if (null != index)
 +        {
 +            executeBlocking(index.getInvalidateTask());
 +            unregisterIndex(index);
 +        }
 +    }
 +
 +
 +    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
 +    {
 +        if (indexes.isEmpty())
 +            return Collections.emptySet();
 +
 +        Set<IndexMetadata> dependentIndexes = new HashSet<>();
 +        for (Index index : indexes.values())
 +            if (index.dependsOn(column))
 +                dependentIndexes.add(index.getIndexMetadata());
 +
 +        return dependentIndexes;
 +    }
 +
 +
 +    /**
 +     * Called when dropping a Table
 +     */
 +    public void markAllIndexesRemoved()
 +    {
 +       getBuiltIndexNames().forEach(this::markIndexRemoved);
 +    }
 +
 +    /**
 +    * Does a full, blocking rebuild of the indexes specified by name from the sstables.
 +    * Caller must acquire and release references to the sstables used here.
 +    * Note also that only this method of (re)building indexes:
 +    *   a) takes a set of index *names* rather than Indexers
 +    *   b) marks existing indexes removed prior to rebuilding
 +    *
 +    * @param sstables the data to build from
 +    * @param indexNames the list of indexes to be rebuilt
 +    */
 +    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
 +    {
 +        Set<Index> toRebuild = indexes.values().stream()
 +                                               .filter(index -> indexNames.contains(index.getIndexName()))
 +                                               .filter(Index::shouldBuildBlocking)
 +                                               .collect(Collectors.toSet());
 +        if (toRebuild.isEmpty())
 +        {
 +            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
 +            return;
 +        }
 +
 +        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexName()));
 +
 +        buildIndexesBlocking(sstables, toRebuild);
 +
 +        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexName()));
 +    }
 +
 +    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
 +    {
 +        buildIndexesBlocking(sstables, indexes.values()
 +                                              .stream()
 +                                              .filter(Index::shouldBuildBlocking)
 +                                              .collect(Collectors.toSet()));
 +    }
 +
 +    // For convenience, may be called directly from Index impls
 +    public void buildIndexBlocking(Index index)
 +    {
 +        if (index.shouldBuildBlocking())
 +        {
 +            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +                 Refs<SSTableReader> sstables = viewFragment.refs)
 +            {
 +                buildIndexesBlocking(sstables, Collections.singleton(index));
 +                markIndexBuilt(index.getIndexName());
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} is a secondary index.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
 +    {
 +        return isIndexColumnFamily(cfs.name);
 +    }
 +
 +    /**
 +     * Checks if the specified <code>ColumnFamilyStore</code> name is that of a secondary index.
 +     *
 +     * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamily(String cfName)
 +    {
 +        return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the parent of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the parent of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
 +    {
 +        String parentCfs = getParentCfsName(cfs.name);
 +        return cfs.keyspace.getColumnFamilyStore(parentCfs);
 +    }
 +
 +    /**
 +     * Returns the parent name of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the parent name of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static String getParentCfsName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the index name
 +     */
 +    public static String getIndexName(ColumnFamilyStore cfs)
 +    {
 +        return getIndexName(cfs.name);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the index name
 +     */
 +    public static String getIndexName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        logger.info("Submitting index build of {} for data in {}",
 +                    indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")),
 +                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 +
 +        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                  indexes,
 +                                                                  new ReducingKeyIterator(sstables));
 +        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +        FBUtilities.waitOnFuture(future);
 +
 +        flushIndexesBlocking(indexes);
 +        logger.info("Index build of {} complete",
 +                    indexes.stream().map(Index::getIndexName).collect(Collectors.joining(",")));
 +    }
 +
 +    private void markIndexBuilt(String indexName)
 +    {
 +        SystemKeyspace.setIndexBuilt(baseCfs.name, indexName);
 +    }
 +
 +    private void markIndexRemoved(String indexName)
 +    {
 +        SystemKeyspace.setIndexRemoved(baseCfs.name, indexName);
 +    }
 +
 +
 +    public Index getIndexByName(String indexName)
 +    {
 +        return indexes.get(indexName);
 +    }
 +
 +    private Index createInstance(IndexMetadata indexDef)
 +    {
 +        Index newIndex;
 +        if (indexDef.isCustom())
 +        {
 +            assert indexDef.options != null;
 +            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
 +            assert ! Strings.isNullOrEmpty(className);
 +            try
 +            {
 +                Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
 +                Constructor ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
 +                newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        else
 +        {
 +            newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
 +        }
 +        return newIndex;
 +    }
 +
 +    /**
 +     * Truncate all indexes
 +     */
 +    public void truncateAllIndexesBlocking(final long truncatedAt)
 +    {
 +        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
 +    }
 +
 +    /**
 +     * Remove all indexes
 +     */
 +    public void invalidateAllIndexesBlocking()
 +    {
 +        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
 +    }
 +
 +    /**
 +     * Perform a blocking flush all indexes
 +     */
 +    public void flushAllIndexesBlocking()
 +    {
 +       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
 +    }
 +
 +    /**
 +     * Perform a blocking flush of selected indexes
 +     */
 +    public void flushIndexesBlocking(Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        List<Future<?>> wait = new ArrayList<>();
 +        List<Index> nonCfsIndexes = new ArrayList<>();
 +
 +        // for each CFS backed index, submit a flush task which we'll wait on for completion
 +        // for the non-CFS backed indexes, we'll flush those while we wait.
 +        synchronized (baseCfs.getTracker())
 +        {
 +            indexes.forEach(index ->
 +                index.getBackingTable()
 +                     .map(cfs -> wait.add(cfs.forceFlush()))
 +                     .orElseGet(() -> nonCfsIndexes.add(index)));
 +        }
 +
 +        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
 +        FBUtilities.waitOnFutures(wait);
 +    }
 +
 +    /**
 +     * Performs a blocking flush of all custom indexes
 +     */
 +    public void flushAllNonCFSBackedIndexesBlocking()
 +    {
 +        Set<Index> customIndexers = indexes.values().stream()
 +                                                    .filter(index -> !(index.getBackingTable().isPresent()))
 +                                                    .collect(Collectors.toSet());
 +        flushIndexesBlocking(customIndexers);
 +    }
 +
 +    /**
 +     * @return all indexes which are marked as built and ready to use
 +     */
 +    public List<String> getBuiltIndexNames()
 +    {
 +        Set<String> allIndexNames = new HashSet<>();
 +        indexes.values().stream()
 +                .map(Index::getIndexName)
 +                .forEach(allIndexNames::add);
 +        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
 +    }
 +
 +    /**
 +     * @return all backing Tables used by registered indexes
 +     */
 +    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
 +    {
 +        Set<ColumnFamilyStore> backingTables = new HashSet<>();
 +        indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
 +        return backingTables;
 +    }
 +
 +    /**
 +     * @return true if there are any indexes registered for this table
 +     */
 +    public boolean hasIndexes()
 +    {
 +        return !indexes.isEmpty();
 +    }
 +
 +    /**
 +     * When building an index against existing data in sstables, add the given partition to the index
 +     */
 +    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
 +    {
 +        if (!indexes.isEmpty())
 +        {
 +            DecoratedKey key = partition.partitionKey();
 +            Set<Index.Indexer> indexers = indexes.stream()
 +                                                 .map(index -> index.indexerFor(key,
 +                                                                                nowInSec,
 +                                                                                opGroup,
 +                                                                                IndexTransaction.Type.UPDATE))
 +                                                 .collect(Collectors.toSet());
 +
 +            indexers.forEach(Index.Indexer::begin);
 +
 +            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
 +            {
 +                if (!filtered.staticRow().isEmpty())
 +                    indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
 +
 +                while (filtered.hasNext())
 +                {
 +                    Row row = filtered.next();
 +                    indexers.forEach(indexer -> indexer.insertRow(row));
 +                }
 +            }
 +
 +            indexers.forEach(Index.Indexer::finish);
 +        }
 +    }
 +
 +    /**
 +     * Delete all data from all indexes for this partition.
 +     * For when cleanup rips a partition out entirely.
 +     *
 +     * TODO : improve cleanup transaction to batch updates & perform them async
 +     */
 +    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
 +    {
 +        // we need to acquire memtable lock because secondary index deletion may
 +        // cause a race (see CASSANDRA-3712). This is done internally by the
 +        // index transaction when it commits
 +        CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                                    partition.columns(),
 +                                                                    nowInSec);
 +        indexTransaction.start();
 +        indexTransaction.onPartitionDeletion(partition.partitionLevelDeletion());
 +        indexTransaction.commit();
 +
 +        while (partition.hasNext())
 +        {
 +            Unfiltered unfiltered = partition.next();
 +            if (unfiltered.kind() != Unfiltered.Kind.ROW)
 +                continue;
 +
 +            indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                     partition.columns(),
 +                                                     nowInSec);
 +            indexTransaction.start();
 +            indexTransaction.onRowDelete((Row)unfiltered);
 +            indexTransaction.commit();
 +        }
 +    }
 +
 +    /**
 +     * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
 +     *
 + * This is a two-step process: first compiling the set of searchable indexes, then choosing the one which reduces
 +     * the search space the most.
 +     *
 +     * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
 +     * specify are automatically included. Following that, the registered indexes are filtered to include only those
 +     * which support the standard expressions in the RowFilter.
 +     *
 + * The filtered set is then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
 +     * method.
 +     *
 +     * Implementation specific validation of the target expression, either custom or standard, by the selected
 +     * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
 +     * the validity of the expression.
 +     *
 +     * This method is only called once during the lifecycle of a ReadCommand and the result is
 +     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
 +     * ReadOrderGroup, or an estimate of the result size from an average index query.
 +     *
 +     * @param command ReadCommand to be executed
 +     * @return an Index instance, ready to use during execution of the command, or null if none
 +     * of the registered indexes can support the command.
 +     */
 +    public Index getBestIndexFor(ReadCommand command)
 +    {
 +        if (indexes.isEmpty() || command.rowFilter().isEmpty())
 +            return null;
 +
 +        List<Index> searchableIndexes = new ArrayList<>();
 +        for (RowFilter.Expression expression : command.rowFilter())
 +        {
 +            if (expression.isCustom())
 +            {
 +                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
 +                searchableIndexes.add(indexes.get(customExpression.getTargetIndex().name));
 +            }
 +            else
 +            {
 +                indexes.values().stream()
 +                       .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
 +                       .forEach(searchableIndexes::add);
 +            }
 +        }
 +
 +        if (searchableIndexes.isEmpty())
 +        {
-             logger.debug("No applicable indexes found");
++            logger.trace("No applicable indexes found");
 +            Tracing.trace("No applicable indexes found");
 +            return null;
 +        }
 +
 +        Index selected = searchableIndexes.size() == 1
 +                         ? searchableIndexes.get(0)
 +                         : searchableIndexes.stream()
 +                                            .max((a, b) -> Longs.compare(a.getEstimatedResultRows(),
 +                                                                         b.getEstimatedResultRows()))
 +                                            .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 +
 +        // pay for an additional threadlocal get() rather than build the strings unnecessarily
 +        if (Tracing.isTracing())
 +        {
 +            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
 +                          searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
 +                                           .collect(Collectors.joining(",")),
 +                          selected.getIndexName());
 +        }
 +        return selected;
 +    }
 +
 +    /**
 +     * Called at write time to ensure that values present in the update
 +     * are valid according to the rules of all registered indexes which
 +     * will process it. The partition key as well as the clustering and
 +     * cell values for each row in the update may be checked by index
 +     * implementations.
 +     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
 +     * @throws InvalidRequestException
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        indexes.values()
 +               .stream()
 +               .filter(i -> i.indexes(update.columns()))
 +               .forEach(i -> i.validate(update));
 +    }
 +
 +    /**
 +     * IndexRegistry methods
 +     */
 +    public void registerIndex(Index index)
 +    {
 +        indexes.put(index.getIndexMetadata().name, index);
-         logger.debug("Registered index {}", index.getIndexMetadata().name);
++        logger.trace("Registered index {}", index.getIndexMetadata().name);
 +    }
 +
 +    public void unregisterIndex(Index index)
 +    {
 +        Index removed = indexes.remove(index.getIndexMetadata().name);
-         logger.debug(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
++        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
 +                     index.getIndexMetadata().name);
 +    }
 +
 +    public Index getIndex(IndexMetadata metadata)
 +    {
 +        return indexes.get(metadata.name);
 +    }
 +
 +    public Collection<Index> listIndexes()
 +    {
 +        return ImmutableSet.copyOf(indexes.values());
 +    }
 +
 +    /**
 +     * Handling of index updates.
 +     * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
 +     * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
 +     */
 +
 +    /**
 +     * Transaction for updates on the write path.
 +     */
 +    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return UpdateTransaction.NO_OP;
 +
 +        // todo : optimize lookup, we can probably cache quite a bit of stuff, rather than doing
 +        // a linear scan every time. Holding off that though until CASSANDRA-7771 to figure out
 +        // exactly how indexes are to be identified & associated with a given partition update
 +        Index.Indexer[] indexers = indexes.values().stream()
 +                                          .filter(i -> i.indexes(update.columns()))
 +                                          .map(i -> i.indexerFor(update.partitionKey(),
 +                                                                 nowInSec,
 +                                                                 opGroup,
 +                                                                 IndexTransaction.Type.UPDATE))
 +                                          .toArray(Index.Indexer[]::new);
 +
 +        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
 +    }
 +
 +    /**
 +     * Transaction for use when merging rows during compaction
 +     */
 +    public CompactionTransaction newCompactionTransaction(DecoratedKey key,
 +                                                          PartitionColumns partitionColumns,
 +                                                          int versions,
 +                                                          int nowInSec)
 +    {
 +        // the check for whether there are any registered indexes is already done in CompactionIterator
 +
 +        Index[] interestedIndexes = indexes.values().stream()
 +                                           .filter(i -> i.indexes(partitionColumns))
 +                                           .toArray(Index[]::new);
 +
 +        return interestedIndexes.length == 0
 +               ? CompactionTransaction.NO_OP
 +               : new IndexGCTransaction(key, versions, nowInSec, interestedIndexes);
 +    }
 +
 +    /**
 +     * Transaction for use when removing partitions during cleanup
 +     */
 +    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
 +                                                    PartitionColumns partitionColumns,
 +                                                    int nowInSec)
 +    {
 +        //
 +        if (!hasIndexes())
 +            return CleanupTransaction.NO_OP;
 +
 +        Index[] interestedIndexes = indexes.values().stream()
 +                                           .filter(i -> i.indexes(partitionColumns))
 +                                           .toArray(Index[]::new);
 +
 +        return interestedIndexes.length == 0
 +               ? CleanupTransaction.NO_OP
 +               : new CleanupGCTransaction(key, nowInSec, interestedIndexes);
 +    }
 +
 +    /**
 +     * A single use transaction for processing a partition update on the regular write path
 +     */
 +    private static final class WriteTimeTransaction implements UpdateTransaction
 +    {
 +        private final Index.Indexer[] indexers;
 +
 +        private WriteTimeTransaction(Index.Indexer...indexers)
 +        {
 +            // don't allow null indexers; if we don't need any, use a NullUpdater object
 +            for (Index.Indexer indexer : indexers) assert indexer != null;
 +            this.indexers = indexers;
 +        }
 +
 +        public void start()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.begin();
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.partitionDelete(deletionTime);
 +        }
 +
 +        public void onRangeTombstone(RangeTombstone tombstone)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.rangeTombstone(tombstone);
 +        }
 +
 +        public void onInserted(Row row)
 +        {
 +            Arrays.stream(indexers).forEach(h -> h.insertRow(row));
 +        }
 +
 +        public void onUpdated(Row existing, Row updated)
 +        {
 +            final Row.Builder toRemove = BTreeRow.sortedBuilder();
 +            toRemove.newRow(existing.clustering());
 +            final Row.Builder toInsert = BTreeRow.sortedBuilder();
 +            toInsert.newRow(updated.clustering());
 +            // diff listener collates the columns to be added & removed from the indexes
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                    if (merged != null && merged != original)
 +                        toInsert.addPrimaryKeyLivenessInfo(merged);
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (merged != null && merged != original)
 +                        toInsert.addCell(merged);
 +
 +                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
 +                        toRemove.addCell(original);
 +
 +                }
 +            };
 +            Rows.diff(diffListener, updated, existing);
 +            Row oldRow = toRemove.build();
 +            Row newRow = toInsert.build();
 +            for (Index.Indexer indexer : indexers)
 +                indexer.updateRow(oldRow, newRow);
 +        }
 +
 +        public void commit()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.finish();
 +        }
 +
 +        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
 +        {
 +            // If either the value or timestamp is different, then we
 +            // should delete from the index. If not, then we can infer that
 +            // at least one of the cells is an ExpiringColumn and that the
 +            // difference is in the expiry time. In this case, we don't want to
 +            // delete the old value from the index as the tombstone we insert
 +            // will just hide the inserted value.
 +            // Completely identical cells (including expiring columns with
 +            // identical ttl & localExpirationTime) will not get this far due
 +            // to the oldCell.equals(newCell) in StandardUpdater.update
 +            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during compaction where the only
 +     * operation is to merge rows
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class IndexGCTransaction implements CompactionTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final int versions;
 +        private final int nowInSec;
 +        private final Index[] indexes;
 +
 +        private Row[] rows;
 +
 +        private IndexGCTransaction(DecoratedKey key,
 +                                   int versions,
 +                                   int nowInSec,
 +                                   Index...indexes)
 +        {
 +            // don't allow null indexers; if we don't have any, use a noop transaction
 +            for (Index index : indexes) assert index != null;
 +
 +            this.key = key;
 +            this.versions = versions;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +            if (versions > 0)
 +                rows = new Row[versions];
 +        }
 +
 +        public void onRowMerge(Row merged, Row...versions)
 +        {
 +            // Diff listener constructs rows representing deltas between the merged and original versions
 +            // These delta rows are then passed to registered indexes for removal processing
 +            final Row.Builder[] builders = new Row.Builder[versions.length];
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (original != null && merged == null)
 +                    {
 +                        if (builders[i] == null)
 +                        {
 +                            builders[i] = BTreeRow.sortedBuilder();
 +                            builders[i].newRow(clustering);
 +                        }
 +                        builders[i].addCell(original);
 +                    }
 +                }
 +            };
 +
 +            Rows.diff(diffListener, merged, versions);
 +
 +            for(int i = 0; i < builders.length; i++)
 +                if (builders[i] != null)
 +                    rows[i] = builders[i].build();
 +        }
 +
 +        public void commit()
 +        {
 +            if (rows == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.COMPACTION);
 +                    indexer.begin();
 +                    for (Row row : rows)
 +                        if (row != null)
 +                            indexer.removeRow(row);
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during cleanup, where
 +     * partitions and rows are only removed
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class CleanupGCTransaction implements CleanupTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final int nowInSec;
 +        private final Index[] indexes;
 +
 +        private Row row;
 +        private DeletionTime partitionDelete;
 +
 +        private CleanupGCTransaction(DecoratedKey key,
 +                                     int nowInSec,
 +                                     Index...indexes)
 +        {
 +            // don't allow null indexers, if we don't have any, use a noop transaction
 +            for (Index index : indexes) assert index != null;
 +
 +            this.key = key;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            partitionDelete = deletionTime;
 +        }
 +
 +        public void onRowDelete(Row row)
 +        {
 +            this.row = row;
 +        }
 +
 +        public void commit()
 +        {
 +            if (row == null && partitionDelete == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, nowInSec, opGroup, Type.CLEANUP);
 +                    indexer.begin();
 +                    if (row != null)
 +                        indexer.removeRow(row);
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    private static void executeBlocking(Callable<?> task)
 +    {
 +        if (null != task)
 +            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
 +    }
 +
 +    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
 +    {
 +        List<Future<?>> waitFor = new ArrayList<>();
 +        indexers.forEach(indexer -> {
 +            Callable<?> task = function.apply(indexer);
 +            if (null != task)
 +                waitFor.add(blockingExecutor.submit(task));
 +        });
 +        FBUtilities.waitOnFutures(waitFor);
 +    }
 +}

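For reference, the selection step in getBestIndexFor above reduces to a stream max() over the indexes' getEstimatedResultRows values. A self-contained sketch of that pattern (illustrative stand-in types, not the project's Index interface):

import com.google.common.primitives.Longs;
import java.util.Arrays;
import java.util.List;

public class IndexSelectionSketch
{
    // Minimal stand-in for the two Index methods the comparator needs.
    interface Candidate
    {
        String name();
        long getEstimatedResultRows();
    }

    static Candidate candidate(String name, long rows)
    {
        return new Candidate()
        {
            public String name() { return name; }
            public long getEstimatedResultRows() { return rows; }
        };
    }

    public static void main(String[] args)
    {
        List<Candidate> searchable = Arrays.asList(candidate("tags_idx", 10),
                                                   candidate("owner_idx", 1000));

        // Same shape as SecondaryIndexManager.getBestIndexFor: max() with Longs.compare
        // over the mean result-row estimates reported by each index.
        Candidate selected = searchable.stream()
                                       .max((a, b) -> Longs.compare(a.getEstimatedResultRows(),
                                                                    b.getEstimatedResultRows()))
                                       .orElseThrow(() -> new AssertionError("empty candidate list"));

        System.out.println("Scanning with " + selected.name()); // owner_idx
    }
}
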
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index f6a10e5,0000000..93f5d61
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,826 -1,0 +1,826 @@@
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CollectionType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.LocalPartitioner;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.internal.composites.CompositesSearcher;
 +import org.apache.cassandra.index.internal.keys.KeysSearcher;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +/**
 + * Index implementation which indexes the values for a single column in the base
 + * table and which stores its index data in a local, hidden table.
 + */
 +public abstract class CassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value].
 +     * @param indexedColumn the definition of the indexed column
 +     * @param operator the operator in the search predicate
 +     * @return true if predicates using the given operator are supported
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator == Operator.EQ;
 +    }
 +
 +    /**
 +     * Used to construct the clustering for an entry in the index table based on values from the base data.
 +     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
 +     * table, and they vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
 +     * Used whenever a row in the index table is written or deleted.
 +     * @param partitionKey from the base data being indexed
 +     * @param prefix from the base data being indexed
 +     * @param path from the base data being indexed
 +     * @return a clustering prefix to be inserted into the index table
 +     */
 +    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                                           ClusteringPrefix prefix,
 +                                                           CellPath path);
 +
 +    /**
 +     * Used at search time to convert a row in the index table into a simple struct containing the values required
 +     * to retrieve the corresponding row from the base table.
 +     * @param indexedValue the partition key of the index table (i.e. the value that was indexed)
 +     * @param indexEntry a row from the index table
 +     * @return an IndexEntry containing the values required to retrieve the corresponding row from the base table
 +     */
 +    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
 +                                           Row indexEntry);
 +
 +    /**
 +     * Check whether a value retrieved from an index is still valid by comparing it to the current row from the base
 +     * table. Used at read time to identify out-of-date index entries so that they can be excluded from search results
 +     * and repaired.
 +     * @param row the current row from the primary data table
 +     * @param indexValue the value we retrieved from the index
 +     * @param nowInSec the current time in seconds
 +     * @return true if the index is out of date and the entry should be dropped
 +     */
 +    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
 +
 +    /**
 +     * Extract the value to be inserted into the index from the components of the base data
 +     * @param partitionKey from the primary data
 +     * @param clustering from the primary data
 +     * @param path from the primary data
 +     * @param cellValue from the primary data
 +     * @return a ByteBuffer containing the value to be inserted in the index. This will be used as the partition
 +     * key of the index table
 +     */
 +    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                                  Clustering clustering,
 +                                                  CellPath path,
 +                                                  ByteBuffer cellValue);
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        // if we're just linking in an already-built index post-restart, there's nothing
 +        // to do. Otherwise, submit it for building via SecondaryIndexBuilder
 +        return isBuilt() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public String getIndexName()
 +    {
 +        return metadata.name;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            markRemoved();
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        setMetadata(indexDef);
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        // built-in indexes are always included in builds initiated from SecondaryIndexManager
 +        return true;
 +    }
 +
 +    public boolean indexes(PartitionColumns columns)
 +    {
 +        // a primary key index applies to every update; otherwise, check whether the indexed column is present
 +        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return indexedColumn.name.equals(column.name);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
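 +        // estimate result rows by the mean number of columns per partition in the index table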
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
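 +        // remove the expression handled by this index from the filter applied after the index lookup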
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            target.get().validateForIndexing();
 +            switch (getIndexMetadata().kind)
 +            {
 +                case COMPOSITES:
 +                    return new CompositesSearcher(command, target.get(), this);
 +                case KEYS:
 +                    return new KeysSearcher(command, target.get(), this);
 +                default:
 +                    throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
 +                                                                  metadata.kind,
 +                                                                  metadata.name,
 +                                                                  indexedColumn.name.toString()));
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                validateRows(update);
 +                break;
 +            case STATIC:
 +                validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
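 +        // an Indexer which translates writes to the base table into inserts and deletes on the index table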
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
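 +                // the index entry should be kept alive as long as the newest live cell in the row,
 +                // so take the highest timestamp (and its ttl) across the row's liveness info and live cells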
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion the deletion time to apply to the stale entry
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
-         logger.debug("Removed index entry for stale value {}", indexKey);
++        logger.trace("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
-         logger.debug("Inserted entry into index for value {}", valueKey);
++        logger.trace("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, cell),
 +                 new DeletionTime(cell.timestamp(), nowInSec),
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               null));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, null),
 +                 deletion,
 +                 opGroup);
 +    }
 +
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
 +        PartitionUpdate upd = partitionUpdate(indexKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
-         logger.debug("Removed index entry for value {}", indexKey);
++        logger.trace("Removed index entry for value {}", indexKey);
 +    }
 +
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
 +    }
 +
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
 +    }
 +
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (indexedColumn.isComplex())
 +            {
 +                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
 +                if (data != null)
 +                {
 +                    for (Cell cell : data)
 +                    {
 +                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +                    }
 +                }
 +            }
 +            else
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +            }
 +        }
 +    }
 +
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
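 +        // the indexed value becomes a partition key in the index table, so it is subject to
 +        // the unsigned-short (64KB) key length limit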
 +        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           getIndexName(),
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        return indexCfs.decorateKey(value);
 +    }
 +
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +    }
 +
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        CompactionManager.instance.waitForCessation(cfss);
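 +        // wait for any in-flight writes to complete and flush them before discarding the index data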
 +        Keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
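 +        // likewise wait for in-flight reads to finish before invalidating the store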
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    private boolean isBuilt()
 +    {
 +        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
 +    }
 +
 +    private void markBuilt()
 +    {
 +        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
 +    }
 +
 +    private void markRemoved()
 +    {
 +        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName());
 +    }
 +
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    private void buildBlocking()
 +    {
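 +        // flush first so the index build sees all data currently in the base table's memtables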
 +        baseCfs.forceBlockingFlush();
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            getIndexName());
 +                markBuilt();
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        getIndexName(),
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            markBuilt();
 +        }
 +        logger.info("Index build of {} complete", getIndexName());
 +    }
 +
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return StreamSupport.stream(sstables.spliterator(), false)
 +                            .map(SSTableReader::toString)
 +                            .collect(Collectors.joining(", "));
 +    }
 +
 +    /**
 +     * Construct the CFMetaData for an index table; the clustering columns in the index table
 +     * vary depending on the kind of the indexed value.
 +     * @param baseCfsMetadata metadata of the base table
 +     * @param indexMetadata the definition of the index
 +     * @return the metadata for the hidden table that backs the index
 +     */
 +    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
 +    {
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
 +        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
 +        ColumnDefinition indexedColumn = target.left;
 +        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
 +        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
 +                                                       .withId(baseCfsMetadata.cfId)
 +                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
 +                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
 +
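 +        // the base table's partition key is stored as the first clustering column of the index table,
 +        // ordered by the base partitioner's token ordering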
 +        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
 +        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
 +        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
 +    }
 +
 +    /**
 +     * Factory method for new CassandraIndex instances.
 +     * @param baseCfs the base table's ColumnFamilyStore
 +     * @param indexMetadata the definition of the index
 +     * @return a CassandraIndex instance appropriate to the indexed column and target type
 +     */
 +    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
 +    {
 +        return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
 +    }
 +
 +    // Public because it's also used to convert index metadata into a thrift-compatible format
 +    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
 +                                                                       IndexMetadata indexDef)
 +    {
 +        String target = indexDef.options.get("target");
 +        assert target != null : String.format("No target definition found for index %s", indexDef.name);
 +
 +        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc.
 +        // if not, then it must be a simple column name and implicitly its type is VALUES
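 +        // e.g. "keys(foo)" -> (foo, KEYS); "bar" -> (bar, VALUES)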
 +        Matcher matcher = TARGET_REGEX.matcher(target);
 +        String columnName;
 +        IndexTarget.Type targetType;
 +        if (matcher.matches())
 +        {
 +            targetType = IndexTarget.Type.fromString(matcher.group(1));
 +            columnName = matcher.group(2);
 +        }
 +        else
 +        {
 +            columnName = target;
 +            targetType = IndexTarget.Type.VALUES;
 +        }
 +
 +        // in the case of a quoted column name the name in the target string
 +        // will be enclosed in quotes, which we need to unwrap. It may also
 +        // include quote characters internally, escaped like so:
 +        //      abc"def -> abc""def.
 +        // Because the target string is stored in a CQL compatible form, we
 +        // need to un-escape any such quotes to get the actual column name
 +        if (columnName.startsWith("\""))
 +        {
 +            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
 +            columnName = columnName.replaceAll("\"\"", "\"");
 +        }
 +
 +        // if it's not a CQL table, we can't assume that the column name is utf8, so
 +        // in that case we have to do a linear scan of the cfm's columns to get the matching one
 +        if (cfm.isCQLTable())
 +            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
 +        else
 +            for (ColumnDefinition column : cfm.allColumns())
 +                if (column.name.toString().equals(columnName))
 +                    return Pair.create(column, targetType);
 +
 +        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
 +    }
 +
 +    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
 +                                                Pair<ColumnDefinition, IndexTarget.Type> target)
 +    {
 +        if (indexDef.isKeys())
 +            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 +
 +        ColumnDefinition indexedColumn = target.left;
 +        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
 +        {
 +            switch (((CollectionType)indexedColumn.type).kind)
 +            {
 +                case LIST:
 +                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                case SET:
 +                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                case MAP:
 +                    switch (target.right)
 +                    {
 +                        case KEYS:
 +                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                        case KEYS_AND_VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
 +                        case VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                    }
 +                    throw new AssertionError();
 +            }
 +        }
 +
 +        switch (indexedColumn.kind)
 +        {
 +            case CLUSTERING:
 +                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
 +            case REGULAR:
 +                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
 +            case PARTITION_KEY:
 +                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
 +            //case COMPACT_VALUE:
 +            //    return new CompositesIndexOnCompactValue();
 +        }
 +        throw new AssertionError();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa60cde3/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTable.java
index d66638e,b0aa89e..923ef82
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@@ -111,11 -113,9 +111,11 @@@ public abstract class SSTabl
  
              FileUtils.deleteWithConfirm(desc.filenameFor(component));
          }
 -        FileUtils.delete(desc.filenameFor(Component.SUMMARY));
 +
 +        if (components.contains(Component.SUMMARY))
 +            FileUtils.delete(desc.filenameFor(Component.SUMMARY));
  
-         logger.debug("Deleted {}", desc);
+         logger.trace("Deleted {}", desc);
          return true;
      }