You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/03 14:49:25 UTC

git commit: Secondary index support for collections

Updated Branches:
  refs/heads/trunk 57516e082 -> d12a0d7b0


Secondary index support for collections

patch by slebresne; reviewed by iamaleksey for CASSANDRA-4511


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12a0d7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12a0d7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12a0d7b

Branch: refs/heads/trunk
Commit: d12a0d7b0299786bf1d0484f3770bae6a94cb0c9
Parents: 57516e0
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Nov 14 09:17:51 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Dec 3 14:49:02 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/cql3/Cql.g        |   4 +
 .../org/apache/cassandra/cql3/Relation.java     |  15 ++-
 .../cql3/statements/CreateIndexStatement.java   |  21 +++-
 .../cassandra/cql3/statements/Restriction.java  | 111 +++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  71 ++++++++++--
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../apache/cassandra/db/IndexExpression.java    |  19 +++-
 .../cassandra/db/filter/ExtendedFilter.java     |  47 ++++++--
 .../AbstractSimplePerColumnSecondaryIndex.java  |  13 ++-
 .../db/index/SecondaryIndexSearcher.java        |   2 +-
 .../db/index/composites/CompositesIndex.java    |  50 ++++++++-
 .../CompositesIndexOnCollectionKey.java         | 112 +++++++++++++++++++
 .../CompositesIndexOnCollectionValue.java       | 110 ++++++++++++++++++
 .../db/index/composites/CompositesSearcher.java |  21 +++-
 15 files changed, 566 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bc50ac..08c3a67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
  * User-defined types for CQL3 (CASSANDRA-5590)
  * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
  * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511)
 
 
 2.0.4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 325d6f6..fb0054d 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -947,6 +947,8 @@ relation[List<Relation> clauses]
         { $clauses.add(new Relation(name, Relation.Type.IN, marker)); }
     | name=cident K_IN { Relation rel = Relation.createInRelation($name.id); }
        '(' ( f1=term { rel.addInValue(f1); } (',' fN=term { rel.addInValue(fN); } )* )? ')' { $clauses.add(rel); }
+    | name=cident K_CONTAINS { Relation.Type rt = Relation.Type.CONTAINS; } /* (K_KEY { rt = Relation.Type.CONTAINS_KEY })? */
+        t=term { $clauses.add(new Relation(name, rt, t)); }
     | '(' relation[$clauses] ')'
     ;
 
@@ -1045,6 +1047,7 @@ basic_unreserved_keyword returns [String str]
         | K_CUSTOM
         | K_TRIGGER
         | K_DISTINCT
+        | K_CONTAINS
         ) { $str = $k.text; }
     ;
 
@@ -1101,6 +1104,7 @@ K_DESC:        D E S C;
 K_ALLOW:       A L L O W;
 K_FILTERING:   F I L T E R I N G;
 K_IF:          I F;
+K_CONTAINS:    C O N T A I N S;
 
 K_GRANT:       G R A N T;
 K_ALL:         A L L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java
index 15ed540..cfcdd54 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -35,7 +35,20 @@ public class Relation
 
     public static enum Type
     {
-        EQ, LT, LTE, GTE, GT, IN;
+        EQ, LT, LTE, GTE, GT, IN, CONTAINS, CONTAINS_KEY;
+
+        public boolean allowsIndexQuery()
+        {
+            switch (this)
+            {
+                case EQ:
+                case CONTAINS:
+                case CONTAINS_KEY:
+                    return true;
+                default:
+                    return false;
+            }
+        }
     }
 
     private Relation(ColumnIdentifier entity, Type type, Term.Raw value, List<Term.Raw> inValues, boolean onToken)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index b040121..ae6c15c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -18,10 +18,13 @@
 package org.apache.cassandra.cql3.statements;
 
 import java.util.Collections;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
@@ -91,9 +94,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
         if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
             throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName));
 
-        if (cd.type.isCollection() && !isCustom)
-            throw new InvalidRequestException("Indexes on collections are no yet supported");
-
         if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
             throw new InvalidRequestException(String.format("Cannot add secondary index to already primarily indexed column %s", columnName));
     }
@@ -108,11 +108,24 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             return;
 
         if (isCustom)
+        {
             cd.setIndexType(IndexType.CUSTOM, Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass));
+        }
         else if (cfm.hasCompositeComparator())
-            cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
+        {
+            Map<String, String> options = Collections.emptyMap();
+            // For now, we only allow indexing the values of collections, but we could later also
+            // allow indexing map keys, so we record that it is the values we index here to make
+            // our lives easier then.
+            if (cd.type.isCollection())
+                options = ImmutableMap.of("index_values", "");
+
+            cd.setIndexType(IndexType.COMPOSITES, options);
+        }
         else
+        {
             cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+        }
 
         cd.setIndexName(indexName);
         cfm.addDefaultIndexNames();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 9119a9d..b6f900c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -37,8 +37,9 @@ public interface Restriction
     public boolean isSlice();
     public boolean isEQ();
     public boolean isIN();
+    public boolean isContains();
 
-    // Only supported for EQ and IN, but it's convenient to have here
+    // Not supported by Slice, but it's convenient to have here
     public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
 
     public static class EQ implements Restriction
@@ -72,6 +73,11 @@ public interface Restriction
             return false;
         }
 
+        public boolean isContains()
+        {
+            return false;
+        }
+
         public boolean isOnToken()
         {
             return onToken;
@@ -107,6 +113,11 @@ public interface Restriction
             return false;
         }
 
+        public boolean isContains()
+        {
+            return false;
+        }
+
         public boolean isIN()
         {
             return true;
@@ -210,6 +221,11 @@ public interface Restriction
             return false;
         }
 
+        public boolean isContains()
+        {
+            return false;
+        }
+
         public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
         {
             throw new UnsupportedOperationException();
@@ -302,4 +318,97 @@ public interface Restriction
                                                           onToken ? "*" : "");
         }
     }
+
+    // This holds both CONTAINS and CONTAINS_KEY restrictions, because a single column may be restricted by both.
+    public static class Contains implements Restriction
+    {
+        private List<Term> values; // for CONTAINS
+        private List<Term> keys;   // for CONTAINS_KEY
+
+        public boolean hasContains()
+        {
+            return values != null;
+        }
+
+        public boolean hasContainsKey()
+        {
+            return keys != null;
+        }
+
+        public void add(Term t, boolean isKey)
+        {
+            if (isKey)
+                addKey(t);
+            else
+                addValue(t);
+        }
+
+        public void addValue(Term t)
+        {
+            if (values == null)
+                values = new ArrayList<>();
+            values.add(t);
+        }
+
+        public void addKey(Term t)
+        {
+            if (keys == null)
+                keys = new ArrayList<>();
+            keys.add(t);
+        }
+
+        public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            if (values == null)
+                return Collections.emptyList();
+
+            List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
+            for (Term value : values)
+                buffers.add(value.bindAndGet(variables));
+            return buffers;
+        }
+
+        public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+        {
+            if (keys == null)
+                return Collections.emptyList();
+
+            List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
+            for (Term value : keys)
+                buffers.add(value.bindAndGet(variables));
+            return buffers;
+        }
+
+        public boolean isSlice()
+        {
+            return false;
+        }
+
+        public boolean isEQ()
+        {
+            return false;
+        }
+
+        public boolean isIN()
+        {
+            return false;
+        }
+
+        public boolean isContains()
+        {
+            return true;
+        }
+
+        public boolean isOnToken()
+        {
+            return false;
+        }
+
+
+        @Override
+        public String toString()
+        {
+            return String.format("CONTAINS(values=%s, keys=%s)", values, keys);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 344e926..62ebd21 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -756,15 +756,25 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 {
                     if (slice.hasBound(b))
                     {
-                        ByteBuffer value = slice.bound(b, variables);
-                        if (value == null)
-                            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
-                        if (value.remaining() > 0xFFFF)
-                            throw new InvalidRequestException("Index expression values may not be larger than 64K");
+                        ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
                         expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
                     }
                 }
             }
+            else if (restriction.isContains())
+            {
+                Restriction.Contains contains = (Restriction.Contains)restriction;
+                for (ByteBuffer value : contains.values(variables))
+                {
+                    validateIndexedValue(def, value);
+                    expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
+                }
+                for (ByteBuffer key : contains.keys(variables))
+                {
+                    validateIndexedValue(def, key);
+                    expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
+                }
+            }
             else
             {
                 List<ByteBuffer> values = restriction.values(variables);
@@ -772,17 +782,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 if (values.size() != 1)
                     throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
 
-                ByteBuffer value = values.get(0);
-                if (value == null)
-                    throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
-                if (value.remaining() > 0xFFFF)
-                    throw new InvalidRequestException("Index expression values may not be larger than 64K");
+                ByteBuffer value = validateIndexedValue(def, values.get(0));
                 expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.EQ, value));
             }
         }
         return expressions;
     }
 
+    private static ByteBuffer validateIndexedValue(ColumnDefinition def, ByteBuffer value) throws InvalidRequestException
+    {
+        if (value == null)
+            throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
+        if (value.remaining() > 0xFFFF)
+            throw new InvalidRequestException("Index expression values may not be larger than 64K");
+        return value;
+    }
 
     private Iterable<Column> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException
     {
@@ -1109,7 +1123,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                 }
 
                 stmt.restrictedColumns.add(def);
-                if (def.isIndexed() && rel.operator() == Relation.Type.EQ)
+                if (def.isIndexed() && rel.operator().allowsIndexQuery())
                 {
                     hasQueriableIndex = true;
                     if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
@@ -1490,10 +1504,45 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
                         ((Restriction.Slice)restriction).setBound(def.name, newRel.operator(), t);
                     }
                     break;
+                case CONTAINS_KEY:
+                    if (!(receiver.type instanceof MapType))
+                        throw new InvalidRequestException(String.format("Cannot use CONTAINS_KEY on non-map column %s", def.name));
+                    // Fallthrough on purpose
+                case CONTAINS:
+                    {
+                        if (!receiver.type.isCollection())
+                            throw new InvalidRequestException(String.format("Cannot use %s relation on non collection column %s", newRel.operator(), def.name));
+
+                        if (restriction == null)
+                            restriction = new Restriction.Contains();
+                        else if (!restriction.isContains())
+                            throw new InvalidRequestException(String.format("Collection column %s can only be restricted by CONTAINS or CONTAINS KEY", def.name));
+                        boolean isKey = newRel.operator() == Relation.Type.CONTAINS_KEY;
+                        receiver = makeCollectionReceiver(receiver, isKey);
+                        Term t = newRel.getValue().prepare(receiver);
+                        ((Restriction.Contains)restriction).add(t, isKey);
+                    }
             }
             return restriction;
         }
 
+        private static ColumnSpecification makeCollectionReceiver(ColumnSpecification collection, boolean isKey)
+        {
+            assert collection.type.isCollection();
+            switch (((CollectionType)collection.type).kind)
+            {
+                case LIST:
+                    assert !isKey;
+                    return Lists.valueSpecOf(collection);
+                case SET:
+                    assert !isKey;
+                    return Sets.valueSpecOf(collection);
+                case MAP:
+                    return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection);
+            }
+            throw new AssertionError();
+        }
+
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b00d220..396bbd3 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1697,7 +1697,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                     removeDroppedColumns(data);
 
-                    if (!filter.isSatisfiedBy(rawRow.key, data, null))
+                    if (!filter.isSatisfiedBy(rawRow.key, data, null, null))
                         continue;
 
                     logger.trace("{} satisfies all filter expressions", data);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/IndexExpression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java b/src/java/org/apache/cassandra/db/IndexExpression.java
index e08e41f..b57890a 100644
--- a/src/java/org/apache/cassandra/db/IndexExpression.java
+++ b/src/java/org/apache/cassandra/db/IndexExpression.java
@@ -40,7 +40,7 @@ public class IndexExpression
 
     public enum Operator
     {
-        EQ, GTE, GT, LTE, LT;
+        EQ, GTE, GT, LTE, LT, CONTAINS, CONTAINS_KEY;
 
         public static Operator findByOrdinal(int ordinal)
         {
@@ -55,10 +55,27 @@ public class IndexExpression
                     return LTE;
                 case 4:
                     return LT;
+                case 5:
+                    return CONTAINS;
+                case 6:
+                    return CONTAINS_KEY;
                 default:
                     throw new AssertionError();
             }
         }
+
+        public boolean allowsIndexQuery()
+        {
+            switch (this)
+            {
+                case EQ:
+                case CONTAINS:
+                case CONTAINS_KEY:
+                    return true;
+                default:
+                    return false;
+            }
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 89d6683..e749871 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.filter;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -30,8 +31,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnNameBuilder;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 
 /**
@@ -129,7 +129,7 @@ public abstract class ExtendedFilter
      * @return true if the provided data satisfies all the expressions from
      * the clause of this filter.
      */
-    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
+    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement);
 
     public static boolean satisfies(int comparison, IndexExpression.Operator op)
     {
@@ -279,10 +279,8 @@ public abstract class ExtendedFilter
             return pruned;
         }
 
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
         {
-            // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
-            // where the index returned a row which doesn't have the primary column when we actually read it
             for (IndexExpression expression : clause)
             {
                 ColumnDefinition def = data.metadata().getColumnDefinition(expression.column);
@@ -301,6 +299,13 @@ public abstract class ExtendedFilter
                 }
                 else
                 {
+                    if (def.type.isCollection())
+                    {
+                        if (!collectionSatisfies(def, data, builder, expression, collectionElement))
+                            return false;
+                        continue;
+                    }
+
                     dataValue = extractDataValue(def, rowKey.key, data, builder);
                     validator = def.type;
                 }
@@ -315,6 +320,34 @@ public abstract class ExtendedFilter
             return true;
         }
 
+        private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, ColumnNameBuilder builder, IndexExpression expr, ByteBuffer collectionElement)
+        {
+            assert def.type.isCollection();
+
+            CollectionType type = (CollectionType)def.type;
+            builder = builder.copy().add(def.name.bytes);
+
+            switch (type.kind)
+            {
+                case LIST:
+                    assert collectionElement != null;
+                    return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+                case SET:
+                    return data.getColumn(builder.add(expr.value).build()) != null;
+                case MAP:
+                    if (expr.operator == IndexExpression.Operator.CONTAINS_KEY)
+                    {
+                        return data.getColumn(builder.add(expr.value).build()) != null;
+                    }
+                    else
+                    {
+                        assert collectionElement != null;
+                        return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+                    }
+            }
+            throw new AssertionError();
+        }
+
         private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
         {
             switch (def.kind)
@@ -359,7 +392,7 @@ public abstract class ExtendedFilter
             return data;
         }
 
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
+        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
         {
             return true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 699b391..b7593ad 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -50,10 +50,21 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
-                                                             new LocalPartitioner(columnDef.type),
+                                                             new LocalPartitioner(getIndexKeyComparator()),
                                                              indexedCfMetadata);
     }
 
+    protected AbstractType<?> getIndexKeyComparator()
+    {
+        return columnDef.type;
+    }
+
+    @Override
+    public DecoratedKey getIndexKeyFor(ByteBuffer value)
+    {
+        return new DecoratedKey(new LocalToken(getIndexKeyComparator(), value), value);
+    }
+
     protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
 
     protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index f18357b..a508a15 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -67,7 +67,7 @@ public abstract class SecondaryIndexSearcher
                 continue;
 
             SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
-            if (index == null || expression.operator != IndexExpression.Operator.EQ)
+            if (index == null || !expression.operator.allowsIndexQuery())
                 continue;
             int columns = index.getIndexCfs().getMeanColumns();
             candidates.put(index, columns);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 3dea495..6d137ca 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -53,6 +53,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
     public static CompositesIndex create(ColumnDefinition cfDef)
     {
+        if (cfDef.type.isCollection())
+        {
+            switch (((CollectionType)cfDef.type).kind)
+            {
+                case LIST:
+                    return new CompositesIndexOnCollectionValue();
+                case SET:
+                    return new CompositesIndexOnCollectionKey();
+                case MAP:
+                    return new CompositesIndexOnCollectionValue();
+            }
+        }
+
         switch (cfDef.kind)
         {
             case CLUSTERING_COLUMN:
@@ -70,6 +83,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     // Check SecondaryIndex.getIndexComparator if you want to know why this is static
     public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
+        if (cfDef.type.isCollection())
+        {
+            switch (((CollectionType)cfDef.type).kind)
+            {
+                case LIST:
+                    return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+                case SET:
+                    return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
+                case MAP:
+                    return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+            }
+        }
+
         switch (cfDef.kind)
         {
             case CLUSTERING_COLUMN:
@@ -127,10 +153,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         ColumnDefinition columnDef = columnDefs.iterator().next();
         Map<String, String> options = new HashMap<String, String>(columnDef.getIndexOptions());
 
-        // We take no options though we used to have one called "prefix_size",
-        // so skip it silently for backward compatibility sake.
+        // We used to have an option called "prefix_size", so skip it silently for the sake of backward compatibility.
         options.remove("prefix_size");
 
+        if (columnDef.type.isCollection())
+            options.remove("index_values");
+
         if (!options.isEmpty())
             throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet());
     }
@@ -143,14 +171,30 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
 
         public final ByteBuffer indexedKey;
         public final ColumnNameBuilder indexedEntryNameBuilder;
+        public final ByteBuffer indexedEntryCollectionKey; // may be null
+
+        public IndexedEntry(DecoratedKey indexValue,
+                            ByteBuffer indexEntry,
+                            long timestamp,
+                            ByteBuffer indexedKey,
+                            ColumnNameBuilder indexedEntryNameBuilder)
+        {
+            this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryNameBuilder, null); // entry without a collection key
+        }
 
-        public IndexedEntry(DecoratedKey indexValue, ByteBuffer indexEntry, long timestamp, ByteBuffer indexedKey, ColumnNameBuilder indexedEntryNameBuilder)
+        public IndexedEntry(DecoratedKey indexValue,
+                            ByteBuffer indexEntry,
+                            long timestamp,
+                            ByteBuffer indexedKey,
+                            ColumnNameBuilder indexedEntryNameBuilder,
+                            ByteBuffer indexedEntryCollectionKey) // may be null
         {
             this.indexValue = indexValue;
             this.indexEntry = indexEntry;
             this.timestamp = timestamp;
             this.indexedKey = indexedKey;
             this.indexedEntryNameBuilder = indexedEntryNameBuilder;
+            this.indexedEntryCollectionKey = indexedEntryCollectionKey;
         }
 
         public ByteBuffer indexedEntryStart()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
new file mode 100644
index 0000000..c2acfc9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.LocalToken;
+
+/**
+ * Index on the collection element stored in the cell name of a collection column.
+ *
+ * A cell of the base table has the general form:
+ *   ck_0 ... ck_n c_name [col_elt] : v
+ * where ck_i are the clustering keys, c_name is the CQL3 column name, col_elt is the
+ * collection element that we want to index (present only when c_name is the
+ * collection being indexed) and v is the cell value.
+ *
+ * Such a cell is indexed if c_name is the indexed collection (in which case col_elt is
+ * guaranteed to be present). The index entry is then:
+ *   - row key: the col_elt value (see getIndexedValue()).
+ *   - cell name: 'rk ck_0 ... ck_n' where rk is the row key of the original cell.
+ */
+public class CompositesIndexOnCollectionKey extends CompositesIndex
+{
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
+        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
+        List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
+        types.add(SecondaryIndex.keyComparator); // the row key component comes first
+        for (int i = 0; i < count - 1; i++)
+            types.add(ckTypes.get(i)); // then one type per clustering column
+        return CompositeType.getInstance(types);
+    }
+
+    @Override
+    protected AbstractType<?> getIndexKeyComparator()
+    {
+        return ((CollectionType)columnDef.type).nameComparator(); // index rows are keyed by the collection element
+    }
+
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(column.name());
+        return components[columnDef.position() + 1]; // component right after the CQL3 column name, i.e. col_elt
+    }
+
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        int count = 1 + baseCfs.metadata.clusteringColumns().size();
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(columnName);
+        CompositeType.Builder builder = getIndexComparator().builder();
+        builder.add(rowKey); // 'rk' of the index cell name
+        for (int i = 0; i < count - 1; i++)
+            builder.add(components[i]); // followed by the clustering prefix of the base cell
+        return builder;
+    }
+
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        int count = 1 + baseCfs.metadata.clusteringColumns().size();
+        ByteBuffer[] components = getIndexComparator().split(indexEntry.name()); // [0] is the base row key, the rest the clustering prefix
+
+        ColumnNameBuilder builder = getBaseComparator().builder();
+        for (int i = 0; i < count - 1; i++)
+            builder.add(components[i + 1]); // skip the row key to rebuild the base clustering prefix
+
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+    }
+
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        // A cell is indexed only if its CQL3 column name component is the name of the indexed collection
+        ByteBuffer[] components = getBaseComparator().split(name);
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return components.length > columnDef.position()
+            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+    }
+
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    {
+        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexValue.key).build();
+        Column liveColumn = data.getColumn(bb); // bb = clustering prefix + column name + indexed element
+        return (liveColumn == null || liveColumn.isMarkedForDelete(now)); // stale when the cell is absent or marked for delete at 'now'
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
new file mode 100644
index 0000000..f416d0e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.LocalToken;
+
+/**
+ * Index on the value of a collection cell.
+ *
+ * This is a lot like an index on a REGULAR column, except that we also need to make
+ * the collection key part of the index entry so that:
+ *   1) we don't have to scan the whole collection at query time to know whether the
+ *   entry is stale and whether it still satisfies the query.
+ *   2) if a collection contains the same value multiple times, we have one entry
+ *   for each occurrence, so that deleting one of those values only deletes the
+ *   entry corresponding to that value.
+ */
+public class CompositesIndexOnCollectionValue extends CompositesIndex
+{
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int prefixSize = columnDef.position();
+        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 2);
+        types.add(SecondaryIndex.keyComparator); // the row key component comes first
+        for (int i = 0; i < prefixSize; i++)
+            types.add(((CompositeType)baseMetadata.comparator).types.get(i)); // components preceding the column name
+        types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
+        return CompositeType.getInstance(types);
+    }
+
+    @Override
+    protected AbstractType<?> getIndexKeyComparator()
+    {
+        return ((CollectionType)columnDef.type).valueComparator(); // index rows are keyed by the collection value
+    }
+
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        return column.value(); // the indexed value is the cell value itself
+    }
+
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        int prefixSize = columnDef.position();
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(columnName);
+        assert components.length == baseComparator.types.size(); // expects a full cell name, one component per comparator type
+        CompositeType.Builder builder = getIndexComparator().builder();
+        builder.add(rowKey);
+        for (int i = 0; i < prefixSize; i++)
+            builder.add(components[i]);
+        builder.add(components[prefixSize + 1]); // collection key, found right after the column name component
+        return builder;
+    }
+
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        int prefixSize = columnDef.position();
+        ByteBuffer[] components = getIndexComparator().split(indexEntry.name()); // layout: row key, prefix, collection key
+        CompositeType.Builder builder = getBaseComparator().builder();
+        for (int i = 0; i < prefixSize; i++)
+            builder.add(components[i + 1]); // skip the row key to rebuild the base prefix
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder, components[prefixSize + 1]);
+    }
+
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        ByteBuffer[] components = getBaseComparator().split(name); // indexed only if the column name component matches this collection
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return components.length > columnDef.position()
+            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+    }
+
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    {
+        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexedEntryCollectionKey).build();
+        Column liveColumn = data.getColumn(bb); // bb = prefix + column name + collection key
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
+            return true; // cell absent or marked for delete at 'now'
+
+        ByteBuffer liveValue = liveColumn.value(); // also stale if the live value differs from the indexed one
+        return ((CollectionType)columnDef.type).valueComparator().compare(entry.indexValue.key, liveValue) != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 0d5d1a5..bcb0dd2 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -135,6 +135,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                  */
                 DecoratedKey currentKey = null;
                 ColumnFamily data = null;
+                ByteBuffer previousPrefix = null;
 
                 while (true)
                 {
@@ -232,6 +233,16 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                         if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
                             continue;
 
+                        // If we've recorded the previous prefix, it means we're dealing with an index on the collection value. In
+                        // that case, we can have multiple index entries with the same prefix for the same CQL3 row, and we want
+                        // to add the CQL3 row only once (because requesting the data multiple times would be inefficient but,
+                        // more importantly, because we shouldn't count the columns multiple times with the lastCounted() call
+                        // at the end of this method).
+                        if (previousPrefix != null && previousPrefix.equals(start))
+                            continue;
+                        else
+                            previousPrefix = null;
+
                         logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
 
                         // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
@@ -248,9 +259,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
                             continue;
                         }
 
-                        assert newData != null : "An entry with not data should have been considered stale";
+                        assert newData != null : "An entry with no data should have been considered stale";
+
+                        // We know the entry is not stale, and so the entry satisfies the primary clause. So whether
+                        // or not the data satisfies the other clauses, there is no point in re-checking the
+                        // same CQL3 row if we run into another collection value entry for this row.
+                        if (entry.indexedEntryCollectionKey != null)
+                            previousPrefix = start;
 
-                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
+                        if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder, entry.indexedEntryCollectionKey))
                             continue;
 
                         if (data == null)