You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/12/19 00:31:31 UTC

cassandra git commit: Support indexing key/value entries in map collections

Repository: cassandra
Updated Branches:
  refs/heads/trunk a10a2ba81 -> 3f55c35b8


Support indexing key/value entries in map collections

Patch by Samuel Klock; reviewed by Tyler Hobbs for CASSANDRA-8473


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f55c35b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f55c35b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f55c35b

Branch: refs/heads/trunk
Commit: 3f55c35b8c6bc8240e34573f2fe45709c64e566f
Parents: a10a2ba
Author: Samuel Klock <sk...@akamai.com>
Authored: Thu Dec 18 17:30:26 2014 -0600
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Thu Dec 18 17:30:26 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 pylib/cqlshlib/cql3handling.py                  |   2 +-
 src/java/org/apache/cassandra/cql3/Cql.g        |   9 +-
 .../cassandra/cql3/SingleColumnRelation.java    |  87 ++++-
 .../restrictions/SingleColumnRestriction.java   |  65 +++-
 .../restrictions/SingleColumnRestrictions.java  |   8 +-
 .../cql3/statements/CreateIndexStatement.java   |  64 ++--
 .../cassandra/cql3/statements/IndexTarget.java  |  73 +++-
 .../cassandra/db/filter/ExtendedFilter.java     |  28 +-
 .../cassandra/db/index/SecondaryIndex.java      |   5 +
 .../db/index/composites/CompositesIndex.java    |  19 +-
 .../CompositesIndexIncludingCollectionKey.java  |  89 +++++
 .../CompositesIndexOnCollectionKey.java         |  61 +---
 .../CompositesIndexOnCollectionKeyAndValue.java |  87 +++++
 .../cql3/SecondaryIndexOnMapEntriesTest.java    | 338 +++++++++++++++++++
 15 files changed, 792 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2848d53..c3b62f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Support index key/value entries on map collections (CASSANDRA-8473)
  * Modernize schema tables (CASSANDRA-8261)
  * Support for user-defined aggregation functions (CASSANDRA-8053)
  * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index 84af796..4ee2099 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -616,7 +616,7 @@ syntax_rules += r'''
                     ;
 <whereClause> ::= <relation> ( "AND" <relation> )*
                 ;
-<relation> ::= [rel_lhs]=<cident> ( "=" | "<" | ">" | "<=" | ">=" | "CONTAINS" ( "KEY" )? ) <term>
+<relation> ::= [rel_lhs]=<cident> ( "[" <term> "]" )? ( "=" | "<" | ">" | "<=" | ">=" | "CONTAINS" ( "KEY" )? ) <term>
              | token="TOKEN" "(" [rel_tokname]=<cident>
                                  ( "," [rel_tokname]=<cident> )*
                              ")" ("=" | "<" | ">" | "<=" | ">=") <tokenDefinition>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index ed133e7..2a6e6d0 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -676,9 +676,10 @@ createIndexStatement returns [CreateIndexStatement expr]
     ;
 
 indexIdent returns [IndexTarget.Raw id]
-    : c=cident                { $id = IndexTarget.Raw.valuesOf(c); }
-    | K_KEYS '(' c=cident ')' { $id = IndexTarget.Raw.keysOf(c); }
-    | K_FULL '(' c=cident ')' { $id = IndexTarget.Raw.fullCollection(c); }
+    : c=cident                   { $id = IndexTarget.Raw.valuesOf(c); }
+    | K_KEYS '(' c=cident ')'    { $id = IndexTarget.Raw.keysOf(c); }
+    | K_ENTRIES '(' c=cident ')' { $id = IndexTarget.Raw.keysAndValuesOf(c); }
+    | K_FULL '(' c=cident ')'    { $id = IndexTarget.Raw.fullCollection(c); }
     ;
 
 
@@ -1150,6 +1151,7 @@ relation[List<Relation> clauses]
         { $clauses.add(SingleColumnRelation.createInRelation($name.id, inValues)); }
     | name=cident K_CONTAINS { Operator rt = Operator.CONTAINS; } (K_KEY { rt = Operator.CONTAINS_KEY; })?
         t=term { $clauses.add(new SingleColumnRelation(name, rt, t)); }
+    | name=cident '[' key=term ']' type=relationType t=term { $clauses.add(new SingleColumnRelation(name, key, type, t)); }
     | ids=tupleOfIdentifiers
       ( K_IN
           ( '(' ')'
@@ -1342,6 +1344,7 @@ K_WHERE:       W H E R E;
 K_AND:         A N D;
 K_KEY:         K E Y;
 K_KEYS:        K E Y S;
+K_ENTRIES:     E N T R I E S;
 K_FULL:        F U L L;
 K_INSERT:      I N S E R T;
 K_UPDATE:      U P D A T E;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
index 7817d43..3db1195 100644
--- a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
+++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.cql3;
 
 import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -27,10 +28,12 @@ import org.apache.cassandra.cql3.restrictions.Restriction;
 import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction;
 import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 
 /**
  * Relations encapsulate the relationship between an entity of some kind, and
@@ -40,12 +43,14 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse
 public final class SingleColumnRelation extends Relation
 {
     private final ColumnIdentifier.Raw entity;
+    private final Term.Raw mapKey;
     private final Term.Raw value;
     private final List<Term.Raw> inValues;
 
-    private SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value, List<Term.Raw> inValues)
+    private SingleColumnRelation(ColumnIdentifier.Raw entity, Term.Raw mapKey, Operator type, Term.Raw value, List<Term.Raw> inValues)
     {
         this.entity = entity;
+        this.mapKey = mapKey;
         this.relationType = type;
         this.value = value;
         this.inValues = inValues;
@@ -55,17 +60,30 @@ public final class SingleColumnRelation extends Relation
      * Creates a new relation.
      *
      * @param entity the kind of relation this is; what the term is being compared to.
+     * @param mapKey the key into the entity identifying the value the term is being compared to.
+     * @param type the type that describes how this entity relates to the value.
+     * @param value the value being compared.
+     */
+    public SingleColumnRelation(ColumnIdentifier.Raw entity, Term.Raw mapKey, Operator type, Term.Raw value)
+    {
+        this(entity, mapKey, type, value, null);
+    }
+
+    /**
+     * Creates a new relation.
+     *
+     * @param entity the kind of relation this is; what the term is being compared to.
      * @param type the type that describes how this entity relates to the value.
      * @param value the value being compared.
      */
     public SingleColumnRelation(ColumnIdentifier.Raw entity, Operator type, Term.Raw value)
     {
-        this(entity, type, value, null);
+        this(entity, null, type, value);
     }
 
     public static SingleColumnRelation createInRelation(ColumnIdentifier.Raw entity, List<Term.Raw> inValues)
     {
-        return new SingleColumnRelation(entity, Operator.IN, null, inValues);
+        return new SingleColumnRelation(entity, null, Operator.IN, null, inValues);
     }
 
     public ColumnIdentifier.Raw getEntity()
@@ -73,6 +91,11 @@ public final class SingleColumnRelation extends Relation
         return entity;
     }
 
+    public Term.Raw getMapKey()
+    {
+        return mapKey;
+    }
+
     @Override
     protected Term toTerm(List<? extends ColumnSpecification> receivers,
                           Raw raw,
@@ -92,7 +115,7 @@ public final class SingleColumnRelation extends Relation
         switch (relationType)
         {
             case GT: return new SingleColumnRelation(entity, Operator.GTE, value);
-            case LT:  return new SingleColumnRelation(entity, Operator.LTE, value);
+            case LT: return new SingleColumnRelation(entity, Operator.LTE, value);
             default: return this;
         }
     }
@@ -100,10 +123,14 @@ public final class SingleColumnRelation extends Relation
     @Override
     public String toString()
     {
+        String entityAsString = entity.toString();
+        if (mapKey != null)
+            entityAsString = String.format("%s[%s]", entityAsString, mapKey);
+
         if (isIN())
-            return String.format("%s IN %s", entity, inValues);
+            return String.format("%s IN %s", entityAsString, inValues);
 
-        return String.format("%s %s %s", entity, relationType, value);
+        return String.format("%s %s %s", entityAsString, relationType, value);
     }
 
     @Override
@@ -111,8 +138,15 @@ public final class SingleColumnRelation extends Relation
                                            VariableSpecifications boundNames) throws InvalidRequestException
     {
         ColumnDefinition columnDef = toColumnDefinition(cfm, entity);
-        Term term = toTerm(toReceivers(cfm, columnDef), value, cfm.ksName, boundNames);
-        return new SingleColumnRestriction.EQ(columnDef, term);
+        if (mapKey == null)
+        {
+            Term term = toTerm(toReceivers(cfm, columnDef), value, cfm.ksName, boundNames);
+            return new SingleColumnRestriction.EQ(columnDef, term);
+        }
+        List<? extends ColumnSpecification> receivers = toReceivers(cfm, columnDef);
+        Term entryKey = toTerm(Collections.singletonList(receivers.get(0)), mapKey, cfm.ksName, boundNames);
+        Term entryValue = toTerm(Collections.singletonList(receivers.get(1)), value, cfm.ksName, boundNames);
+        return new SingleColumnRestriction.Contains(columnDef, entryKey, entryValue);
     }
 
     @Override
@@ -195,21 +229,54 @@ public final class SingleColumnRelation extends Relation
 
         checkFalse(isContainsKey() && !(receiver.type instanceof MapType), "Cannot use CONTAINS KEY on non-map column %s", receiver.name);
 
+        if (mapKey != null)
+        {
+            checkFalse(receiver.type instanceof ListType, "Indexes on list entries (%s[index] = value) are not currently supported.", receiver.name);
+            checkTrue(receiver.type instanceof MapType, "Column %s cannot be used as a map", receiver.name);
+            checkTrue(receiver.type.isMultiCell(), "Map-entry equality predicates on frozen map column %s are not supported", receiver.name);
+            checkTrue(isEQ(), "Only EQ relations are supported on map entries");
+        }
+
         if (receiver.type.isCollection())
         {
             // We don't support relations against entire collections (unless they're frozen), like "numbers = {1, 2, 3}"
-            checkFalse(receiver.type.isMultiCell() && !(isContainsKey() || isContains()),
+            checkFalse(receiver.type.isMultiCell() && !isLegalRelationForNonFrozenCollection(),
                        "Collection column '%s' (%s) cannot be restricted by a '%s' relation",
                        receiver.name,
                        receiver.type.asCQL3Type(),
                        operator());
 
             if (isContainsKey() || isContains())
-                receiver = ((CollectionType<?>) receiver.type).makeCollectionReceiver(receiver, isContainsKey());
+            {
+                receiver = makeCollectionReceiver(receiver, isContainsKey());
+            }
+            else if (receiver.type.isMultiCell() && mapKey != null && isEQ())
+            {
+                List<ColumnSpecification> receivers = new ArrayList<>(2);
+                receivers.add(makeCollectionReceiver(receiver, true));
+                receivers.add(makeCollectionReceiver(receiver, false));
+                return receivers;
+            }
         }
+
         return Collections.singletonList(receiver);
     }
 
+    private ColumnSpecification makeCollectionReceiver(ColumnSpecification receiver, boolean forKey)
+    {
+        return ((CollectionType<?>) receiver.type).makeCollectionReceiver(receiver, forKey);
+    }
+
+    private boolean isLegalRelationForNonFrozenCollection()
+    {
+        return isContainsKey() || isContains() || isMapEntryEquality();
+    }
+
+    private boolean isMapEntryEquality()
+    {
+        return mapKey != null && isEQ();
+    }
+
     /**
      * Checks if the specified column is the last column of the partition key.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 0f0f9c8..44a9e68 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.cassandra.config.ColumnDefinition;
@@ -32,6 +34,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.IndexExpression;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -324,11 +327,13 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
         }
     }
 
-    // This holds both CONTAINS and CONTAINS_KEY restriction because we might want to have both of them.
+    // This holds CONTAINS, CONTAINS_KEY, and map[key] = value restrictions because we might want to have any combination of them.
     public static final class Contains extends SingleColumnRestriction
     {
         private List<Term> values = new ArrayList<>(); // for CONTAINS
-        private List<Term> keys = new ArrayList<>();  // for CONTAINS_KEY
+        private List<Term> keys = new ArrayList<>(); // for CONTAINS_KEY
+        private List<Term> entryKeys = new ArrayList<>(); // for map[key] = value
+        private List<Term> entryValues = new ArrayList<>(); // for map[key] = value
 
         public Contains(ColumnDefinition columnDef, Term t, boolean isKey)
         {
@@ -339,6 +344,13 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
                 values.add(t);
         }
 
+        public Contains(ColumnDefinition columnDef, Term mapKey, Term mapValue)
+        {
+            super(columnDef);
+            entryKeys.add(mapKey);
+            entryValues.add(mapValue);
+        }
+
         @Override
         public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
         {
@@ -355,7 +367,7 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
         public Restriction mergeWith(Restriction otherRestriction) throws InvalidRequestException
         {
             checkTrue(otherRestriction.isContains(),
-                      "Collection column %s can only be restricted by CONTAINS or CONTAINS KEY",
+                      "Collection column %s can only be restricted by CONTAINS, CONTAINS KEY, or map-entry equality",
                       getColumnDef().name);
 
             SingleColumnRestriction.Contains newContains = new Contains(getColumnDef());
@@ -371,15 +383,18 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
                                          QueryOptions options)
                                          throws InvalidRequestException
         {
-            for (ByteBuffer value : values(options))
+            addExpressionsFor(expressions, values(options), Operator.CONTAINS);
+            addExpressionsFor(expressions, keys(options), Operator.CONTAINS_KEY);
+            addExpressionsFor(expressions, entries(options), Operator.EQ);
+        }
+
+        private void addExpressionsFor(List<IndexExpression> target, List<ByteBuffer> values,
+                                       Operator op) throws InvalidRequestException
+        {
+            for (ByteBuffer value : values)
             {
                 validateIndexedValue(columnDef, value);
-                expressions.add(new IndexExpression(columnDef.name.bytes, Operator.CONTAINS, value));
-            }
-            for (ByteBuffer key : keys(options))
-            {
-                validateIndexedValue(columnDef, key);
-                expressions.add(new IndexExpression(columnDef.name.bytes, Operator.CONTAINS_KEY, key));
+                target.add(new IndexExpression(columnDef.name.bytes, op, value));
             }
         }
 
@@ -394,6 +409,9 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
             if (numberOfKeys() > 0)
                 supported |= index.supportsOperator(Operator.CONTAINS_KEY);
 
+            if (numberOfEntries() > 0)
+                supported |= index.supportsOperator(Operator.EQ);
+
             return supported;
         }
 
@@ -407,16 +425,22 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
             return keys.size();
         }
 
+        public int numberOfEntries()
+        {
+            return entryKeys.size();
+        }
+
         @Override
         public boolean usesFunction(String ksName, String functionName)
         {
-            return usesFunction(values, ksName, functionName) || usesFunction(keys, ksName, functionName);
+            return usesFunction(values, ksName, functionName) || usesFunction(keys, ksName, functionName) ||
+                   usesFunction(entryKeys, ksName, functionName) || usesFunction(entryValues, ksName, functionName);
         }
 
         @Override
         public String toString()
         {
-            return String.format("CONTAINS(values=%s, keys=%s)", values, keys);
+            return String.format("CONTAINS(values=%s, keys=%s, entryKeys=%s, entryValues=%s)", values, keys, entryKeys, entryValues);
         }
 
         @Override
@@ -436,11 +460,26 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
         {
             throw new UnsupportedOperationException();
         }
+
         private List<ByteBuffer> keys(QueryOptions options) throws InvalidRequestException
         {
             return bindAndGet(keys, options);
         }
 
+        private List<ByteBuffer> entries(QueryOptions options) throws InvalidRequestException
+        {
+            List<ByteBuffer> entryBuffers = new ArrayList<>(entryKeys.size());
+            List<ByteBuffer> keyBuffers = bindAndGet(entryKeys, options);
+            List<ByteBuffer> valueBuffers = bindAndGet(entryValues, options);
+            for (int i = 0; i < entryKeys.size(); i++)
+            {
+                if (valueBuffers.get(i) == null)
+                    throw new InvalidRequestException("Unsupported null value for map-entry equality");
+                entryBuffers.add(CompositeType.build(keyBuffers.get(i), valueBuffers.get(i)));
+            }
+            return entryBuffers;
+        }
+
         /**
          * Binds the query options to the specified terms and returns the resulting values.
          *
@@ -467,6 +506,8 @@ public abstract class SingleColumnRestriction extends AbstractRestriction
         {
             to.values.addAll(from.values);
             to.keys.addAll(from.keys);
+            to.entryKeys.addAll(from.entryKeys);
+            to.entryValues.addAll(from.entryValues);
         }
 
         private Contains(ColumnDefinition columnDef)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
index ec74cc9..b9ffc68 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestrictions.java
@@ -188,10 +188,10 @@ final class SingleColumnRestrictions implements Restrictions
     }
 
     /**
-     * Checks if the restrictions contains multiple contains or contains key.
+     * Checks if the restrictions contains multiple contains, contains key, or map[key] = value.
      *
-     * @return <code>true</code> if the restrictions contains multiple contains or contains key.,
-     * <code>false</code> otherwise
+     * @return <code>true</code> if the restrictions contains multiple contains, contains key, or ,
+     * map[key] = value; <code>false</code> otherwise
      */
     public final boolean hasMultipleContains()
     {
@@ -201,7 +201,7 @@ final class SingleColumnRestrictions implements Restrictions
             if (restriction.isContains())
             {
                 Contains contains = (Contains) restriction;
-                numberOfContains += (contains.numberOfValues() + contains.numberOfKeys());
+                numberOfContains += (contains.numberOfValues() + contains.numberOfKeys() + contains.numberOfEntries());
             }
         }
         return numberOfContains > 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index ac19f5c..ee08aca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -80,34 +80,28 @@ public class CreateIndexStatement extends SchemaAlteringStatement
 
         boolean isMap = cd.type instanceof MapType;
         boolean isFrozenCollection = cd.type.isCollection() && !cd.type.isMultiCell();
-        if (target.isCollectionKeys)
-        {
-            if (!isMap)
-                throw new InvalidRequestException("Cannot create index on keys of column " + target + " with non-map type");
-            if (!cd.type.isMultiCell())
-                throw new InvalidRequestException("Cannot create index on keys of frozen<map> column " + target);
-        }
-        else if (target.isFullCollection)
+
+        if (isFrozenCollection)
         {
-            if (!isFrozenCollection)
-                throw new InvalidRequestException("full() indexes can only be created on frozen collections");
+            validateForFrozenCollection(target);
         }
-        else if (isFrozenCollection)
+        else
         {
-            throw new InvalidRequestException("Frozen collections currently only support full-collection indexes. " +
-                                              "For example, 'CREATE INDEX ON <table>(full(<columnName>))'.");
+            validateNotFullIndex(target);
+            validateIsValuesIndexIfTargetColumnNotCollection(cd, target);
+            validateTargetColumnIsMapIfIndexInvolvesKeys(isMap, target);
         }
 
         if (cd.getIndexType() != null)
         {
-            boolean previousIsKeys = cd.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME);
-            if (isMap && target.isCollectionKeys != previousIsKeys)
+            IndexTarget.TargetType prevType = IndexTarget.TargetType.fromColumnDefinition(cd);
+            if (isMap && target.type != prevType)
             {
-                String msg = "Cannot create index on %s %s, an index on %s %s already exists and indexing "
-                           + "a map on both keys and values at the same time is not currently supported";
+                String msg = "Cannot create index on %s(%s): an index on %s(%s) already exists and indexing " +
+                             "a map on more than one dimension at the same time is not currently supported";
                 throw new InvalidRequestException(String.format(msg,
-                                                                target.column, target.isCollectionKeys ? "keys" : "values",
-                                                                target.column, previousIsKeys ? "keys" : "values"));
+                                                                target.type, target.column,
+                                                                prevType, target.column));
             }
 
             if (ifNotExists)
@@ -135,6 +129,35 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             throw new InvalidRequestException(String.format("Cannot create secondary index on partition key column %s", target.column));
     }
 
+    private void validateForFrozenCollection(IndexTarget target) throws InvalidRequestException
+    {
+        if (target.type != IndexTarget.TargetType.FULL)
+            throw new InvalidRequestException(String.format("Cannot create index on %s of frozen<map> column %s", target.type, target.column));
+    }
+
+    private void validateNotFullIndex(IndexTarget target) throws InvalidRequestException
+    {
+        if (target.type == IndexTarget.TargetType.FULL)
+            throw new InvalidRequestException("full() indexes can only be created on frozen collections");
+    }
+
+    private void validateIsValuesIndexIfTargetColumnNotCollection(ColumnDefinition cd, IndexTarget target) throws InvalidRequestException
+    {
+        if (!cd.type.isCollection() && target.type != IndexTarget.TargetType.VALUES)
+            throw new InvalidRequestException(String.format("Cannot create index on %s of column %s; only non-frozen collections support %s indexes",
+                                                            target.type, target.column, target.type));
+    }
+
+    private void validateTargetColumnIsMapIfIndexInvolvesKeys(boolean isMap, IndexTarget target) throws InvalidRequestException
+    {
+        if (target.type == IndexTarget.TargetType.KEYS || target.type == IndexTarget.TargetType.KEYS_AND_VALUES)
+        {
+            if (!isMap)
+                throw new InvalidRequestException(String.format("Cannot create index on %s of column %s with non-map type",
+                                                                target.type, target.column));
+        }
+    }
+
     public boolean announceMigration(boolean isLocalOnly) throws RequestValidationException
     {
         CFMetaData cfm = Schema.instance.getCFMetaData(keyspace(), columnFamily()).copy();
@@ -156,8 +179,7 @@ public class CreateIndexStatement extends SchemaAlteringStatement
             // to also index map keys, so we record that this is the values we index to make our
             // lives easier then.
             if (cd.type.isCollection() && cd.type.isMultiCell())
-                options = ImmutableMap.of(target.isCollectionKeys ? SecondaryIndex.INDEX_KEYS_OPTION_NAME
-                                                                  : SecondaryIndex.INDEX_VALUES_OPTION_NAME, "");
+                options = ImmutableMap.of(target.type.indexOption(), "");
             cd.setIndexType(IndexType.COMPOSITES, options);
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
index eeee907..d602388 100644
--- a/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
+++ b/src/java/org/apache/cassandra/cql3/statements/IndexTarget.java
@@ -17,53 +17,98 @@
  */
 package org.apache.cassandra.cql3.statements;
 
+import java.util.Map;
+
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.index.SecondaryIndex;
 
 public class IndexTarget
 {
     public final ColumnIdentifier column;
-    public final boolean isCollectionKeys;
-    public final boolean isFullCollection;
+    public final TargetType type;
 
-    private IndexTarget(ColumnIdentifier column, boolean isCollectionKeys, boolean isFullCollection)
+    private IndexTarget(ColumnIdentifier column, TargetType type)
     {
         this.column = column;
-        this.isCollectionKeys = isCollectionKeys;
-        this.isFullCollection = isFullCollection;
+        this.type = type;
     }
 
     public static class Raw
     {
         private final ColumnIdentifier.Raw column;
-        private final boolean isCollectionKeys;
-        private final boolean isFullCollection;
+        private final TargetType type;
 
-        private Raw(ColumnIdentifier.Raw column, boolean isCollectionKeys, boolean isFullCollection)
+        private Raw(ColumnIdentifier.Raw column, TargetType type)
         {
             this.column = column;
-            this.isCollectionKeys = isCollectionKeys;
-            this.isFullCollection = isFullCollection;
+            this.type = type;
         }
 
         public static Raw valuesOf(ColumnIdentifier.Raw c)
         {
-            return new Raw(c, false, false);
+            return new Raw(c, TargetType.VALUES);
         }
 
         public static Raw keysOf(ColumnIdentifier.Raw c)
         {
-            return new Raw(c, true, false);
+            return new Raw(c, TargetType.KEYS);
+        }
+
+        public static Raw keysAndValuesOf(ColumnIdentifier.Raw c)
+        {
+            return new Raw(c, TargetType.KEYS_AND_VALUES);
         }
 
         public static Raw fullCollection(ColumnIdentifier.Raw c)
         {
-            return new Raw(c, false, true);
+            return new Raw(c, TargetType.FULL);
         }
 
         public IndexTarget prepare(CFMetaData cfm)
         {
-            return new IndexTarget(column.prepare(cfm), isCollectionKeys, isFullCollection);
+            return new IndexTarget(column.prepare(cfm), type);
+        }
+    }
+
+    public static enum TargetType
+    {
+        VALUES, KEYS, KEYS_AND_VALUES, FULL;
+
+        public String toString()
+        {
+            switch (this)
+            {
+                case KEYS: return "keys";
+                case KEYS_AND_VALUES: return "entries";
+                case FULL: return "full";
+                default: return "values";
+            }
+        }
+
+        public String indexOption()
+        {
+            switch (this)
+            {
+                case KEYS: return SecondaryIndex.INDEX_KEYS_OPTION_NAME;
+                case KEYS_AND_VALUES: return SecondaryIndex.INDEX_ENTRIES_OPTION_NAME;
+                case VALUES: return SecondaryIndex.INDEX_VALUES_OPTION_NAME;
+                default: throw new AssertionError();
+            }
+        }
+
+        public static TargetType fromColumnDefinition(ColumnDefinition cd)
+        {
+            Map<String, String> options = cd.getIndexOptions();
+            if (options.containsKey(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
+                return KEYS;
+            else if (options.containsKey(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
+                return KEYS_AND_VALUES;
+            else if (cd.type.isCollection() && !cd.type.isMultiCell())
+                return FULL;
+            else
+                return VALUES;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 6f2760a..2ae22b8 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -403,23 +403,21 @@ public abstract class ExtendedFilter
                 return false;
             }
 
-            switch (type.kind)
+            assert type.kind == CollectionType.Kind.MAP;
+            if (expr.isContainsKey())
+                return data.getColumn(data.getComparator().create(prefix, def, expr.value)) != null;
+
+            Iterator<Cell> iter = data.iterator(new ColumnSlice[]{ data.getComparator().create(prefix, def).slice() });
+            ByteBuffer key = CompositeType.extractComponent(expr.value, 0);
+            ByteBuffer value = CompositeType.extractComponent(expr.value, 1);
+            while (iter.hasNext())
             {
-                case LIST:
-                    assert collectionElement != null;
-                    return type.valueComparator().compare(data.getColumn(data.getComparator().create(prefix, def, collectionElement)).value(), expr.value) == 0;
-                case SET:
-                    return data.getColumn(data.getComparator().create(prefix, def, expr.value)) != null;
-                case MAP:
-                    if (expr.isContainsKey())
-                    {
-                        return data.getColumn(data.getComparator().create(prefix, def, expr.value)) != null;
-                    }
-
-                    assert collectionElement != null;
-                    return type.valueComparator().compare(data.getColumn(data.getComparator().create(prefix, def, collectionElement)).value(), expr.value) == 0;
+                Cell next = iter.next();
+                if (type.nameComparator().compare(next.name().collectionElement(), key) == 0 &&
+                    type.valueComparator().compare(next.value(), value) == 0)
+                    return true;
             }
-            throw new AssertionError();
+            return false;
         }
 
         private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, Composite prefix)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 2df7f91..c1036a6 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -75,6 +75,11 @@ public abstract class SecondaryIndex
      */
     public static final String INDEX_VALUES_OPTION_NAME = "index_values";
 
+    /**
+     * The name of the option used to specify that the index is on the collection (map) entries.
+     */
+    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
+
     public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder()
                                                       ? BytesType.instance
                                                       : new LocalByPartionerType(StorageService.getPartitioner());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index ec965fd..e88d456 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -67,9 +67,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
                 case SET:
                     return new CompositesIndexOnCollectionKey();
                 case MAP:
-                    return cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME)
-                         ? new CompositesIndexOnCollectionKey()
-                         : new CompositesIndexOnCollectionValue();
+                    if (cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
+                        return new CompositesIndexOnCollectionKey();
+                    else if (cfDef.hasIndexOption(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
+                        return new CompositesIndexOnCollectionKeyAndValue();
+                    else
+                        return new CompositesIndexOnCollectionValue();
             }
         }
 
@@ -99,9 +102,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
                 case SET:
                     return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
                 case MAP:
-                    return cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME)
-                         ? CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef)
-                         : CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+                    if (cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
+                        return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
+                    else if (cfDef.hasIndexOption(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
+                        return CompositesIndexOnCollectionKeyAndValue.buildIndexComparator(baseMetadata, cfDef);
+                    else
+                        return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
             }
         }
 
@@ -162,6 +168,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         {
             options.remove(SecondaryIndex.INDEX_VALUES_OPTION_NAME);
             options.remove(SecondaryIndex.INDEX_KEYS_OPTION_NAME);
+            options.remove(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME);
         }
 
         if (!options.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
new file mode 100644
index 0000000..402ea05
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
+
+/**
+ * Common superclass for indexes that capture collection keys, including
+ * indexes on such keys themselves.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name [col_elt] : v
+ * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the
+ * collection element that we want to index (which may or may not be there depending
+ * on whether c_name is the collection we're indexing), and v the cell value.
+ *
+ * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
+ * col_elt). The index entry can be viewed in the following way:
+ *   - the row key is determined by subclasses of this type.
+ *   - the cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
+ */
+public abstract class CompositesIndexIncludingCollectionKey extends CompositesIndex
+{
+    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
+        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
+        types.add(SecondaryIndex.keyComparator);
+        for (int i = 0; i < count - 1; i++)
+            types.add(baseMetadata.comparator.subtype(i));
+        return new CompoundDenseCellNameType(types);
+    }
+
+    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
+    {
+        int count = 1 + baseCfs.metadata.clusteringColumns().size();
+        CBuilder builder = getIndexComparator().builder();
+        builder.add(rowKey);
+        for (int i = 0; i < Math.min(cellName.size(), count - 1); i++)
+            builder.add(cellName.get(i));
+        return builder.build();
+    }
+
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+    {
+        int count = 1 + baseCfs.metadata.clusteringColumns().size();
+        CBuilder builder = baseCfs.getComparator().builder();
+        for (int i = 0; i < count - 1; i++)
+            builder.add(indexEntry.name().get(i + 1));
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
+    }
+
+    @Override
+    public boolean indexes(CellName name)
+    {
+        // We index if the CQL3 column name is the one of the collection we index
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return name.size() > columnDef.position()
+            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
index 81982bb..1e40710 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -18,47 +18,20 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CBuilder;
 import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.*;
 
 /**
  * Index on the collection element of the cell name of a collection.
  *
- * A cell indexed by this index will have the general form:
- *   ck_0 ... ck_n c_name [col_elt] : v
- * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the
- * collection element that we want to index (which may or may not be there depending
- * on whether c_name is the collection we're indexing) and v the cell value.
- *
- * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
- * col_elt). The index entry will be:
- *   - row key will be col_elt value (getIndexedValue()).
- *   - cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
+ * The row keys for this index are given by the collection element for
+ * indexed columns.
  */
-public class CompositesIndexOnCollectionKey extends CompositesIndex
+public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingCollectionKey
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
-    {
-        int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
-        types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < count - 1; i++)
-            types.add(baseMetadata.comparator.subtype(i));
-        return new CompoundDenseCellNameType(types);
-    }
-
     @Override
     protected AbstractType<?> getIndexKeyComparator()
     {
@@ -70,25 +43,6 @@ public class CompositesIndexOnCollectionKey extends CompositesIndex
         return cell.name().get(columnDef.position() + 1);
     }
 
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
-    {
-        int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        CBuilder builder = getIndexComparator().builder();
-        builder.add(rowKey);
-        for (int i = 0; i < Math.min(cellName.size(), count - 1); i++)
-            builder.add(cellName.get(i));
-        return builder.build();
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
-    {
-        int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        CBuilder builder = baseCfs.getComparator().builder();
-        for (int i = 0; i < count - 1; i++)
-            builder.add(indexEntry.name().get(i + 1));
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
-    }
-
     @Override
     public boolean supportsOperator(Operator operator)
     {
@@ -96,15 +50,6 @@ public class CompositesIndexOnCollectionKey extends CompositesIndex
                 operator == Operator.CONTAINS && columnDef.type instanceof SetType;
     }
 
-    @Override
-    public boolean indexes(CellName name)
-    {
-        // We index if the CQL3 column name is the one of the collection we index
-        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return name.size() > columnDef.position()
-            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
-    }
-
     public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
     {
         CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexValue.getKey());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
new file mode 100644
index 0000000..e18ea4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.marshal.*;
+
+/**
+ * Index on the element and value of cells participating in a collection.
+ *
+ * The row keys for this index are a composite of the collection element
+ * and value of indexed columns.
+ */
+public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexIncludingCollectionKey
+{
+    @Override
+    protected AbstractType<?> getIndexKeyComparator()
+    {
+        CollectionType colType = (CollectionType)columnDef.type;
+        return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
+    }
+
+    @Override
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell)
+    {
+        final ByteBuffer key = cell.name().get(columnDef.position() + 1);
+        final ByteBuffer value = cell.value();
+        return CompositeType.build(key, value);
+    }
+
+    @Override
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    {
+        Cell cell = extractTargetCell(entry, data);
+        if (cellIsDead(cell, now))
+            return true;
+        ByteBuffer indexCollectionValue = extractCollectionValue(entry);
+        ByteBuffer targetCollectionValue = cell.value();
+        AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
+        return valueComparator.compare(indexCollectionValue, targetCollectionValue) != 0;
+    }
+
+    private Cell extractTargetCell(IndexedEntry entry, ColumnFamily data)
+    {
+        ByteBuffer collectionKey = extractCollectionKey(entry);
+        CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, collectionKey);
+        return data.getColumn(name);
+    }
+
+    private ByteBuffer extractCollectionKey(IndexedEntry entry)
+    {
+        return extractIndexKeyComponent(entry, 0);
+    }
+
+    private ByteBuffer extractIndexKeyComponent(IndexedEntry entry, int component)
+    {
+        return ((CompositeType)getIndexKeyComparator()).extractComponent(entry.indexValue.getKey(), component);
+    }
+
+    private ByteBuffer extractCollectionValue(IndexedEntry entry)
+    {
+        return extractIndexKeyComponent(entry, 1);
+    }
+
+    private boolean cellIsDead(Cell cell, long now)
+    {
+        return cell == null || !cell.isLive(now);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f55c35b/test/unit/org/apache/cassandra/cql3/SecondaryIndexOnMapEntriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/SecondaryIndexOnMapEntriesTest.java b/test/unit/org/apache/cassandra/cql3/SecondaryIndexOnMapEntriesTest.java
new file mode 100644
index 0000000..fa40e0a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/SecondaryIndexOnMapEntriesTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SecondaryIndexOnMapEntriesTest extends CQLTester
+{
+    @Test
+    public void testShouldNotCreateIndexOnFrozenMaps() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k TEXT PRIMARY KEY, v FROZEN<MAP<TEXT, TEXT>>)");
+        assertIndexInvalidForColumn("v");
+    }
+
+    @Test
+    public void testShouldNotCreateIndexOnNonMapTypes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k TEXT PRIMARY KEY, i INT, t TEXT, b BLOB, s SET<TEXT>, l LIST<TEXT>, tu TUPLE<TEXT>)");
+        assertIndexInvalidForColumn("i");
+        assertIndexInvalidForColumn("t");
+        assertIndexInvalidForColumn("b");
+        assertIndexInvalidForColumn("s");
+        assertIndexInvalidForColumn("l");
+        assertIndexInvalidForColumn("tu");
+    }
+
+    @Test
+    public void testShouldValidateMapKeyAndValueTypes() throws Throwable
+    {
+        createSimpleTableAndIndex();
+
+        String query = "SELECT * FROM %s WHERE v[?] = ?";
+        Object validKey = "valid key";
+        Object invalidKey = 31415;
+        Object validValue = 31415;
+        Object invalidValue = "invalid value";
+        assertInvalid(query, invalidKey, invalidValue);
+        assertInvalid(query, invalidKey, validValue);
+        assertInvalid(query, validKey, invalidValue);
+        assertReturnsNoRows(query, validKey, validValue);
+    }
+
+    @Test
+    public void testShouldFindRowsMatchingSingleEqualityRestriction() throws Throwable
+    {
+        createSimpleTableAndIndex();
+        Object[] foo = insertIntoSimpleTable("foo", map("a", 1,
+                                                        "c", 3));
+        Object[] bar = insertIntoSimpleTable("bar", map("a", 1,
+                                                        "b", 2));
+        Object[] baz = insertIntoSimpleTable("baz", map("b", 2,
+                                                        "c", 5,
+                                                        "d", 4));
+        Object[] qux = insertIntoSimpleTable("qux", map("b", 2,
+                                                        "d", 4));
+
+        assertRowsForConditions(entry("a", 1), bar, foo);
+        assertRowsForConditions(entry("b", 2), bar, baz, qux);
+        assertRowsForConditions(entry("c", 3), foo);
+        assertRowsForConditions(entry("c", 5), baz);
+        assertRowsForConditions(entry("d", 4), baz, qux);
+    }
+
+    @Test
+    public void testRequireFilteringDirectiveIfMultipleRestrictionsSpecified() throws Throwable
+    {
+        createSimpleTableAndIndex();
+        String baseQuery = "SELECT * FROM %s WHERE v['foo'] = 31415 AND v['baz'] = 31416";
+        assertInvalid(baseQuery);
+        assertReturnsNoRows(baseQuery + " ALLOW FILTERING");
+    }
+
+    @Test
+    public void testShouldFindRowsMatchingMultipleEqualityRestrictions() throws Throwable
+    {
+        createSimpleTableAndIndex();
+
+        Object[] foo = insertIntoSimpleTable("foo", map("k1", 1));
+        Object[] bar = insertIntoSimpleTable("bar", map("k1", 1,
+                                                        "k2", 2));
+        Object[] baz = insertIntoSimpleTable("baz", map("k2", 2,
+                                                        "k3", 3));
+        Object[] qux = insertIntoSimpleTable("qux", map("k2", 2,
+                                                        "k3", 3,
+                                                        "k4", 4));
+
+        assertRowsForConditions(entry("k1", 1),
+                                bar, foo);
+        assertRowsForConditions(entry("k1", 1).entry("k2", 2),
+                                bar);
+        assertNoRowsForConditions(entry("k1", 1).entry("k2", 2).entry("k3", 3));
+        assertRowsForConditions(entry("k2", 2).entry("k3", 3),
+                                baz, qux);
+        assertRowsForConditions(entry("k2", 2).entry("k3", 3).entry("k4", 4),
+                                qux);
+        assertRowsForConditions(entry("k3", 3).entry("k4", 4),
+                                qux);
+        assertNoRowsForConditions(entry("k3", 3).entry("k4", 4).entry("k5", 5));
+    }
+
+    @Test
+    public void testShouldFindRowsMatchingEqualityAndContainsRestrictions() throws Throwable
+    {
+        createSimpleTableAndIndex();
+
+        Object[] foo = insertIntoSimpleTable("foo", map("common", 31415,
+                                                        "k1", 1,
+                                                        "k2", 2,
+                                                        "k3", 3));
+        Object[] bar = insertIntoSimpleTable("bar", map("common", 31415,
+                                                        "k3", 3,
+                                                        "k4", 4,
+                                                        "k5", 5));
+        Object[] baz = insertIntoSimpleTable("baz", map("common", 31415,
+                                                        "k5", 5,
+                                                        "k6", 6,
+                                                        "k7", 7));
+
+        assertRowsForConditions(entry("common", 31415),
+                                bar, baz, foo);
+        assertRowsForConditions(entry("common", 31415).key("k1"),
+                                foo);
+        assertRowsForConditions(entry("common", 31415).key("k2"),
+                                foo);
+        assertRowsForConditions(entry("common", 31415).key("k3"),
+                                bar, foo);
+        assertRowsForConditions(entry("common", 31415).key("k3").value(2),
+                                foo);
+        assertRowsForConditions(entry("common", 31415).key("k3").value(3),
+                                bar, foo);
+        assertRowsForConditions(entry("common", 31415).key("k3").value(4),
+                                bar);
+        assertRowsForConditions(entry("common", 31415).key("k3").key("k5"),
+                                bar);
+        assertRowsForConditions(entry("common", 31415).key("k5"),
+                                bar, baz);
+        assertRowsForConditions(entry("common", 31415).key("k5").value(4),
+                                bar);
+        assertRowsForConditions(entry("common", 31415).key("k5").value(5),
+                                bar, baz);
+        assertRowsForConditions(entry("common", 31415).key("k5").value(6),
+                                baz);
+        assertNoRowsForConditions(entry("common", 31415).key("k5").value(8));
+    }
+
+    @Test
+    public void testShouldNotAcceptUnsupportedRelationsOnEntries() throws Throwable
+    {
+        createSimpleTableAndIndex();
+        assertInvalidRelation("< 31415");
+        assertInvalidRelation("<= 31415");
+        assertInvalidRelation("> 31415");
+        assertInvalidRelation(">= 31415");
+        assertInvalidRelation("IN (31415, 31416, 31417)");
+        assertInvalidRelation("CONTAINS 31415");
+        assertInvalidRelation("CONTAINS KEY 'foo'");
+    }
+
+    @Test
+    public void testShouldRecognizeAlteredOrDeletedMapEntries() throws Throwable
+    {
+        createSimpleTableAndIndex();
+        Object[] foo = insertIntoSimpleTable("foo", map("common", 31415,
+                                                        "target", 8192));
+        Object[] bar = insertIntoSimpleTable("bar", map("common", 31415,
+                                                        "target", 8192));
+        Object[] baz = insertIntoSimpleTable("baz", map("common", 31415,
+                                                        "target", 8192));
+
+        assertRowsForConditions(entry("target", 8192),
+                                bar, baz, foo);
+        baz = updateMapInSimpleTable(baz, "target", 4096);
+        assertRowsForConditions(entry("target", 8192),
+                                bar, foo);
+        bar = updateMapInSimpleTable(bar, "target", null);
+        assertRowsForConditions(entry("target", 8192),
+                                foo);
+        execute("DELETE FROM %s WHERE k = 'foo'");
+        assertNoRowsForConditions(entry("target", 8192));
+        assertRowsForConditions(entry("common", 31415),
+                                bar, baz);
+        assertRowsForConditions(entry("target", 4096),
+                                baz);
+    }
+
+    @Test
+    public void testShouldRejectQueriesForNullEntries() throws Throwable
+    {
+        createSimpleTableAndIndex();
+        assertInvalid("SELECT * FROM %s WHERE v['somekey'] = null");
+    }
+
+    @Test
+    public void testShouldTreatQueriesAgainstFrozenMapIndexesAsInvalid() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k TEXT PRIMARY KEY, v FROZEN<MAP<TEXT, TEXT>>)");
+        createIndex("CREATE INDEX ON %s(FULL(V))");
+
+        try
+        {
+            execute("SELECT * FROM %s WHERE v['somekey'] = 'somevalue'");
+            fail("Expected index query to fail");
+        }
+        catch (InvalidRequestException e)
+        {
+            String expectedMessage = "Map-entry equality predicates on frozen map column v are not supported";
+            assertTrue("Expected error message to contain '" + expectedMessage + "' but got '" +
+                       e.getMessage() + "'", e.getMessage().contains(expectedMessage));
+        }
+    }
+
+    private void assertIndexInvalidForColumn(String colname) throws Throwable
+    {
+        String query = String.format("CREATE INDEX ON %%s(ENTRIES(%s))", colname);
+        assertInvalid(query);
+    }
+
+    private void assertReturnsNoRows(String query, Object... params) throws Throwable
+    {
+        assertRows(execute(query, params));
+    }
+
+    private void createSimpleTableAndIndex() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k TEXT PRIMARY KEY, v MAP<TEXT, INT>)");
+        createIndex("CREATE INDEX ON %s(ENTRIES(v))");
+    }
+
+    private Object[] insertIntoSimpleTable(String key, Object value) throws Throwable
+    {
+        String query = "INSERT INTO %s (k, v) VALUES (?, ?)";
+        execute(query, key, value);
+        return row(key, value);
+    }
+
+    private void assertRowsForConditions(IndexWhereClause whereClause, Object[]... rows) throws Throwable
+    {
+        assertRows(execute("SELECT * FROM %s WHERE " + whereClause.text(), whereClause.params()), rows);
+    }
+
+    private void assertNoRowsForConditions(IndexWhereClause whereClause) throws Throwable
+    {
+        assertRowsForConditions(whereClause);
+    }
+
+    private void assertInvalidRelation(String rel) throws Throwable
+    {
+        String query = "SELECT * FROM %s WHERE v " + rel;
+        assertInvalid(query);
+    }
+
+    private Object[] updateMapInSimpleTable(Object[] row, String mapKey, Integer mapValue) throws Throwable
+    {
+        execute("UPDATE %s SET v[?] = ? WHERE k = ?", mapKey, mapValue, row[0]);
+        UntypedResultSet rawResults = execute("SELECT * FROM %s WHERE k = ?", row[0]);
+        Map<Object, Object> value = (Map<Object, Object>)row[1];
+        if (mapValue == null)
+        {
+            value.remove(mapKey);
+        }
+        else
+        {
+            value.put(mapKey, mapValue);
+        }
+        return row;
+    }
+
+    private IndexWhereClause entry(Object key, Object value)
+    {
+        return (new IndexWhereClause()).entry(key, value);
+    }
+
+    private static final class IndexWhereClause
+    {
+        private final List<String> preds = new ArrayList<>();
+        private final List<Object> params = new ArrayList<>();
+
+        public IndexWhereClause entry(Object key, Object value)
+        {
+            preds.add("v[?] = ?");
+            params.add(key);
+            params.add(value);
+            return this;
+        }
+
+        public IndexWhereClause key(Object key)
+        {
+            preds.add("v CONTAINS KEY ?");
+            params.add(key);
+            return this;
+        }
+
+        public IndexWhereClause value(Object value)
+        {
+            preds.add("v CONTAINS ?");
+            params.add(value);
+            return this;
+        }
+
+        public String text()
+        {
+            if (preds.size() == 1)
+                return preds.get(0);
+            return StringUtils.join(preds, " AND ") + " ALLOW FILTERING";
+        }
+
+        public Object[] params()
+        {
+            return params.toArray();
+        }
+    }
+}