You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2017/04/07 09:49:44 UTC

[6/6] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0

Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/833c993b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/833c993b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/833c993b

Branch: refs/heads/cassandra-3.0
Commit: 833c993b8e604046179067e663f963dcf4c4a2ca
Parents: 8d34076 194329d
Author: Benjamin Lerer <b....@gmail.com>
Authored: Fri Apr 7 11:34:10 2017 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Fri Apr 7 11:39:41 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../restrictions/StatementRestrictions.java     | 17 +++++++++++++++
 .../cql3/statements/SelectStatement.java        |  9 ++++----
 .../index/internal/CassandraIndex.java          |  2 +-
 .../validation/entities/SecondaryIndexTest.java | 23 ++++++++++++++++++++
 5 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5bcdf16,6ea2d59..33d5028
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,22 -4,8 +22,23 @@@ Merged from 2.2
   * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
   * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +Merged from 2.1:
++ * Fix 2ndary index queries on partition keys for tables with static columns (CASSANDRA-13147)
 + * Fix ParseError unhashable type list in cqlsh copy from (CASSANDRA-13364)
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 542dec9,2c396c4..d025d8a
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@@ -78,12 -71,16 +78,18 @@@ public final class StatementRestriction
       */
      private RestrictionSet nonPrimaryKeyRestrictions;
  
+     /**
+      * <code>true</code> if nonPrimaryKeyRestrictions contains a restriction on a regular column,
+      * <code>false</code> otherwise.
+      */
+     private boolean hasRegularColumnsRestriction = false;
+ 
 +    private Set<ColumnDefinition> notNullColumns;
 +
      /**
 -     * The restrictions used to build the index expressions
 +     * The restrictions used to build the row filter
       */
 -    private final List<Restrictions> indexRestrictions = new ArrayList<>();
 +    private final IndexRestrictions indexRestrictions = new IndexRestrictions();
  
      /**
       * <code>true</code> if the secondary index needs to be queried, <code>false</code> otherwise

http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index aca6146,13276c7..bd377f4
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -730,41 -703,35 +730,42 @@@ public class SelectStatement implement
          }
          else
          {
 -            keyComponents = new ByteBuffer[]{ key };
 +            return new ByteBuffer[]{ key };
          }
 +    }
  
 -        Iterator<Cell> cells = cf.getSortedColumns().iterator();
 -        if (restrictions.isNonCompositeSliceWithExclusiveBounds())
 -            cells = applySliceRestriction(cells, options);
 -
 +    // Used by ModificationStatement for CAS operations
 +    void processPartition(RowIterator partition, QueryOptions options, Selection.ResultSetBuilder result, int nowInSec)
 +    throws InvalidRequestException
 +    {
          int protocolVersion = options.getProtocolVersion();
 -        CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
  
 -        // If there is static columns but there is no non-static row,
 -        // and the select was a full partition selection (i.e. there was no condition on clustering or regular columns),
 -        // we want to include the static columns in the result set (and we're done).
 -        CQL3Row staticRow = iter.getStaticRow();
 -        if (staticRow != null && !iter.hasNext() && !restrictions.hasClusteringColumnsRestriction() && !restrictions.hasRegularColumnsRestriction())
 +        ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey());
 +
 +        Row staticRow = partition.staticRow();
-         // If there is no rows, then provided the select was a full partition selection
-         // (i.e. not a 2ndary index search and there was no condition on clustering columns),
++        // If there are no rows, and there are no restrictions on clustering/regular columns,
++        // then provided the select was a full partition selection (either by partition key and/or by static column),
 +        // we want to include static columns and we're done.
 +        if (!partition.hasNext())
          {
-             if (!staticRow.isEmpty() && (!restrictions.usesSecondaryIndexing() || cfm.isStaticCompactTable())
-                     && !restrictions.hasClusteringColumnsRestriction())
 -            result.newRow(protocolVersion);
 -            for (ColumnDefinition def : selection.getColumns())
++            if (!staticRow.isEmpty()
++                && (!restrictions.hasClusteringColumnsRestriction() || cfm.isStaticCompactTable())
++                && !restrictions.hasRegularColumnsRestriction())
              {
 -                switch (def.kind)
 +                result.newRow(protocolVersion);
 +                for (ColumnDefinition def : selection.getColumns())
                  {
 -                    case PARTITION_KEY:
 -                        result.add(keyComponents[def.position()]);
 -                        break;
 -                    case STATIC:
 -                        addValue(result, def, staticRow, options);
 -                        break;
 -                    default:
 -                        result.add((ByteBuffer)null);
 +                    switch (def.kind)
 +                    {
 +                        case PARTITION_KEY:
 +                            result.add(keyComponents[def.position()]);
 +                            break;
 +                        case STATIC:
 +                            addValue(result, def, staticRow, nowInSec, protocolVersion);
 +                            break;
 +                        default:
 +                            result.add((ByteBuffer)null);
 +                    }
                  }
              }
              return;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 2a0dec0,0000000..5caeefa
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,882 -1,0 +1,882 @@@
 +/*
 + *
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + *
 + */
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import com.google.common.collect.ImmutableSet;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CollectionType;
 +import org.apache.cassandra.db.marshal.EmptyType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.LocalPartitioner;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.internal.composites.CompositesSearcher;
 +import org.apache.cassandra.index.internal.keys.KeysSearcher;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +
 +/**
 + * Index implementation which indexes the values for a single column in the base
 + * table and which stores its index data in a local, hidden table.
 + */
 +public abstract class CassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
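 +     * @param indexedColumn the column the search predicate applies to
 +     * @param operator the operator used in the search predicate
 +     * @return true if the predicate can be served by this index; this base implementation accepts only {@link Operator#EQ}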
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator == Operator.EQ;
 +    }
 +
 +    /**
 +     * Used to construct the clustering for an entry in the index table based on values from the base data.
 +     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
 +     * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
 +     * Used whenever a row in the index table is written or deleted.
 +     * @param partitionKey from the base data being indexed
 +     * @param prefix from the base data being indexed
 +     * @param path from the base data being indexed
 +     * @return a clustering prefix to be used to insert into the index table
 +     */
 +    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                                           ClusteringPrefix prefix,
 +                                                           CellPath path);
 +
 +    /**
 +     * Used at search time to convert a row in the index table into a simple struct containing the values required
 +     * to retrieve the corresponding row from the base table.
 +     * @param indexedValue the partition key in the index table (i.e. the value that was indexed)
 +     * @param indexEntry a row from the index table
 +     * @return an IndexEntry holding the values required to retrieve the corresponding row from the base table
 +     */
 +    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
 +                                           Row indexEntry);
 +
 +    /**
 +     * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
 +     * Used at read time to identify out of date index entries so that they can be excluded from search results and
 +     * repaired
 +     * @param row the current row from the primary data table
 +     * @param indexValue the value we retrieved from the index
 +     * @param nowInSec
 +     * @return true if the index is out of date and the entry should be dropped
 +     */
 +    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
 +
 +    /**
 +     * Extract the value to be inserted into the index from the components of the base data
 +     * @param partitionKey from the primary data
 +     * @param clustering from the primary data
 +     * @param path from the primary data
 +     * @param cellValue from the primary data
 +     * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
 +     * key in the index table
 +     */
 +    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                                  Clustering clustering,
 +                                                  CellPath path,
 +                                                  ByteBuffer cellValue);
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        // if we're just linking in the index on an already-built index post-restart or if the base
 +        // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
 +        return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    @Override
 +    public void validate(ReadCommand command) throws InvalidRequestException
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            ByteBuffer indexValue = target.get().getIndexValue();
 +            checkFalse(indexValue.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
 +                       "Index expression values may not be larger than 64K");
 +        }
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        // built-in indexes are always included in builds initiated from SecondaryIndexManager
 +        return true;
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return indexedColumn.name.equals(column.name);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            target.get().validateForIndexing();
 +            switch (getIndexMetadata().kind)
 +            {
 +                case COMPOSITES:
 +                    return new CompositesSearcher(command, target.get(), this);
 +                case KEYS:
 +                    return new KeysSearcher(command, target.get(), this);
 +                default:
 +                    throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
 +                                                                  metadata.kind,
 +                                                                  metadata.name,
 +                                                                  indexedColumn.name.toString()));
 +            }
 +        }
 +
 +        return null;
 +
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                if (update.columns().regulars.contains(indexedColumn))
 +                    validateRows(update);
 +                break;
 +            case STATIC:
 +                if (update.columns().statics.contains(indexedColumn))
 +                    validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final PartitionColumns columns,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        /**
 +         * Indexes on regular and static columns (the non primary-key ones) only care about updates with live
 +         * data for the column they index. In particular, they don't care about having just row or range deletions
 +         * as they don't know how to update the index table unless they know exactly the value that is deleted.
 +         *
 +         * Note that in practice this means that those indexes are only purged of stale entries on compaction,
 +         * when we resolve both the deletion and the prior data it deletes. Of course, such stale entries are also
 +         * filtered on read.
 +         */
 +        if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn))
 +            return null;
 +
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
-                 if (row.isStatic() != indexedColumn.isStatic())
++                if (row.isStatic() && !indexedColumn.isStatic() && !indexedColumn.isPartitionKey())
 +                    return;
 +
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    return;
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                assert oldRow.isStatic() == newRow.isStatic();
 +                if (newRow.isStatic() != indexedColumn.isStatic())
 +                    return;
 +
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.trace("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.trace("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, cell),
 +                 new DeletionTime(cell.timestamp(), nowInSec),
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Removes an index entry when the indexed column is part of the primary key.
 +     *
 +     * No cell is involved in that case, so a null cell is used both to compute the indexed value
 +     * and to build the index clustering; the caller supplies the deletion to apply.
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        Cell noCell = null;
 +        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, noCell);
 +        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
 +        Clustering entryClustering = buildIndexClustering(rowKey, clustering, noCell);
 +        doDelete(valueKey, entryClustering, deletion, opGroup);
 +    }
 +
 +    /**
 +     * Applies a row deletion for {@code indexClustering} in partition {@code indexKey} of the
 +     * index table, built with BTreeRow.emptyDeletedRow and a regular Row.Deletion.
 +     */
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row.Deletion rowDeletion = Row.Deletion.regular(deletion);
 +        Row tombstoneRow = BTreeRow.emptyDeletedRow(indexClustering, rowDeletion);
 +        indexCfs.apply(partitionUpdate(indexKey, tombstoneRow),
 +                       UpdateTransaction.NO_OP,
 +                       opGroup,
 +                       null);
 +        logger.trace("Removed index entry for value {}", indexKey);
 +    }
 +
 +    /**
 +     * Checks the value this index would extract from a partition key (the indexed column being a
 +     * partition key component) with validateIndexedValue.
 +     */
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        ByteBuffer rawKey = partitionKey.getKey();
 +        ByteBuffer indexedValue = getIndexedValue(rawKey, null, null);
 +        validateIndexedValue(indexedValue);
 +    }
 +
 +    /**
 +     * Checks the indexed value extracted from every row's clustering in {@code update}, for an
 +     * index on a clustering column.
 +     */
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row baseRow : update)
 +        {
 +            Clustering rowClustering = baseRow.clustering();
 +            validateIndexedValue(getIndexedValue(null, rowClustering, null));
 +        }
 +    }
 +
 +    /**
 +     * Checks the indexed values found in {@code rows} for an index on a non-primary-key column.
 +     *
 +     * For a simple column the single cell (possibly absent) is checked; for a complex column every
 +     * cell of its complex data is checked, using that cell's path and value.
 +     */
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (!indexedColumn.isComplex())
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +                continue;
 +            }
 +
 +            ComplexColumnData complexData = row.getComplexColumnData(indexedColumn);
 +            if (complexData == null)
 +                continue;
 +
 +            for (Cell element : complexData)
 +                validateIndexedValue(getIndexedValue(null, null, element.path(), element.value()));
 +        }
 +    }
 +
 +    /**
 +     * Rejects a value that is too large to be indexed.
 +     *
 +     * The indexed value becomes the partition key of the index table (see indexCfsMetadata), and
 +     * the error below advertises FBUtilities.MAX_UNSIGNED_SHORT as the maximum allowed size.
 +     *
 +     * @param value the value about to be indexed; {@code null} is always accepted
 +     * @throws InvalidRequestException if {@code value} is longer than FBUtilities.MAX_UNSIGNED_SHORT bytes
 +     */
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        // The message reports MAX_UNSIGNED_SHORT as the maximum *allowed* size, so a value of exactly
 +        // that length must pass; only strictly larger values are rejected (previously '>=' rejected
 +        // the advertised maximum itself).
 +        if (value != null && value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           metadata.name,
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    /**
 +     * Convenience overload that unpacks an optional {@code cell} into its path and value and
 +     * delegates to the four-argument getIndexedValue; a null cell contributes null for both.
 +     */
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        boolean hasCell = cell != null;
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               hasCell ? cell.path() : null,
 +                               hasCell ? cell.value() : null);
 +    }
 +
 +    /**
 +     * Builds the full index-table clustering for a base row, using the cell's path when a cell is
 +     * given and null otherwise.
 +     */
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        boolean hasCell = cell != null;
 +        return buildIndexClusteringPrefix(rowKey, clustering, hasCell ? cell.path() : null)
 +               .build();
 +    }
 +
 +    /**
 +     * Decorates an indexed value with the index table's partitioner so it can be used as that
 +     * table's partition key.
 +     */
 +    private DecoratedKey getIndexKeyFor(ByteBuffer indexedValue)
 +    {
 +        DecoratedKey decorated = indexCfs.decorateKey(indexedValue);
 +        return decorated;
 +    }
 +
 +    /**
 +     * Wraps a single index row into a PartitionUpdate for partition {@code valueKey} of the
 +     * index table.
 +     */
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata,
 +                                               valueKey,
 +                                               row);
 +    }
 +
 +    /**
 +     * Tears down the backing index table: interrupts compactions on it, flushes it, and finally
 +     * calls indexCfs.invalidate().
 +     *
 +     * The statements form an ordered sequence of barriers and flushes; do not reorder them.
 +     */
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        // NOTE(review): the meaning of the boolean flag is defined by CompactionManager — confirm before changing
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        // presumably blocks until the interrupted compactions have actually stopped
 +        CompactionManager.instance.waitForCessation(cfss);
 +        // presumably lets writes already in flight finish before the index table is flushed
 +        Keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        // presumably lets reads already in flight on the index table finish before it is invalidated
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    /**
 +     * Asks SystemKeyspace whether this index has been recorded as built for the base keyspace.
 +     */
 +    private boolean isBuilt()
 +    {
 +        String keyspaceName = baseCfs.keyspace.getName();
 +        return SystemKeyspace.isIndexBuilt(keyspaceName, metadata.name);
 +    }
 +
 +    /**
 +     * @return true when the indexed column is part of the base table's primary key
 +     */
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        boolean primaryKeyColumn = indexedColumn.isPrimaryKeyColumn();
 +        return primaryKeyColumn;
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    /**
 +     * Synchronously builds this index from the base table's canonical sstables.
 +     *
 +     * The base table is flushed first, then its canonical sstables are selected and referenced for
 +     * the duration of the build. With no sstables the index is simply marked built. Otherwise a
 +     * SecondaryIndexBuilder is submitted to the CompactionManager, this method waits on its future,
 +     * flushes the index table and marks the index built.
 +     */
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        // both resources are closed when the try block exits, including via the early return below
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        metadata.name,
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            // block until the submitted build finishes
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    /**
 +     * Renders sstables as a comma-separated list for log messages.
 +     *
 +     * @param sstables the sstables to describe
 +     * @return the toString() of each sstable, joined with ", "
 +     */
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        // the argument is already a Collection, so stream it directly instead of via StreamSupport
 +        return sstables.stream()
 +                       .map(SSTableReader::toString)
 +                       .collect(Collectors.joining(", "));
 +    }
 +
 +    /**
 +     * Construct the CFMetadata for an index table, the clustering columns in the index table
 +     * vary dependent on the kind of the indexed value.
 +     *
 +     * The index table's partition key is the indexed column (using a LocalPartitioner over the
 +     * indexed value type) and its first clustering column is named "partition_key", ordered by the
 +     * base table partitioner's partitionOrdering(). Legacy KEYS indexes get a non-compound, dense
 +     * table with a placeholder compact value column; all other indexes get further clustering
 +     * columns added by the CassandraIndexFunctions selected for the target.
 +     *
 +     * @param baseCfsMetadata metadata of the base (indexed) table
 +     * @param indexMetadata definition of the index whose backing table is being described
 +     * @return metadata for the table backing the index
 +     */
 +    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
 +    {
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
 +        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
 +        ColumnDefinition indexedColumn = target.left;
 +        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
 +
 +        // Tables for legacy KEYS indexes are non-compound and dense
 +        CFMetaData.Builder builder = indexMetadata.isKeys()
 +                                     ? CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata),
 +                                                                 true, false, false)
 +                                     : CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata));
 +
 +        // NOTE(review): the index table is given the base table's cfId here — confirm this is intended
 +        builder =  builder.withId(baseCfsMetadata.cfId)
 +                          .withPartitioner(new LocalPartitioner(indexedValueType))
 +                          .addPartitionKey(indexedColumn.name, indexedColumn.type)
 +                          .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
 +
 +        if (indexMetadata.isKeys())
 +        {
 +            // A dense, compact table for KEYS indexes must have a compact
 +            // value column defined, even though it is never used
 +            CompactTables.DefaultNames names =
 +                CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key"));
 +            builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
 +        }
 +        else
 +        {
 +            // The clustering columns for a table backing a COMPOSITES index are dependent
 +            // on the specific type of index (there are specializations for indexes on collections)
 +            builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
 +        }
 +
 +        // presumably re-applies table properties derived from the base table — confirm in CFMetaData
 +        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
 +    }
 +
 +    /**
 +     * Factory for CassandraIndex instances.
 +     *
 +     * Resolves the index target against the base table, picks the matching
 +     * CassandraIndexFunctions and lets it create the instance.
 +     *
 +     * @param baseCfs the table being indexed
 +     * @param indexMetadata the definition of the index
 +     * @return the instance created by the selected functions' newIndexInstance
 +     */
 +    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
 +    {
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexMetadata);
 +        CassandraIndexFunctions functions = getFunctions(indexMetadata, target);
 +        return functions.newIndexInstance(baseCfs, indexMetadata);
 +    }
 +
 +    // Public because it's also used to convert index metadata into a thrift-compatible format
 +    /**
 +     * Resolves the column and target type named by an index's "target" option.
 +     *
 +     * The target is either a bare column name (implicitly VALUES) or of the form "keys(foo)",
 +     * "entries(bar)" etc. A double-quoted column name is unwrapped and its doubled quotes un-escaped.
 +     *
 +     * @param cfm metadata of the indexed table
 +     * @param indexDef the index definition holding the "target" option
 +     * @return the resolved column (never null) and target type
 +     * @throws RuntimeException if no column of {@code cfm} matches the target
 +     */
 +    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
 +                                                                       IndexMetadata indexDef)
 +    {
 +        String target = indexDef.options.get("target");
 +        assert target != null : String.format("No target definition found for index %s", indexDef.name);
 +
 +        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
 +        // if not, then it must be a simple column name and implicitly its type is VALUES
 +        Matcher matcher = TARGET_REGEX.matcher(target);
 +        String columnName;
 +        IndexTarget.Type targetType;
 +        if (matcher.matches())
 +        {
 +            targetType = IndexTarget.Type.fromString(matcher.group(1));
 +            columnName = matcher.group(2);
 +        }
 +        else
 +        {
 +            columnName = target;
 +            targetType = IndexTarget.Type.VALUES;
 +        }
 +
 +        // in the case of a quoted column name the name in the target string
 +        // will be enclosed in quotes, which we need to unwrap. It may also
 +        // include quote characters internally, escaped like so:
 +        //      abc"def -> abc""def.
 +        // Because the target string is stored in a CQL compatible form, we
 +        // need to un-escape any such quotes to get the actual column name
 +        if (columnName.startsWith("\""))
 +        {
 +            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
 +            columnName = columnName.replaceAll("\"\"", "\"");
 +        }
 +
 +        // for CQL tables the column name is utf8, so a direct lookup is possible
 +        if (cfm.isCQLTable())
 +        {
 +            ColumnDefinition column = cfm.getColumnDefinition(new ColumnIdentifier(columnName, true));
 +            if (column != null)
 +                return Pair.create(column, targetType);
 +        }
 +
 +        // if it's not a CQL table we can't assume the column name is utf8 (and a failed direct lookup
 +        // is handled the same way), so do a linear scan of the cfm's columns to get the matching one
 +        for (ColumnDefinition column : cfm.allColumns())
 +        {
 +            if (column.name.toString().equals(columnName))
 +                return Pair.create(column, targetType);
 +        }
 +
 +        // never hand back a null column: callers such as getFunctions dereference target.left directly
 +        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
 +    }
 +
 +    /**
 +     * Selects the CassandraIndexFunctions implementation for an index.
 +     *
 +     * Legacy KEYS indexes always use KEYS_INDEX_FUNCTIONS. Non-frozen (multi-cell) collections are
 +     * dispatched on the collection kind and, for maps, on the target type. All other columns are
 +     * dispatched on the kind of the indexed column.
 +     *
 +     * @throws AssertionError if no implementation exists for the column/target combination
 +     */
 +    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
 +                                                Pair<ColumnDefinition, IndexTarget.Type> target)
 +    {
 +        if (indexDef.isKeys())
 +            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 +
 +        ColumnDefinition column = target.left;
 +        boolean multiCellCollection = column.type.isCollection() && column.type.isMultiCell();
 +        if (multiCellCollection)
 +        {
 +            CollectionType collection = (CollectionType) column.type;
 +            switch (collection.kind)
 +            {
 +                case LIST:
 +                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                case SET:
 +                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                case MAP:
 +                    switch (target.right)
 +                    {
 +                        case VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                        case KEYS:
 +                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                        case KEYS_AND_VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
 +                        default:
 +                            throw new AssertionError();
 +                    }
 +            }
 +        }
 +
 +        switch (column.kind)
 +        {
 +            case PARTITION_KEY:
 +                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
 +            case CLUSTERING:
 +                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
 +            case REGULAR:
 +                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
 +            default:
 +                throw new AssertionError();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/833c993b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
index 5d43bd2,b653f4e..8376652
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java
@@@ -1159,37 -845,90 +1159,60 @@@ public class SecondaryIndexTest extend
                     row(bytes("foo124"), EMPTY_BYTE_BUFFER));
      }
  
+     @Test
 -    public void testIndexOnRegularColumnWithPartitionWithoutRows() throws Throwable
 -    {
 -        createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))");
 -        createIndex("CREATE INDEX ON %s (v)");
 -        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1);
 -        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2);
 -        execute("INSERT INTO %s (pk, s) VALUES (?, ?)", 2, 9);
 -        execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1);
 -        flush();
 -        execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1);
 -        assertRows(execute("SELECT * FROM %s WHERE v = ?", 1),
 -                   row(1, 1, 9, 1));
 -    }
 -
 -    /**
 -     * Custom index used to test the behavior of the system when the index is not ready.
 -     * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code>
 -     * to avoid the check but return a <code>CompositesSearcher</code>.
 -     */
 -    public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex
++    public void testIndexOnPartitionKeyWithStaticColumnAndNoRows() throws Throwable
+     {
 -        private volatile CountDownLatch latch = new CountDownLatch(1);
++        createTable("CREATE TABLE %s (pk1 int, pk2 int, c int, s int static, v int, PRIMARY KEY((pk1, pk2), c))");
++        createIndex("CREATE INDEX ON %s (pk2)");
++        execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 1, 1, 1, 9, 1);
++        execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 1, 1, 2, 9, 2);
++        execute("INSERT INTO %s (pk1, pk2, s) VALUES (?, ?, ?)", 2, 1, 9);
++        execute("INSERT INTO %s (pk1, pk2, c, s, v) VALUES (?, ?, ?, ?, ?)", 3, 1, 1, 9, 1);
+ 
 -        @Override
 -        public void index(ByteBuffer rowKey, ColumnFamily cf)
 -        {
 -            try
 -            {
 -                latch.await();
 -            }
 -            catch (InterruptedException e)
 -            {
 -                Thread.interrupted();
 -            }
 -        }
++        assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 1),
++                   row(2, 1, null, 9, null),
++                   row(1, 1, 1, 9, 1),
++                   row(1, 1, 2, 9, 2),
++                   row(3, 1, 1, 9, 1));
+ 
 -        @Override
 -        public void delete(DecoratedKey key, Group opGroup)
 -        {
 -        }
++        execute("UPDATE %s SET s=?, v=? WHERE pk1=? AND pk2=? AND c=?", 9, 1, 1, 10, 2);
++        assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 10), row(1, 10, 2, 9, 1));
+ 
 -        @Override
 -        public void init()
 -        {
 -        }
++        execute("UPDATE %s SET s=? WHERE pk1=? AND pk2=?", 9, 1, 20);
++        assertRows(execute("SELECT * FROM %s WHERE pk2 = ?", 20), row(1, 20, null, 9, null));
++    }
+ 
 -        @Override
 -        public void reload()
 -        {
 -        }
 +    /**
 +     * Prepares {@code cql} after substituting the test keyspace and current table into its
 +     * format placeholders, using an internal-calls client state.
 +     */
 +    private ResultMessage.Prepared prepareStatement(String cql, boolean forThrift)
 +    {
 +        String query = String.format(cql, KEYSPACE, currentTable());
 +        return QueryProcessor.prepare(query, ClientState.forInternalCalls(), forThrift);
 +    }
  
 -        @Override
 -        public void validateOptions() throws ConfigurationException
 -        {
 -        }
 +    /**
 +     * Asserts that {@code cell} is present, compares equal to {@code val} under the column's type,
 +     * and carries the expected timestamp.
 +     */
 +    private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp)
 +    {
 +        assertNotNull(cell);
 +        int comparison = def.type.compare(cell.value(), val);
 +        assertEquals(0, comparison);
 +        assertEquals(timestamp, cell.timestamp());
 +    }
  
 -        @Override
 -        public String getIndexName()
 -        {
 -            return "testIndex";
 -        }
 +    /**
 +     * Asserts that the cell of column {@code name} in {@code row}, decoded with the column's type,
 +     * equals {@code expected}.
 +     */
 +    private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm)
 +    {
 +        ColumnDefinition column = cfm.getColumnDefinition(new ColumnIdentifier(name, true));
 +        Object actual = column.type.compose(row.getCell(column).value());
 +        assertEquals(expected, actual);
 +    }
  
 -        @Override
 -        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
 -        {
 -            return new CompositesSearcher(baseCfs.indexManager, columns)
 -            {
 -                @Override
 -                public boolean canHandleIndexClause(List<IndexExpression> clause)
 -                {
 -                    return true;
 -                }
 -
 -                @Override
 -                public void validate(IndexExpression indexExpression) throws InvalidRequestException
 -                {
 -                }
 -            };
 -        }
 +    /**
 +     * <code>CassandraIndex</code> that blocks during the initialization.
 +     */
 +    public static class IndexBlockingOnInitialization extends CustomCassandraIndex
 +    {
 +        private final CountDownLatch latch = new CountDownLatch(1);
  
 -        @Override
 -        public void forceBlockingFlush()
 +        public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
          {
 +            super(baseCfs, indexDef);
          }
  
          @Override