You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink on the word "here", which is not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/03 14:49:25 UTC
git commit: Secondary index support for collections
Updated Branches:
refs/heads/trunk 57516e082 -> d12a0d7b0
Secondary index support for collections
patch by slebresne; reviewed by iamaleksey for CASSANDRA-4511
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12a0d7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12a0d7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12a0d7b
Branch: refs/heads/trunk
Commit: d12a0d7b0299786bf1d0484f3770bae6a94cb0c9
Parents: 57516e0
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Nov 14 09:17:51 2013 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Dec 3 14:49:02 2013 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/cql3/Cql.g | 4 +
.../org/apache/cassandra/cql3/Relation.java | 15 ++-
.../cql3/statements/CreateIndexStatement.java | 21 +++-
.../cassandra/cql3/statements/Restriction.java | 111 +++++++++++++++++-
.../cql3/statements/SelectStatement.java | 71 ++++++++++--
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../apache/cassandra/db/IndexExpression.java | 19 +++-
.../cassandra/db/filter/ExtendedFilter.java | 47 ++++++--
.../AbstractSimplePerColumnSecondaryIndex.java | 13 ++-
.../db/index/SecondaryIndexSearcher.java | 2 +-
.../db/index/composites/CompositesIndex.java | 50 ++++++++-
.../CompositesIndexOnCollectionKey.java | 112 +++++++++++++++++++
.../CompositesIndexOnCollectionValue.java | 110 ++++++++++++++++++
.../db/index/composites/CompositesSearcher.java | 21 +++-
15 files changed, 566 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3bc50ac..08c3a67 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
* User-defined types for CQL3 (CASSANDRA-5590)
* Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
* Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511)
2.0.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 325d6f6..fb0054d 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -947,6 +947,8 @@ relation[List<Relation> clauses]
{ $clauses.add(new Relation(name, Relation.Type.IN, marker)); }
| name=cident K_IN { Relation rel = Relation.createInRelation($name.id); }
'(' ( f1=term { rel.addInValue(f1); } (',' fN=term { rel.addInValue(fN); } )* )? ')' { $clauses.add(rel); }
+ | name=cident K_CONTAINS { Relation.Type rt = Relation.Type.CONTAINS; } /* (K_KEY { rt = Relation.Type.CONTAINS_KEY })? */
+ t=term { $clauses.add(new Relation(name, rt, t)); }
| '(' relation[$clauses] ')'
;
@@ -1045,6 +1047,7 @@ basic_unreserved_keyword returns [String str]
| K_CUSTOM
| K_TRIGGER
| K_DISTINCT
+ | K_CONTAINS
) { $str = $k.text; }
;
@@ -1101,6 +1104,7 @@ K_DESC: D E S C;
K_ALLOW: A L L O W;
K_FILTERING: F I L T E R I N G;
K_IF: I F;
+K_CONTAINS: C O N T A I N S;
K_GRANT: G R A N T;
K_ALL: A L L;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/Relation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java
index 15ed540..cfcdd54 100644
--- a/src/java/org/apache/cassandra/cql3/Relation.java
+++ b/src/java/org/apache/cassandra/cql3/Relation.java
@@ -35,7 +35,20 @@ public class Relation
public static enum Type
{
- EQ, LT, LTE, GTE, GT, IN;
+ EQ, LT, LTE, GTE, GT, IN, CONTAINS, CONTAINS_KEY;
+
+ public boolean allowsIndexQuery()
+ {
+ switch (this)
+ {
+ case EQ:
+ case CONTAINS:
+ case CONTAINS_KEY:
+ return true;
+ default:
+ return false;
+ }
+ }
}
private Relation(ColumnIdentifier entity, Type type, Term.Raw value, List<Term.Raw> inValues, boolean onToken)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index b040121..ae6c15c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -18,10 +18,13 @@
package org.apache.cassandra.cql3.statements;
import java.util.Collections;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
+
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
@@ -91,9 +94,6 @@ public class CreateIndexStatement extends SchemaAlteringStatement
if (cfm.isDense() && cd.kind != ColumnDefinition.Kind.REGULAR)
throw new InvalidRequestException(String.format("Secondary index on %s column %s is not yet supported for compact table", cd.kind, columnName));
- if (cd.type.isCollection() && !isCustom)
- throw new InvalidRequestException("Indexes on collections are no yet supported");
-
if (cd.kind == ColumnDefinition.Kind.PARTITION_KEY && cd.isOnAllComponents())
throw new InvalidRequestException(String.format("Cannot add secondary index to already primarily indexed column %s", columnName));
}
@@ -108,11 +108,24 @@ public class CreateIndexStatement extends SchemaAlteringStatement
return;
if (isCustom)
+ {
cd.setIndexType(IndexType.CUSTOM, Collections.singletonMap(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, indexClass));
+ }
else if (cfm.hasCompositeComparator())
- cd.setIndexType(IndexType.COMPOSITES, Collections.<String, String>emptyMap());
+ {
+ Map<String, String> options = Collections.emptyMap();
+ // For now, we only allow indexing values for collections, but we could later allow
+ // to also index map keys, so we record that this is the values we index to make our
+ // lives easier then.
+ if (cd.type.isCollection())
+ options = ImmutableMap.of("index_values", "");
+
+ cd.setIndexType(IndexType.COMPOSITES, options);
+ }
else
+ {
cd.setIndexType(IndexType.KEYS, Collections.<String, String>emptyMap());
+ }
cd.setIndexName(indexName);
cfm.addDefaultIndexNames();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/Restriction.java b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
index 9119a9d..b6f900c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/statements/Restriction.java
@@ -37,8 +37,9 @@ public interface Restriction
public boolean isSlice();
public boolean isEQ();
public boolean isIN();
+ public boolean isContains();
- // Only supported for EQ and IN, but it's convenient to have here
+ // Not supported by Slice, but it's convenient to have here
public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException;
public static class EQ implements Restriction
@@ -72,6 +73,11 @@ public interface Restriction
return false;
}
+ public boolean isContains()
+ {
+ return false;
+ }
+
public boolean isOnToken()
{
return onToken;
@@ -107,6 +113,11 @@ public interface Restriction
return false;
}
+ public boolean isContains()
+ {
+ return false;
+ }
+
public boolean isIN()
{
return true;
@@ -210,6 +221,11 @@ public interface Restriction
return false;
}
+ public boolean isContains()
+ {
+ return false;
+ }
+
public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
{
throw new UnsupportedOperationException();
@@ -302,4 +318,97 @@ public interface Restriction
onToken ? "*" : "");
}
}
+
+ // This holds both CONTAINS and CONTAINS_KEY restriction because we might want to have both of them.
+ public static class Contains implements Restriction
+ {
+ private List<Term> values; // for CONTAINS
+ private List<Term> keys; // for CONTAINS_KEY
+
+ public boolean hasContains()
+ {
+ return values != null;
+ }
+
+ public boolean hasContainsKey()
+ {
+ return keys != null;
+ }
+
+ public void add(Term t, boolean isKey)
+ {
+ if (isKey)
+ addKey(t);
+ else
+ addValue(t);
+ }
+
+ public void addValue(Term t)
+ {
+ if (values == null)
+ values = new ArrayList<>();
+ values.add(t);
+ }
+
+ public void addKey(Term t)
+ {
+ if (keys == null)
+ keys = new ArrayList<>();
+ keys.add(t);
+ }
+
+ public List<ByteBuffer> values(List<ByteBuffer> variables) throws InvalidRequestException
+ {
+ if (values == null)
+ return Collections.emptyList();
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(values.size());
+ for (Term value : values)
+ buffers.add(value.bindAndGet(variables));
+ return buffers;
+ }
+
+ public List<ByteBuffer> keys(List<ByteBuffer> variables) throws InvalidRequestException
+ {
+ if (keys == null)
+ return Collections.emptyList();
+
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(keys.size());
+ for (Term value : keys)
+ buffers.add(value.bindAndGet(variables));
+ return buffers;
+ }
+
+ public boolean isSlice()
+ {
+ return false;
+ }
+
+ public boolean isEQ()
+ {
+ return false;
+ }
+
+ public boolean isIN()
+ {
+ return false;
+ }
+
+ public boolean isContains()
+ {
+ return true;
+ }
+
+ public boolean isOnToken()
+ {
+ return false;
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return String.format("CONTAINS(values=%s, keys=%s)", values, keys);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 344e926..62ebd21 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -756,15 +756,25 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
{
if (slice.hasBound(b))
{
- ByteBuffer value = slice.bound(b, variables);
- if (value == null)
- throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
- if (value.remaining() > 0xFFFF)
- throw new InvalidRequestException("Index expression values may not be larger than 64K");
+ ByteBuffer value = validateIndexedValue(def, slice.bound(b, variables));
expressions.add(new IndexExpression(def.name.bytes, slice.getIndexOperator(b), value));
}
}
}
+ else if (restriction.isContains())
+ {
+ Restriction.Contains contains = (Restriction.Contains)restriction;
+ for (ByteBuffer value : contains.values(variables))
+ {
+ validateIndexedValue(def, value);
+ expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS, value));
+ }
+ for (ByteBuffer key : contains.keys(variables))
+ {
+ validateIndexedValue(def, key);
+ expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.CONTAINS_KEY, key));
+ }
+ }
else
{
List<ByteBuffer> values = restriction.values(variables);
@@ -772,17 +782,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
if (values.size() != 1)
throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
- ByteBuffer value = values.get(0);
- if (value == null)
- throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
- if (value.remaining() > 0xFFFF)
- throw new InvalidRequestException("Index expression values may not be larger than 64K");
+ ByteBuffer value = validateIndexedValue(def, values.get(0));
expressions.add(new IndexExpression(def.name.bytes, IndexExpression.Operator.EQ, value));
}
}
return expressions;
}
+ private static ByteBuffer validateIndexedValue(ColumnDefinition def, ByteBuffer value) throws InvalidRequestException
+ {
+ if (value == null)
+ throw new InvalidRequestException(String.format("Unsupported null value for indexed column %s", def.name));
+ if (value.remaining() > 0xFFFF)
+ throw new InvalidRequestException("Index expression values may not be larger than 64K");
+ return value;
+ }
private Iterable<Column> columnsInOrder(final ColumnFamily cf, final List<ByteBuffer> variables) throws InvalidRequestException
{
@@ -1109,7 +1123,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
}
stmt.restrictedColumns.add(def);
- if (def.isIndexed() && rel.operator() == Relation.Type.EQ)
+ if (def.isIndexed() && rel.operator().allowsIndexQuery())
{
hasQueriableIndex = true;
if (def.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
@@ -1490,10 +1504,45 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
((Restriction.Slice)restriction).setBound(def.name, newRel.operator(), t);
}
break;
+ case CONTAINS_KEY:
+ if (!(receiver.type instanceof MapType))
+ throw new InvalidRequestException(String.format("Cannot use CONTAINS_KEY on non-map column %s", def.name));
+ // Fallthrough on purpose
+ case CONTAINS:
+ {
+ if (!receiver.type.isCollection())
+ throw new InvalidRequestException(String.format("Cannot use %s relation on non collection column %s", newRel.operator(), def.name));
+
+ if (restriction == null)
+ restriction = new Restriction.Contains();
+ else if (!restriction.isContains())
+ throw new InvalidRequestException(String.format("Collection column %s can only be restricted by CONTAINS or CONTAINS KEY", def.name));
+ boolean isKey = newRel.operator() == Relation.Type.CONTAINS_KEY;
+ receiver = makeCollectionReceiver(receiver, isKey);
+ Term t = newRel.getValue().prepare(receiver);
+ ((Restriction.Contains)restriction).add(t, isKey);
+ }
}
return restriction;
}
+ private static ColumnSpecification makeCollectionReceiver(ColumnSpecification collection, boolean isKey)
+ {
+ assert collection.type.isCollection();
+ switch (((CollectionType)collection.type).kind)
+ {
+ case LIST:
+ assert !isKey;
+ return Lists.valueSpecOf(collection);
+ case SET:
+ assert !isKey;
+ return Sets.valueSpecOf(collection);
+ case MAP:
+ return isKey ? Maps.keySpecOf(collection) : Maps.valueSpecOf(collection);
+ }
+ throw new AssertionError();
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b00d220..396bbd3 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1697,7 +1697,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
removeDroppedColumns(data);
- if (!filter.isSatisfiedBy(rawRow.key, data, null))
+ if (!filter.isSatisfiedBy(rawRow.key, data, null, null))
continue;
logger.trace("{} satisfies all filter expressions", data);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/IndexExpression.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IndexExpression.java b/src/java/org/apache/cassandra/db/IndexExpression.java
index e08e41f..b57890a 100644
--- a/src/java/org/apache/cassandra/db/IndexExpression.java
+++ b/src/java/org/apache/cassandra/db/IndexExpression.java
@@ -40,7 +40,7 @@ public class IndexExpression
public enum Operator
{
- EQ, GTE, GT, LTE, LT;
+ EQ, GTE, GT, LTE, LT, CONTAINS, CONTAINS_KEY;
public static Operator findByOrdinal(int ordinal)
{
@@ -55,10 +55,27 @@ public class IndexExpression
return LTE;
case 4:
return LT;
+ case 5:
+ return CONTAINS;
+ case 6:
+ return CONTAINS_KEY;
default:
throw new AssertionError();
}
}
+
+ public boolean allowsIndexQuery()
+ {
+ switch (this)
+ {
+ case EQ:
+ case CONTAINS:
+ case CONTAINS_KEY:
+ return true;
+ default:
+ return false;
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
index 89d6683..e749871 100644
--- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -30,8 +31,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
/**
@@ -129,7 +129,7 @@ public abstract class ExtendedFilter
* @return true if the provided data satisfies all the expressions from
* the clause of this filter.
*/
- public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder);
+ public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement);
public static boolean satisfies(int comparison, IndexExpression.Operator op)
{
@@ -279,10 +279,8 @@ public abstract class ExtendedFilter
return pruned;
}
- public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
{
- // We enforces even the primary clause because reads are not synchronized with writes and it is thus possible to have a race
- // where the index returned a row which doesn't have the primary column when we actually read it
for (IndexExpression expression : clause)
{
ColumnDefinition def = data.metadata().getColumnDefinition(expression.column);
@@ -301,6 +299,13 @@ public abstract class ExtendedFilter
}
else
{
+ if (def.type.isCollection())
+ {
+ if (!collectionSatisfies(def, data, builder, expression, collectionElement))
+ return false;
+ continue;
+ }
+
dataValue = extractDataValue(def, rowKey.key, data, builder);
validator = def.type;
}
@@ -315,6 +320,34 @@ public abstract class ExtendedFilter
return true;
}
+ private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, ColumnNameBuilder builder, IndexExpression expr, ByteBuffer collectionElement)
+ {
+ assert def.type.isCollection();
+
+ CollectionType type = (CollectionType)def.type;
+ builder = builder.copy().add(def.name.bytes);
+
+ switch (type.kind)
+ {
+ case LIST:
+ assert collectionElement != null;
+ return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+ case SET:
+ return data.getColumn(builder.add(expr.value).build()) != null;
+ case MAP:
+ if (expr.operator == IndexExpression.Operator.CONTAINS_KEY)
+ {
+ return data.getColumn(builder.add(expr.value).build()) != null;
+ }
+ else
+ {
+ assert collectionElement != null;
+ return type.valueComparator().compare(data.getColumn(builder.add(collectionElement).build()).value(), expr.value) == 0;
+ }
+ }
+ throw new AssertionError();
+ }
+
private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, ColumnNameBuilder builder)
{
switch (def.kind)
@@ -359,7 +392,7 @@ public abstract class ExtendedFilter
return data;
}
- public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder)
+ public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, ColumnNameBuilder builder, ByteBuffer collectionElement)
{
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index 699b391..b7593ad 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -50,10 +50,21 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
indexedCfMetadata.cfName,
- new LocalPartitioner(columnDef.type),
+ new LocalPartitioner(getIndexKeyComparator()),
indexedCfMetadata);
}
+ protected AbstractType<?> getIndexKeyComparator()
+ {
+ return columnDef.type;
+ }
+
+ @Override
+ public DecoratedKey getIndexKeyFor(ByteBuffer value)
+ {
+ return new DecoratedKey(new LocalToken(getIndexKeyComparator(), value), value);
+ }
+
protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, Column column);
protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index f18357b..a508a15 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -67,7 +67,7 @@ public abstract class SecondaryIndexSearcher
continue;
SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
- if (index == null || expression.operator != IndexExpression.Operator.EQ)
+ if (index == null || !expression.operator.allowsIndexQuery())
continue;
int columns = index.getIndexCfs().getMeanColumns();
candidates.put(index, columns);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index 3dea495..6d137ca 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -53,6 +53,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public static CompositesIndex create(ColumnDefinition cfDef)
{
+ if (cfDef.type.isCollection())
+ {
+ switch (((CollectionType)cfDef.type).kind)
+ {
+ case LIST:
+ return new CompositesIndexOnCollectionValue();
+ case SET:
+ return new CompositesIndexOnCollectionKey();
+ case MAP:
+ return new CompositesIndexOnCollectionValue();
+ }
+ }
+
switch (cfDef.kind)
{
case CLUSTERING_COLUMN:
@@ -70,6 +83,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
// Check SecondaryIndex.getIndexComparator if you want to know why this is static
public static CompositeType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
{
+ if (cfDef.type.isCollection())
+ {
+ switch (((CollectionType)cfDef.type).kind)
+ {
+ case LIST:
+ return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+ case SET:
+ return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
+ case MAP:
+ return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+ }
+ }
+
switch (cfDef.kind)
{
case CLUSTERING_COLUMN:
@@ -127,10 +153,12 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
ColumnDefinition columnDef = columnDefs.iterator().next();
Map<String, String> options = new HashMap<String, String>(columnDef.getIndexOptions());
- // We take no options though we used to have one called "prefix_size",
- // so skip it silently for backward compatibility sake.
+ // We used to have an option called "prefix_size" so skip it silently for backward compatibility sake.
options.remove("prefix_size");
+ if (columnDef.type.isCollection())
+ options.remove("index_values");
+
if (!options.isEmpty())
throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet());
}
@@ -143,14 +171,30 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public final ByteBuffer indexedKey;
public final ColumnNameBuilder indexedEntryNameBuilder;
+ public final ByteBuffer indexedEntryCollectionKey; // may be null
+
+ public IndexedEntry(DecoratedKey indexValue,
+ ByteBuffer indexEntry,
+ long timestamp,
+ ByteBuffer indexedKey,
+ ColumnNameBuilder indexedEntryNameBuilder)
+ {
+ this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryNameBuilder, null);
+ }
- public IndexedEntry(DecoratedKey indexValue, ByteBuffer indexEntry, long timestamp, ByteBuffer indexedKey, ColumnNameBuilder indexedEntryNameBuilder)
+ public IndexedEntry(DecoratedKey indexValue,
+ ByteBuffer indexEntry,
+ long timestamp,
+ ByteBuffer indexedKey,
+ ColumnNameBuilder indexedEntryNameBuilder,
+ ByteBuffer indexedEntryCollectionKey)
{
this.indexValue = indexValue;
this.indexEntry = indexEntry;
this.timestamp = timestamp;
this.indexedKey = indexedKey;
this.indexedEntryNameBuilder = indexedEntryNameBuilder;
+ this.indexedEntryCollectionKey = indexedEntryCollectionKey;
}
public ByteBuffer indexedEntryStart()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
new file mode 100644
index 0000000..c2acfc9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.LocalToken;
+
+/**
+ * Index on the collection element of the cell name of a collection.
+ *
+ * A cell indexed by this index will have the general form:
+ * ck_0 ... ck_n c_name [col_elt] : v
+ * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the
+ * collection element that we want to index (which may or may not be there depending
+ * on whether c_name is the collection we're indexing) and v the cell value.
+ *
+ * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
+ * col_elt). The index entry will be:
+ * - row key will be col_elt value (getIndexedValue()).
+ * - cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
+ */
+public class CompositesIndexOnCollectionKey extends CompositesIndex
+{
+ public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ {
+ int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
+ List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
+ List<AbstractType<?>> ckTypes = baseMetadata.comparator.getComponents();
+ types.add(SecondaryIndex.keyComparator);
+ for (int i = 0; i < count - 1; i++)
+ types.add(ckTypes.get(i));
+ return CompositeType.getInstance(types);
+ }
+
+ @Override
+ protected AbstractType<?> getIndexKeyComparator()
+ {
+ return ((CollectionType)columnDef.type).nameComparator();
+ }
+
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+ {
+ CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+ ByteBuffer[] components = baseComparator.split(column.name());
+ return components[columnDef.position() + 1];
+ }
+
+ protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+ {
+ int count = 1 + baseCfs.metadata.clusteringColumns().size();
+ CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+ ByteBuffer[] components = baseComparator.split(columnName);
+ CompositeType.Builder builder = getIndexComparator().builder();
+ builder.add(rowKey);
+ for (int i = 0; i < count - 1; i++)
+ builder.add(components[i]);
+ return builder;
+ }
+
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+ {
+ int count = 1 + baseCfs.metadata.clusteringColumns().size();
+ ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
+
+ ColumnNameBuilder builder = getBaseComparator().builder();
+ for (int i = 0; i < count - 1; i++)
+ builder.add(components[i + 1]);
+
+ return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder);
+ }
+
+ @Override
+ public boolean indexes(ByteBuffer name)
+ {
+ // We index if the CQL3 column name is the one of the collection we index
+ ByteBuffer[] components = getBaseComparator().split(name);
+ AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+ return components.length > columnDef.position()
+ && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+ }
+
+ public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+ {
+ ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexValue.key).build();
+ Column liveColumn = data.getColumn(bb);
+ return (liveColumn == null || liveColumn.isMarkedForDelete(now));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
new file mode 100644
index 0000000..f416d0e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.index.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnNameBuilder;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.dht.LocalToken;
+
+/**
+ * Index on the values of a collection column.
+ *
+ * This is a lot like an index on a REGULAR column, except that the collection key
+ * (the cell-name component that identifies the element within the collection) is
+ * also made part of each index entry, so that:
+ *   1) at query time we don't have to scan the whole collection to know whether an
+ *      entry is stale and whether it still satisfies the query;
+ *   2) if a collection contains the same value several times, each occurrence gets
+ *      its own entry, and deleting one occurrence only deletes the entry for it.
+ */
+public class CompositesIndexOnCollectionValue extends CompositesIndex
+{
+    /**
+     * Builds the comparator for the cell names of the index table.
+     *
+     * An index cell name is (base partition key, base clustering prefix..., collection key),
+     * i.e. {@code columnDef.position() + 2} components.
+     *
+     * @param baseMetadata metadata of the indexed (base) table; its comparator is cast to CompositeType
+     * @param columnDef definition of the indexed collection column
+     * @return the composite comparator for index cell names
+     */
+    public static CompositeType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    {
+        int prefixSize = columnDef.position();
+        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 2);
+        types.add(SecondaryIndex.keyComparator); // base partition key
+        for (int i = 0; i < prefixSize; i++)
+            types.add(((CompositeType)baseMetadata.comparator).types.get(i)); // clustering prefix
+        types.add(((CollectionType)columnDef.type).nameComparator()); // collection key
+        return CompositeType.getInstance(types);
+    }
+
+    /**
+     * The index is keyed on collection values, so index keys are ordered by the
+     * collection's value comparator.
+     */
+    @Override
+    protected AbstractType<?> getIndexKeyComparator()
+    {
+        return ((CollectionType)columnDef.type).valueComparator();
+    }
+
+    /**
+     * Returns the value indexed for {@code column}: the collection cell's value.
+     */
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Column column)
+    {
+        return column.value();
+    }
+
+    /**
+     * Builds the index cell name for a base collection cell.
+     *
+     * The base cell name is (clustering prefix..., collection name, collection key). The
+     * index cell name keeps the partition key, the clustering prefix and the collection
+     * key, and drops the collection name component.
+     */
+    protected ColumnNameBuilder makeIndexColumnNameBuilder(ByteBuffer rowKey, ByteBuffer columnName)
+    {
+        int prefixSize = columnDef.position();
+        CompositeType baseComparator = (CompositeType)baseCfs.getComparator();
+        ByteBuffer[] components = baseComparator.split(columnName);
+        assert components.length == baseComparator.types.size();
+        CompositeType.Builder builder = getIndexComparator().builder();
+        builder.add(rowKey);
+        for (int i = 0; i < prefixSize; i++)
+            builder.add(components[i]);
+        // components[prefixSize] is the collection name: skip it and keep the collection key
+        builder.add(components[prefixSize + 1]);
+        return builder;
+    }
+
+    /**
+     * Decodes an index cell back into the base row it points to.
+     *
+     * Component 0 is the base partition key. The next {@code columnDef.position()}
+     * components rebuild the base clustering prefix, and the last one is the collection key.
+     */
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Column indexEntry)
+    {
+        int prefixSize = columnDef.position();
+        ByteBuffer[] components = getIndexComparator().split(indexEntry.name());
+        CompositeType.Builder builder = getBaseComparator().builder();
+        for (int i = 0; i < prefixSize; i++)
+            builder.add(components[i + 1]);
+        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), components[0], builder, components[prefixSize + 1]);
+    }
+
+    /**
+     * Returns whether {@code name} is a cell of the indexed collection.
+     */
+    @Override
+    public boolean indexes(ByteBuffer name)
+    {
+        // We index if the CQL3 column name is the one of the collection we index
+        ByteBuffer[] components = getBaseComparator().split(name);
+        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        return components.length > columnDef.position()
+            && comp.compare(components[columnDef.position()], columnDef.name.bytes) == 0;
+    }
+
+    /**
+     * Returns whether an index entry no longer reflects live data.
+     *
+     * That is the case when the collection cell it points to is missing or marked for
+     * delete as of {@code now}, or when that cell now holds a different value.
+     */
+    public boolean isStale(IndexedEntry entry, ColumnFamily data, long now)
+    {
+        ByteBuffer bb = entry.indexedEntryNameBuilder.copy().add(columnDef.name).add(entry.indexedEntryCollectionKey).build();
+        Column liveColumn = data.getColumn(bb);
+        if (liveColumn == null || liveColumn.isMarkedForDelete(now))
+            return true;
+
+        // The cell still exists, but the entry only matches if it still holds the indexed value
+        ByteBuffer liveValue = liveColumn.value();
+        return getIndexKeyComparator().compare(entry.indexValue.key, liveValue) != 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12a0d7b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 0d5d1a5..bcb0dd2 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -135,6 +135,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
*/
DecoratedKey currentKey = null;
ColumnFamily data = null;
+ ByteBuffer previousPrefix = null;
while (true)
{
@@ -232,6 +233,16 @@ public class CompositesSearcher extends SecondaryIndexSearcher
if (!filter.columnFilter(dk.key).maySelectPrefix(baseComparator, start))
continue;
+ // If we've record the previous prefix, it means we're dealing with an index on the collection value. In
+ // that case, we can have multiple index prefix for the same CQL3 row. In that case, we want to only add
+ // the CQL3 row once (because requesting the data multiple time would be inefficient but more importantly
+ // because we shouldn't count the columns multiple times with the lastCounted() call at the end of this
+ // method).
+ if (previousPrefix != null && previousPrefix.equals(start))
+ continue;
+ else
+ previousPrefix = null;
+
logger.trace("Adding index hit to current row for {}", indexComparator.getString(column.name()));
// We always query the whole CQL3 row. In the case where the original filter was a name filter this might be
@@ -248,9 +259,15 @@ public class CompositesSearcher extends SecondaryIndexSearcher
continue;
}
- assert newData != null : "An entry with not data should have been considered stale";
+ assert newData != null : "An entry with no data should have been considered stale";
+
+ // We know the entry is not stale and so the entry satisfy the primary clause. So whether
+ // or not the data satisfies the other clauses, there will be no point to re-check the
+ // same CQL3 row if we run into another collection value entry for this row.
+ if (entry.indexedEntryCollectionKey != null)
+ previousPrefix = start;
- if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder))
+ if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryNameBuilder, entry.indexedEntryCollectionKey))
continue;
if (data == null)