You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/07/16 15:28:08 UTC
[1/3] git commit: Add lists, sets and maps support
Updated Branches:
refs/heads/trunk 3c92afdd8 -> 91bdf7fb4
Add lists, sets and maps support
patch by slebresne; reviewed by jbellis for CASSANDRA-3647
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/91bdf7fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/91bdf7fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/91bdf7fb
Branch: refs/heads/trunk
Commit: 91bdf7fb4220b27e9566c6673bf5dbd14153017c
Parents: 3c92afd
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 5 11:01:51 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Jul 16 15:27:15 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +-
.../org/apache/cassandra/cql3/CFDefinition.java | 28 ++-
src/java/org/apache/cassandra/cql3/CFPropDefs.java | 40 ---
.../apache/cassandra/cql3/ColumnIdentifier.java | 12 +-
src/java/org/apache/cassandra/cql3/Cql.g | 167 ++++++++---
src/java/org/apache/cassandra/cql3/Operation.java | 91 +++++--
src/java/org/apache/cassandra/cql3/ParsedType.java | 129 ++++++++
.../org/apache/cassandra/cql3/QueryProcessor.java | 6 +-
src/java/org/apache/cassandra/cql3/Term.java | 73 +++--
.../apache/cassandra/cql3/UpdateParameters.java | 64 ++++
src/java/org/apache/cassandra/cql3/Value.java | 121 ++++++++
.../cql3/statements/AlterTableStatement.java | 35 ++-
.../cassandra/cql3/statements/BatchStatement.java | 4 +-
.../cassandra/cql3/statements/ColumnGroupMap.java | 175 +++++++++++
.../statements/CreateColumnFamilyStatement.java | 55 +++-
.../cassandra/cql3/statements/DeleteStatement.java | 147 ++++++---
.../cql3/statements/ModificationStatement.java | 61 ++++-
.../cassandra/cql3/statements/SelectStatement.java | 164 ++++-------
.../apache/cassandra/cql3/statements/Selector.java | 78 +++++-
.../cassandra/cql3/statements/UpdateStatement.java | 225 +++++++++-----
.../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../apache/cassandra/db/SliceFromReadCommand.java | 3 +-
.../apache/cassandra/db/filter/ColumnCounter.java | 108 +++++++
.../apache/cassandra/db/filter/ExtendedFilter.java | 8 +
.../cassandra/db/filter/SliceQueryFilter.java | 62 +++-
.../db/marshal/AbstractCompositeType.java | 11 +-
.../apache/cassandra/db/marshal/AbstractType.java | 23 ++
.../cassandra/db/marshal/CollectionType.java | 135 +++++++++
.../db/marshal/ColumnToCollectionType.java | 135 +++++++++
.../apache/cassandra/db/marshal/CompositeType.java | 44 ++-
.../org/apache/cassandra/db/marshal/EmptyType.java | 67 ++++
.../org/apache/cassandra/db/marshal/ListType.java | 235 +++++++++++++++
.../org/apache/cassandra/db/marshal/MapType.java | 130 ++++++++
.../org/apache/cassandra/db/marshal/SetType.java | 134 ++++++++
.../apache/cassandra/db/marshal/TypeParser.java | 91 ++++++-
src/java/org/apache/cassandra/utils/UUIDGen.java | 26 ++-
36 files changed, 2451 insertions(+), 441 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb66fd6..5578aae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,7 +27,8 @@
* stream compressed sstables directly with java nio (CASSANDRA-4297)
* Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
* Add column metadata to system column families (CASSANDRA-4018)
- * (cql3) always use composite types by default (CASSANDRA-4329)
+ * (cql3) Always use composite types by default (CASSANDRA-4329)
+ * (cql3) Add support for set, map and list (CASSANDRA-3647)
1.1.3
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 34af56e..23edd62 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.thrift.InvalidRequestException;
@@ -55,6 +56,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
// option when creating a table in that "static CF" without a composite type will have isCompact == false
// even though one must use 'WITH COMPACT STORAGE' to declare them.
public final boolean isCompact;
+ public final boolean hasCollections;
public CFDefinition(CFMetaData cfm)
{
@@ -68,6 +70,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
{
// "dense" composite
this.isCompact = true;
+ this.hasCollections = false;
for (int i = 0; i < composite.types.size(); i++)
{
ColumnIdentifier id = getColumnId(cfm, i);
@@ -81,7 +84,20 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
this.isCompact = false;
this.value = null;
assert cfm.getValueAlias() == null;
- for (int i = 0; i < composite.types.size() - 1; i++)
+ // check for collection type
+ int last = composite.types.size() - 1;
+ AbstractType<?> lastType = composite.types.get(last);
+ if (lastType instanceof ColumnToCollectionType)
+ {
+ --last;
+ this.hasCollections = true;
+ }
+ else
+ {
+ this.hasCollections = false;
+ }
+
+ for (int i = 0; i < last; i++)
{
ColumnIdentifier id = getColumnId(cfm, i);
this.columns.put(id, new Name(cfm.ksName, cfm.cfName, id, Name.Kind.COLUMN_ALIAS, i, composite.types.get(i)));
@@ -97,6 +113,7 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
else
{
this.isComposite = false;
+ this.hasCollections = false;
if (cfm.getColumn_metadata().isEmpty())
{
// dynamic CF
@@ -145,6 +162,15 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
: new ColumnIdentifier(cfm.getValueAlias(), definitionType);
}
+ public ColumnToCollectionType getCollectionType()
+ {
+ if (!hasCollections)
+ return null;
+
+ CompositeType composite = (CompositeType)cfm.comparator;
+ return (ColumnToCollectionType)composite.types.get(composite.types.size() - 1);
+ }
+
public Name get(ColumnIdentifier name)
{
if (name.equals(key.name))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 37bcc06..6b7a56e 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -49,7 +49,6 @@ public class CFPropDefs
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
// Maps CQL short names to the respective Cassandra comparator/validator class names
- public static final Map<String, String> comparators = new HashMap<String, String>();
public static final Set<String> keywords = new HashSet<String>();
public static final Set<String> obsoleteKeywords = new HashSet<String>();
public static final Set<String> allowedKeywords = new HashSet<String>();
@@ -59,23 +58,6 @@ public class CFPropDefs
static
{
- comparators.put("ascii", "AsciiType");
- comparators.put("bigint", "LongType");
- comparators.put("blob", "BytesType");
- comparators.put("boolean", "BooleanType");
- comparators.put("counter", "CounterColumnType");
- comparators.put("decimal", "DecimalType");
- comparators.put("double", "DoubleType");
- comparators.put("float", "FloatType");
- comparators.put("inet", "InetAddressType");
- comparators.put("int", "Int32Type");
- comparators.put("text", "UTF8Type");
- comparators.put("timestamp", "DateType");
- comparators.put("uuid", "UUIDType");
- comparators.put("varchar", "UTF8Type");
- comparators.put("varint", "IntegerType");
- comparators.put("timeuuid", "TimeUUIDType");
-
keywords.add(KW_COMMENT);
keywords.add(KW_READREPAIRCHANCE);
keywords.add(KW_DCLOCALREADREPAIRCHANCE);
@@ -97,28 +79,6 @@ public class CFPropDefs
put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
}};
- public static AbstractType<?> parseType(String type) throws InvalidRequestException
- {
- try
- {
- String className = comparators.get(type);
- if (className == null)
- className = type;
- return TypeParser.parse(className);
- }
- catch (ConfigurationException e)
- {
- InvalidRequestException ex = new InvalidRequestException(e.toString());
- ex.initCause(e);
- throw ex;
- }
- }
-
- /* If not comparator/validator is not specified, default to text (BytesType is the wrong default for CQL
- * since it uses hex terms). If the value specified is not found in the comparators map, assume the user
- * knows what they are doing (a custom comparator/validator for example), and pass it on as-is.
- */
-
public void validate() throws ConfigurationException
{
// Catch the case where someone passed a kwarg that is not recognized.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index fdbf482..17d97d3 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.cql3.statements.Selector;
/**
* Represents an identifer for a CQL column definition.
*/
-public class ColumnIdentifier implements Comparable<ColumnIdentifier>, Selector
+public class ColumnIdentifier extends Selector implements Comparable<ColumnIdentifier>
{
public final ByteBuffer key;
private final String text;
@@ -76,14 +76,4 @@ public class ColumnIdentifier implements Comparable<ColumnIdentifier>, Selector
{
return this;
}
-
- public boolean hasFunction()
- {
- return false;
- }
-
- public Selector.Function function()
- {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index bbebedc..04c2391 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -27,6 +27,7 @@ options {
package org.apache.cassandra.cql3;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -34,6 +35,8 @@ options {
import java.util.Map;
import org.apache.cassandra.cql3.statements.*;
+ import org.apache.cassandra.config.ConfigurationException;
+ import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
@@ -190,8 +193,7 @@ selector returns [Selector s]
;
selectCountClause returns [List<Selector> expr]
- : ids=cidentList { $expr = new ArrayList<Selector>(ids); }
- | '\*' { $expr = Collections.<Selector>emptyList();}
+ : '\*' { $expr = Collections.<Selector>emptyList();}
| i=INTEGER { if (!i.getText().equals("1")) addRecognitionError("Only COUNT(1) is supported, got COUNT(" + i.getText() + ")"); $expr = Collections.<Selector>emptyList();}
;
@@ -219,15 +221,15 @@ insertStatement returns [UpdateStatement expr]
@init {
Attributes attrs = new Attributes();
List<ColumnIdentifier> columnNames = new ArrayList<ColumnIdentifier>();
- List<Term> columnValues = new ArrayList<Term>();
+ List<Value> columnValues = new ArrayList<Value>();
}
: K_INSERT K_INTO cf=columnFamilyName
'(' c1=cident { columnNames.add(c1); } ( ',' cn=cident { columnNames.add(cn); } )+ ')'
K_VALUES
- '(' v1=term { columnValues.add(v1); } ( ',' vn=term { columnValues.add(vn); } )+ ')'
+ '(' v1=value { columnValues.add(v1); } ( ',' vn=value { columnValues.add(vn); } )+ ')'
( usingClause[attrs] )?
{
- $expr = new UpdateStatement(cf, columnNames, columnValues, attrs);
+ $expr = new UpdateStatement(cf, attrs, columnNames, columnValues);
}
;
@@ -258,7 +260,7 @@ usingClauseObjective[Attributes attrs]
updateStatement returns [UpdateStatement expr]
@init {
Attributes attrs = new Attributes();
- Map<ColumnIdentifier, Operation> columns = new HashMap<ColumnIdentifier, Operation>();
+ List<Pair<ColumnIdentifier, Operation>> columns = new ArrayList<Pair<ColumnIdentifier, Operation>>();
}
: K_UPDATE cf=columnFamilyName
( usingClause[attrs] )?
@@ -278,9 +280,9 @@ updateStatement returns [UpdateStatement expr]
deleteStatement returns [DeleteStatement expr]
@init {
Attributes attrs = new Attributes();
- List<ColumnIdentifier> columnsList = Collections.emptyList();
+ List<Selector> columnsList = Collections.emptyList();
}
- : K_DELETE ( ids=cidentList { columnsList = ids; } )?
+ : K_DELETE ( ids=deleteSelection { columnsList = ids; } )?
K_FROM cf=columnFamilyName
( usingClauseDelete[attrs] )?
K_WHERE wclause=whereClause
@@ -289,6 +291,14 @@ deleteStatement returns [DeleteStatement expr]
}
;
+deleteSelection returns [List<Selector> expr]
+ : t1=deleteSelector { $expr = new ArrayList<Selector>(); $expr.add(t1); } (',' tN=deleteSelector { $expr.add(tN); })*
+ ;
+
+deleteSelector returns [Selector s]
+ : c=cident { $s = c; }
+ | c=cident '[' t=term ']' { $s = new Selector.WithKey(c, t); }
+ ;
/**
* BEGIN BATCH [USING CONSISTENCY <LVL>]
@@ -461,12 +471,32 @@ cfOrKsName[CFName name, boolean isKs]
| k=unreserved_keyword { if (isKs) $name.setKeyspace(k, false); else $name.setColumnFamily(k, false); }
;
-cidentList returns [List<ColumnIdentifier> items]
- @init{ $items = new ArrayList<ColumnIdentifier>(); }
- : t1=cident { $items.add(t1); } (',' tN=cident { $items.add(tN); })*
+// Values (includes prepared statement markers)
+value returns [Value value]
+ : t=term { $value = t; }
+ | c=collection_literal { $value = c; }
+ ;
+
+collection_literal returns [Value value]
+ : ll=list_literal { $value = ll; }
+ | sl=set_literal { $value = sl; }
+ | ml=map_literal { $value = ml; }
+ ;
+
+list_literal returns [Value.ListLiteral value]
+ : '[' { Value.ListLiteral l = new Value.ListLiteral(); } ( t1=term { l.add(t1); } ( ',' tn=term { l.add(tn); } )* )? ']' { $value = l; }
+ ;
+
+set_literal returns [Value.SetLiteral value]
+ : '{' { Value.SetLiteral s = new Value.SetLiteral(); } ( t1=term { s.add(t1); } ( ',' tn=term { s.add(tn); } )* )? '}' { $value = s; }
+ ;
+
+map_literal returns [Value.MapLiteral value]
+ // Note that we have an ambiguity between maps and set for "{}". So we force it to a set, and deal with it later based on the type of the column
+ : '{' { Value.MapLiteral m = new Value.MapLiteral(); } k1=term ':' v1=term { m.put(k1, v1); } ( ',' kn=term ':' vn=term { m.put(kn, vn); } )* '}'
+ { $value = m; }
;
-// Values (includes prepared statement markers)
extendedTerm returns [Term term]
: K_TOKEN '(' t=term ')' { $term = Term.tokenOf(t); }
| t=term { $term = t; }
@@ -482,19 +512,45 @@ intTerm returns [Term integer]
| t=QMARK { $integer = new Term($t.text, $t.type, ++currentBindMarkerIdx); }
;
-termPairWithOperation[Map<ColumnIdentifier, Operation> columns]
+termPairWithOperation[List<Pair<ColumnIdentifier, Operation>> columns]
: key=cident '='
- ( value=term { columns.put(key, new Operation(value)); }
- | c=cident ( '+' v=intTerm { columns.put(key, new Operation(c, Operation.Type.PLUS, v)); }
- | op='-'? v=intTerm
- {
- validateMinusSupplied(op, v, input);
- if (op == null)
- v = new Term(-(Long.valueOf(v.getText())), v.getType());
- columns.put(key, new Operation(c, Operation.Type.MINUS, v));
- }
- )
+ ( v=value { columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Set(v))); }
+ | c=cident op=operation
+ {
+ if (!key.equals(c))
+ addRecognitionError("Only expressions like X = X <op> <value> are supported.");
+ columns.add(Pair.<ColumnIdentifier, Operation>create(key, op));
+ }
+ | ll=list_literal '+' c=cident
+ {
+ if (!key.equals(c))
+ addRecognitionError("Only expressions like X = <value> + X are supported.");
+ columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Function(CollectionType.Function.PREPEND, ll)));
+ }
)
+ | key=cident '[' t=term ']' '=' vv=term
+ {
+ List<Term> args = Arrays.asList(t, vv);
+ columns.add(Pair.<ColumnIdentifier, Operation>create(key, new Operation.Function(CollectionType.Function.SET, args)));
+ }
+ ;
+
+operation returns [Operation op]
+ : '+' v=intTerm { $op = new Operation.Counter(v, false); }
+ | sign='-'? v=intTerm
+ {
+ validateMinusSupplied(sign, v, input);
+ if (sign == null)
+ v = new Term(-(Long.valueOf(v.getText())), v.getType());
+ $op = new Operation.Counter(v, true);
+ }
+ | '+' ll=list_literal { $op = new Operation.Function(CollectionType.Function.APPEND, ll); }
+ | '-' ll=list_literal { $op = new Operation.Function(CollectionType.Function.DISCARD_LIST, ll); }
+
+ | '+' sl=set_literal { $op = new Operation.Function(CollectionType.Function.ADD, sl.asList()); }
+ | '-' sl=set_literal { $op = new Operation.Function(CollectionType.Function.DISCARD_SET, sl.asList()); }
+
+ | '+' ml=map_literal { $op = new Operation.Function(CollectionType.Function.SET, ml.asList()); }
;
property returns [String str]
@@ -519,29 +575,45 @@ relation returns [Relation rel]
'(' f1=term { $rel.addInValue(f1); } (',' fN=term { $rel.addInValue(fN); } )* ')'
;
-comparatorType returns [String str]
- : c=native_type { $str=c; }
- | s=STRING_LITERAL { $str = $s.text; }
+comparatorType returns [ParsedType t]
+ : c=native_type { $t = c; }
+ | c=collection_type { $t = c; }
+ | s=STRING_LITERAL
+ {
+ try {
+ $t = new ParsedType.Custom($s.text);
+ } catch (ConfigurationException e) {
+ addRecognitionError("Cannot parse type " + $s.text + ": " + e.getMessage());
+ }
+ }
;
-native_type returns [String str]
- : c=( K_ASCII
- | K_BIGINT
- | K_BLOB
- | K_BOOLEAN
- | K_COUNTER
- | K_DECIMAL
- | K_DOUBLE
- | K_FLOAT
- | K_INET
- | K_INT
- | K_TEXT
- | K_TIMESTAMP
- | K_UUID
- | K_VARCHAR
- | K_VARINT
- | K_TIMEUUID
- ) { return $c.text; }
+native_type returns [ParsedType t]
+ : K_ASCII { $t = ParsedType.Native.ASCII; }
+ | K_BIGINT { $t = ParsedType.Native.BIGINT; }
+ | K_BLOB { $t = ParsedType.Native.BLOB; }
+ | K_BOOLEAN { $t = ParsedType.Native.BOOLEAN; }
+ | K_COUNTER { $t = ParsedType.Native.COUNTER; }
+ | K_DECIMAL { $t = ParsedType.Native.DECIMAL; }
+ | K_DOUBLE { $t = ParsedType.Native.DOUBLE; }
+ | K_FLOAT { $t = ParsedType.Native.FLOAT; }
+ | K_INET { $t = ParsedType.Native.INET;}
+ | K_INT { $t = ParsedType.Native.INT; }
+ | K_TEXT { $t = ParsedType.Native.TEXT; }
+ | K_TIMESTAMP { $t = ParsedType.Native.TIMESTAMP; }
+ | K_UUID { $t = ParsedType.Native.UUID; }
+ | K_VARCHAR { $t = ParsedType.Native.VARCHAR; }
+ | K_VARINT { $t = ParsedType.Native.VARINT; }
+ | K_TIMEUUID { $t = ParsedType.Native.TIMEUUID; }
+ ;
+
+collection_type returns [ParsedType pt]
+ : K_MAP '<' t1=comparatorType ',' t2=comparatorType '>'
+ { try { $pt = ParsedType.Collection.map(t1, t2); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+ | K_LIST '<' t=comparatorType '>'
+ { try { $pt = ParsedType.Collection.list(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
+ | K_SET '<' t=comparatorType '>'
+ { try { $pt = ParsedType.Collection.set(t); } catch (InvalidRequestException e) { addRecognitionError(e.getMessage()); } }
;
unreserved_keyword returns [String str]
@@ -556,8 +628,10 @@ unreserved_keyword returns [String str]
| K_TYPE
| K_VALUES
| K_WRITETIME
+ | K_MAP
+ | K_LIST
) { $str = $k.text; }
- | t=native_type { $str = t; }
+ | t=native_type { $str = t.toString(); }
;
@@ -634,6 +708,9 @@ K_TIMEUUID: T I M E U U I D;
K_TOKEN: T O K E N;
K_WRITETIME: W R I T E T I M E;
+K_MAP: M A P;
+K_LIST: L I S T;
+
// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Operation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java
index 4b7989e..e7291c3 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -17,37 +17,92 @@
*/
package org.apache.cassandra.cql3;
-public class Operation
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public abstract class Operation
{
- public static enum Type { PLUS, MINUS }
+ public static enum Type { SET, COUNTER, FUNCTION }
public final Type type;
- public final ColumnIdentifier ident;
- public final Term value;
- // unary operation
- public Operation(Term a)
+ protected Operation(Type type)
{
- this(null, null, a);
+ this.type = type;
}
- // binary operation
- public Operation(ColumnIdentifier a, Type type, Term b)
+ public abstract Iterable<Term> allTerms();
+
+ public static class Set extends Operation
{
- this.ident = a;
- this.type = type;
- this.value = b;
+ public final Value value;
+
+ public Set(Value value)
+ {
+ super(Type.SET);
+ this.value = value;
+ }
+
+ @Override
+ public String toString()
+ {
+ return " = " + value;
+ }
+
+ public List<Term> allTerms()
+ {
+ return value.asList();
+ }
}
- public boolean isUnary()
+ public static class Counter extends Operation
{
- return type == null && ident == null;
+ public final Term value;
+ public final boolean isSubstraction;
+
+ public Counter(Term value, boolean isSubstraction)
+ {
+ super(Type.COUNTER);
+ this.value = value;
+ this.isSubstraction = isSubstraction;
+ }
+
+ @Override
+ public String toString()
+ {
+ return (isSubstraction ? "-" : "+") + "= " + value;
+ }
+
+ public Iterable<Term> allTerms()
+ {
+ return Collections.singletonList(value);
+ }
}
- public String toString()
+ public static class Function extends Operation
{
- return (isUnary())
- ? String.format("UnaryOperation(%s)", value)
- : String.format("BinaryOperation(%s, %s, %s)", ident, type, value);
+ public final CollectionType.Function fct;
+ public final List<Term> arguments;
+
+ public Function(CollectionType.Function fct, List<Term> arguments)
+ {
+ super(Type.FUNCTION);
+ this.fct = fct;
+ this.arguments = arguments;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "." + fct + arguments;
+ }
+
+ public Iterable<Term> allTerms()
+ {
+ return arguments;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/ParsedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ParsedType.java b/src/java/org/apache/cassandra/cql3/ParsedType.java
new file mode 100644
index 0000000..28e6be3
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/ParsedType.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public interface ParsedType
+{
+ public boolean isCollection();
+ public AbstractType<?> getType();
+
+ public enum Native implements ParsedType
+ {
+ ASCII (AsciiType.instance),
+ BIGINT (LongType.instance),
+ BLOB (BytesType.instance),
+ BOOLEAN (BooleanType.instance),
+ COUNTER (CounterColumnType.instance),
+ DECIMAL (DecimalType.instance),
+ DOUBLE (DoubleType.instance),
+ FLOAT (FloatType.instance),
+ INET (InetAddressType.instance),
+ INT (Int32Type.instance),
+ TEXT (UTF8Type.instance),
+ TIMESTAMP(DateType.instance),
+ UUID (UUIDType.instance),
+ VARCHAR (UTF8Type.instance),
+ VARINT (IntegerType.instance),
+ TIMEUUID (TimeUUIDType.instance);
+
+ private final AbstractType<?> type;
+
+ private Native(AbstractType<?> type)
+ {
+ this.type = type;
+ }
+
+ public boolean isCollection()
+ {
+ return false;
+ }
+
+ public AbstractType<?> getType()
+ {
+ return type;
+ }
+ }
+
+ public static class Custom implements ParsedType
+ {
+ private final AbstractType<?> type;
+
+ public Custom(String className) throws ConfigurationException
+ {
+ this.type = TypeParser.parse(className);
+ }
+
+ public boolean isCollection()
+ {
+ return false;
+ }
+
+ public AbstractType<?> getType()
+ {
+ return type;
+ }
+ }
+
+ public static class Collection implements ParsedType
+ {
+ CollectionType type;
+
+ private Collection(CollectionType type)
+ {
+ this.type = type;
+ }
+
+ public static Collection map(ParsedType t1, ParsedType t2) throws InvalidRequestException
+ {
+ if (t1.isCollection() || t2.isCollection())
+ throw new InvalidRequestException("map type cannot contain another collection");
+
+ return new Collection(MapType.getInstance(t1.getType(), t2.getType()));
+ }
+
+ public static Collection list(ParsedType t) throws InvalidRequestException
+ {
+ if (t.isCollection())
+ throw new InvalidRequestException("list type cannot contain another collection");
+
+ return new Collection(ListType.getInstance(t.getType()));
+ }
+
+ public static Collection set(ParsedType t) throws InvalidRequestException
+ {
+ if (t.isCollection())
+ throw new InvalidRequestException("set type cannot contain another collection");
+
+ return new Collection(SetType.getInstance(t.getType()));
+ }
+
+ public boolean isCollection()
+ {
+ return true;
+ }
+
+ public AbstractType<?> getType()
+ {
+ return type;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index dde9b26..d084480 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -222,15 +222,15 @@ public class QueryProcessor
CqlLexer lexer = new CqlLexer(stream);
TokenStream tokenStream = new CommonTokenStream(lexer);
CqlParser parser = new CqlParser(tokenStream);
-
+
// Parse the query string to a statement instance
ParsedStatement statement = parser.query();
-
+
// The lexer and parser queue up any errors they may have encountered
// along the way, if necessary, we turn them into exceptions here.
lexer.throwLastRecognitionError();
parser.throwLastRecognitionError();
-
+
return statement;
}
catch (RuntimeException re)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Term.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java
index 04fa5fa..81be6de 100644
--- a/src/java/org/apache/cassandra/cql3/Term.java
+++ b/src/java/org/apache/cassandra/cql3/Term.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.cql3;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.apache.cassandra.config.ConfigurationException;
@@ -32,14 +34,36 @@ import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.thrift.InvalidRequestException;
/** A term parsed from a CQL statement. */
-public class Term
+public class Term implements Value
{
+ public enum Type
+ {
+ STRING, INTEGER, UUID, FLOAT, QMARK;
+
+ static Type forInt(int type)
+ {
+ if ((type == CqlParser.STRING_LITERAL) || (type == CqlParser.IDENT))
+ return STRING;
+ else if (type == CqlParser.INTEGER)
+ return INTEGER;
+ else if (type == CqlParser.UUID)
+ return UUID;
+ else if (type == CqlParser.FLOAT)
+ return FLOAT;
+ else if (type == CqlParser.QMARK)
+ return QMARK;
+
+ // FIXME: handled scenario that should never occur.
+ return null;
+ }
+ }
+
private final String text;
- private final TermType type;
+ private final Type type;
public final int bindIndex;
public final boolean isToken;
- private Term(String text, TermType type, int bindIndex, boolean isToken)
+ private Term(String text, Type type, int bindIndex, boolean isToken)
{
this.text = text == null ? "" : text;
this.type = type;
@@ -47,7 +71,7 @@ public class Term
this.isToken = isToken;
}
- public Term(String text, TermType type)
+ public Term(String text, Type type)
{
this(text, type, -1, false);
}
@@ -61,17 +85,17 @@ public class Term
*/
public Term(String text, int type)
{
- this(text, TermType.forInt(type));
+ this(text, Type.forInt(type));
}
- public Term(long value, TermType type)
+ public Term(long value, Type type)
{
this(String.valueOf(value), type);
}
public Term(String text, int type, int index)
{
- this(text, TermType.forInt(type), index, false);
+ this(text, Type.forInt(type), index, false);
}
public static Term tokenOf(Term t)
@@ -119,7 +143,7 @@ public class Term
public Token getAsToken(AbstractType<?> validator, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException
{
- if (!(isToken || type == TermType.STRING))
+ if (!(isToken || type == Type.STRING))
throw new InvalidRequestException("Invalid value for token (use a string literal of the token value or the token() function)");
try
@@ -146,14 +170,19 @@ public class Term
*
* @return the type
*/
- public TermType getType()
+ public Type getType()
{
return type;
}
public boolean isBindMarker()
{
- return type == TermType.QMARK;
+ return type == Type.QMARK;
+ }
+
+ public List<Term> asList()
+ {
+ return Collections.singletonList(this);
}
@Override
@@ -182,7 +211,7 @@ public class Term
if (getClass() != obj.getClass())
return false;
Term other = (Term) obj;
- if (type==TermType.QMARK) return false; // markers are never equal
+ if (type==Type.QMARK) return false; // markers are never equal
if (text == null)
{
if (other.text != null)
@@ -196,25 +225,3 @@ public class Term
return true;
}
}
-
-enum TermType
-{
- STRING, INTEGER, UUID, FLOAT, QMARK;
-
- static TermType forInt(int type)
- {
- if ((type == CqlParser.STRING_LITERAL) || (type == CqlParser.IDENT))
- return STRING;
- else if (type == CqlParser.INTEGER)
- return INTEGER;
- else if (type == CqlParser.UUID)
- return UUID;
- else if (type == CqlParser.FLOAT)
- return FLOAT;
- else if (type == CqlParser.QMARK)
- return QMARK;
-
- // FIXME: handled scenario that should never occur.
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
new file mode 100644
index 0000000..1372c87
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.db.*;
+
+/**
+ * A simple container that simplify passing parameters for collections methods.
+ */
+public class UpdateParameters
+{
+ public final List<ByteBuffer> variables;
+ public final long timestamp;
+ private final int ttl;
+ public final int localDeletionTime;
+
+ public UpdateParameters(List<ByteBuffer> variables, long timestamp, int ttl)
+ {
+ this.variables = variables;
+ this.timestamp = timestamp;
+ this.ttl = ttl;
+ this.localDeletionTime = (int)(System.currentTimeMillis() / 1000);
+ }
+
+ public Column makeColumn(ByteBuffer name, ByteBuffer value)
+ {
+ return ttl > 0
+ ? new ExpiringColumn(name, value, timestamp, ttl)
+ : new Column(name, value, timestamp);
+ }
+
+ public Column makeTombstone(ByteBuffer name)
+ {
+ return new DeletedColumn(name, localDeletionTime, timestamp);
+ }
+
+ public RangeTombstone makeRangeTombstone(ByteBuffer start, ByteBuffer end)
+ {
+ return new RangeTombstone(start, end, timestamp, localDeletionTime);
+ }
+
+ public RangeTombstone makeTombstoneForOverwrite(ByteBuffer start, ByteBuffer end)
+ {
+ return new RangeTombstone(start, end, timestamp - 1, localDeletionTime);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/Value.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Value.java b/src/java/org/apache/cassandra/cql3/Value.java
new file mode 100644
index 0000000..64f588b
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/Value.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+public interface Value
+{
+ public List<Term> asList();
+
+ public interface CollectionLiteral extends Value
+ {
+ /**
+ * Validate that this literal is compatible with the provided column and
+ * throws an InvalidRequestException if not.
+ */
+ public void validateType(CFDefinition.Name value) throws InvalidRequestException;
+
+ /**
+ * Returns whether this litteral is empty or not.
+ */
+ public boolean isEmpty();
+
+ /**
+ * Returns the internal function to use to construct this literal.
+ */
+ public CollectionType.Function constructionFunction();
+ }
+
+ public static class MapLiteral extends HashMap<Term, Term> implements CollectionLiteral
+ {
+ public List<Term> asList()
+ {
+ List<Term> l = new ArrayList<Term>(2 * size());
+ for (Map.Entry<Term, Term> entry : entrySet())
+ {
+ l.add(entry.getKey());
+ l.add(entry.getValue());
+ }
+ return l;
+ }
+
+ public void validateType(CFDefinition.Name value) throws InvalidRequestException
+ {
+ if (!(value.type instanceof MapType))
+ throw new InvalidRequestException(String.format("Invalid value: %s is not a map", value.name));
+ }
+
+ public CollectionType.Function constructionFunction()
+ {
+ return CollectionType.Function.SET;
+ }
+ }
+
+ public static class ListLiteral extends ArrayList<Term> implements CollectionLiteral
+ {
+ public List<Term> asList()
+ {
+ return this;
+ }
+
+ public void validateType(CFDefinition.Name value) throws InvalidRequestException
+ {
+ if (!(value.type instanceof ListType))
+ throw new InvalidRequestException(String.format("Invalid value: %s is not a list", value.name));
+ }
+
+ public CollectionType.Function constructionFunction()
+ {
+ return CollectionType.Function.APPEND;
+ }
+ }
+
+ public static class SetLiteral extends HashSet<Term> implements CollectionLiteral
+ {
+ public List<Term> asList()
+ {
+ return new ArrayList<Term>(this);
+ }
+
+ public void validateType(CFDefinition.Name value) throws InvalidRequestException
+ {
+ // The parser don't distinguish between empty set and empty map and always return an empty set
+ if ((value.type instanceof MapType) && !isEmpty())
+ throw new InvalidRequestException(String.format("Invalid value: %s is not a map", value.name));
+ else if (!(value.type instanceof SetType))
+ throw new InvalidRequestException(String.format("Invalid value: %s is not a set", value.name));
+ }
+
+ public CollectionType.Function constructionFunction()
+ {
+ return CollectionType.Function.ADD;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index bb74bd4..26ad6cc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -17,11 +17,14 @@
*/
package org.apache.cassandra.cql3.statements;
+import java.nio.ByteBuffer;
import java.util.*;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.service.MigrationManager;
@@ -37,11 +40,11 @@ public class AlterTableStatement extends SchemaAlteringStatement
}
public final Type oType;
- public final String validator;
+ public final ParsedType validator;
public final ColumnIdentifier columnName;
private final CFPropDefs cfProps = new CFPropDefs();
- public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, String validator, Map<String, String> propertyMap)
+ public AlterTableStatement(CFName name, Type type, ColumnIdentifier columnName, ParsedType validator, Map<String, String> propertyMap)
{
super(name);
this.oType = type;
@@ -73,8 +76,26 @@ public class AlterTableStatement extends SchemaAlteringStatement
throw new InvalidRequestException(String.format("Invalid column name %s because it conflicts with an existing column", columnName));
}
}
+
+ AbstractType<?> type = validator.getType();
+ if (type instanceof CollectionType)
+ {
+ if (!cfDef.isComposite)
+ throw new InvalidRequestException("Cannot use collection types with non-composite PRIMARY KEY");
+
+ Map<ByteBuffer, CollectionType> collections = cfDef.hasCollections
+ ? new HashMap<ByteBuffer, CollectionType>(cfDef.getCollectionType().defined)
+ : new HashMap<ByteBuffer, CollectionType>();
+ ColumnToCollectionType newColType = ColumnToCollectionType.getInstance(collections);
+ List<AbstractType<?>> ctypes = new ArrayList<AbstractType<?>>(((CompositeType)cfm.comparator).types);
+ if (cfDef.hasCollections)
+ ctypes.set(ctypes.size() - 1, newColType);
+ else
+ ctypes.add(newColType);
+ cfm.comparator = CompositeType.getInstance(ctypes);
+ }
cfm.addColumnDefinition(new ColumnDefinition(columnName.key,
- CFPropDefs.parseType(validator),
+ validator.getType(),
null,
null,
null,
@@ -88,7 +109,7 @@ public class AlterTableStatement extends SchemaAlteringStatement
switch (name.kind)
{
case KEY_ALIAS:
- AbstractType<?> newType = CFPropDefs.parseType(validator);
+ AbstractType<?> newType = validator.getType();
if (newType instanceof CounterColumnType)
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", columnName));
cfm.keyValidator(newType);
@@ -97,16 +118,16 @@ public class AlterTableStatement extends SchemaAlteringStatement
assert cfDef.isComposite;
List<AbstractType<?>> newTypes = new ArrayList<AbstractType<?>>(((CompositeType) cfm.comparator).types);
- newTypes.set(name.position, CFPropDefs.parseType(validator));
+ newTypes.set(name.position, validator.getType());
cfm.comparator = CompositeType.getInstance(newTypes);
break;
case VALUE_ALIAS:
- cfm.defaultValidator(CFPropDefs.parseType(validator));
+ cfm.defaultValidator(validator.getType());
break;
case COLUMN_METADATA:
ColumnDefinition column = cfm.getColumnDefinition(columnName.key);
- column.setValidator(CFPropDefs.parseType(validator));
+ column.setValidator(validator.getType());
cfm.addColumnDefinition(column);
break;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 9ceb67d..cc30307 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeoutException;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
@@ -30,6 +31,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.RequestType;
import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.Pair;
/**
@@ -97,7 +99,7 @@ public class BatchStatement extends ModificationStatement
}
public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables)
- throws InvalidRequestException
+ throws UnavailableException, TimeoutException, InvalidRequestException
{
Map<Pair<String, ByteBuffer>, RowAndCounterMutation> mutations = new HashMap<Pair<String, ByteBuffer>, RowAndCounterMutation>();
for (ModificationStatement statement : statements)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
new file mode 100644
index 0000000..e365e81
--- /dev/null
+++ b/src/java/org/apache/cassandra/cql3/statements/ColumnGroupMap.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3.statements;
+
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.Pair;
+
+public class ColumnGroupMap
+{
+ private final ByteBuffer[] fullPath;
+ private final Map<ByteBuffer, Value> map = new HashMap<ByteBuffer, Value>();
+
+ private ColumnGroupMap(ByteBuffer[] fullPath)
+ {
+ this.fullPath = fullPath;
+ }
+
+ private void add(ByteBuffer[] fullName, int idx, IColumn column)
+ {
+ ByteBuffer columnName = fullName[idx];
+ if (fullName.length == idx + 2)
+ {
+ // It's a collection
+ Value v = map.get(columnName);
+ if (v == null)
+ {
+ v = new Collection();
+ map.put(columnName, v);
+ }
+ assert v instanceof Collection;
+
+ ((Collection)v).add(Pair.create(fullName[idx + 1], column));
+ }
+ else
+ {
+ assert !map.containsKey(columnName);
+ map.put(columnName, new Simple(column));
+ }
+ }
+
+ public ByteBuffer getKeyComponent(int pos)
+ {
+ return fullPath[pos];
+ }
+
+ public IColumn getSimple(ByteBuffer key)
+ {
+ Value v = map.get(key);
+ if (v == null)
+ return null;
+
+ assert v instanceof Simple;
+ return ((Simple)v).column;
+ }
+
+ public List<Pair<ByteBuffer, IColumn>> getCollection(ByteBuffer key)
+ {
+ Value v = map.get(key);
+ if (v == null)
+ return null;
+
+ assert v instanceof Collection;
+ return (List<Pair<ByteBuffer, IColumn>>)v;
+ }
+
+ private interface Value {};
+
+ private static class Simple implements Value
+ {
+ public final IColumn column;
+
+ Simple(IColumn column)
+ {
+ this.column = column;
+ }
+ }
+
+ private static class Collection extends ArrayList<Pair<ByteBuffer, IColumn>> implements Value {}
+
+ public static class Builder
+ {
+ private final CompositeType composite;
+ private final int idx;
+ private ByteBuffer[] previous;
+
+ private final List<ColumnGroupMap> groups = new ArrayList<ColumnGroupMap>();
+ private ColumnGroupMap currentGroup;
+
+ public Builder(CompositeType composite, boolean hasCollections)
+ {
+ this.composite = composite;
+ this.idx = composite.types.size() - (hasCollections ? 2 : 1);
+ }
+
+ public void add(IColumn c)
+ {
+ if (c.isMarkedForDelete())
+ return;
+
+ ByteBuffer[] current = composite.split(c.name());
+
+ if (currentGroup == null)
+ {
+ currentGroup = new ColumnGroupMap(current);
+ currentGroup.add(current, idx, c);
+ previous = current;
+ return;
+ }
+
+ if (!isSameGroup(current))
+ {
+ groups.add(currentGroup);
+ currentGroup = new ColumnGroupMap(current);
+ }
+ currentGroup.add(current, idx, c);
+ previous = current;
+ }
+
+ /**
+ * For sparse composite, returns wheter the column belong to the same
+ * cqlRow than the previously added, based on the full list of component
+ * in the name.
+ * Two columns do belong together if they differ only by the last
+ * component.
+ */
+ private boolean isSameGroup(ByteBuffer[] c)
+ {
+ // Cql don't allow to insert columns who doesn't have all component of
+ // the composite set for sparse composite. Someone coming from thrift
+ // could hit that though. But since we have no way to handle this
+ // correctly, better fail here and tell whomever may hit that (if
+ // someone ever do) to change the definition to a dense composite.
+ assert c.length >= idx && previous.length >= idx : "Sparse composite should not have partial column names";
+ for (int i = 0; i < idx; i++)
+ {
+ if (!c[i].equals(previous[i]))
+ return false;
+ }
+ return true;
+ }
+
+ public List<ColumnGroupMap> groups()
+ {
+ if (currentGroup != null)
+ {
+ groups.add(currentGroup);
+ currentGroup = null;
+ }
+ return groups;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
index 1ad1007..45a55da 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.ColumnToCollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.CounterColumnType;
@@ -68,7 +70,14 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
private Map<ByteBuffer, ColumnDefinition> getColumns() throws InvalidRequestException
{
Map<ByteBuffer, ColumnDefinition> columnDefs = new HashMap<ByteBuffer, ColumnDefinition>();
- Integer componentIndex = comparator instanceof CompositeType ? ((CompositeType)comparator).types.size() - 1 : null;
+ Integer componentIndex = null;
+ if (comparator instanceof CompositeType)
+ {
+ CompositeType ct = (CompositeType) comparator;
+ componentIndex = ct.types.get(ct.types.size() - 1) instanceof ColumnToCollectionType
+ ? ct.types.size() - 2
+ : ct.types.size() - 1;
+ }
for (Map.Entry<ColumnIdentifier, AbstractType> col : columns.entrySet())
{
@@ -123,7 +132,7 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
public static class RawStatement extends CFStatement
{
- private final Map<ColumnIdentifier, String> definitions = new HashMap<ColumnIdentifier, String>();
+ private final Map<ColumnIdentifier, ParsedType> definitions = new HashMap<ColumnIdentifier, ParsedType>();
private final CFPropDefs properties = new CFPropDefs();
private final List<ColumnIdentifier> keyAliases = new ArrayList<ColumnIdentifier>();
@@ -159,10 +168,19 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
CreateColumnFamilyStatement stmt = new CreateColumnFamilyStatement(cfName, properties);
stmt.setBoundTerms(getBoundsTerms());
- for (Map.Entry<ColumnIdentifier, String> entry : definitions.entrySet())
+
+ Map<ByteBuffer, CollectionType> definedCollections = null;
+ for (Map.Entry<ColumnIdentifier, ParsedType> entry : definitions.entrySet())
{
- AbstractType<?> type = CFPropDefs.parseType(entry.getValue());
- stmt.columns.put(entry.getKey(), type); // we'll remove what is not a column below
+ ColumnIdentifier id = entry.getKey();
+ ParsedType pt = entry.getValue();
+ if (pt.isCollection())
+ {
+ if (definedCollections == null)
+ definedCollections = new HashMap<ByteBuffer, CollectionType>();
+ definedCollections.put(id.key, (CollectionType)pt.getType());
+ }
+ stmt.columns.put(id, pt.getType()); // we'll remove what is not a column below
}
// Ensure that exactly one key has been specified.
@@ -183,6 +201,8 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
// standard "dynamic" CF, otherwise it's a composite
if (useCompactStorage && columnAliases.size() == 1)
{
+ if (definedCollections != null)
+ throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
stmt.columnAliases.add(columnAliases.get(0).key);
stmt.comparator = getTypeAndRemove(stmt.columns, columnAliases.get(0));
if (stmt.comparator instanceof CounterColumnType)
@@ -200,9 +220,20 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
throw new InvalidRequestException(String.format("counter type is not supported for PRIMARY KEY part %s", t.key));
types.add(type);
}
- // For sparse, we must add the last UTF8 component
- if (!useCompactStorage)
+
+ if (useCompactStorage)
+ {
+ if (definedCollections != null)
+ throw new InvalidRequestException("Collection types are not supported with COMPACT STORAGE");
+ }
+ else
+ {
+ // For sparse, we must add the last UTF8 component
+ // and the collection type if there is one
types.add(CFDefinition.definitionType);
+ if (definedCollections != null)
+ types.add(ColumnToCollectionType.getInstance(definedCollections));
+ }
if (types.isEmpty())
throw new IllegalStateException("Nonsensical empty parameter list for CompositeType");
@@ -212,9 +243,15 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
else
{
if (useCompactStorage)
+ {
+ if (definedCollections != null)
+ throw new InvalidRequestException("Collection types are not supported with non composite PRIMARY KEY");
stmt.comparator = CFDefinition.definitionType;
+ }
else
+ {
stmt.comparator = CompositeType.getInstance(Collections.<AbstractType<?>>singletonList(CFDefinition.definitionType));
+ }
}
if (stmt.columns.isEmpty())
@@ -250,12 +287,14 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement
AbstractType type = columns.get(t);
if (type == null)
throw new InvalidRequestException(String.format("Unkown definition %s referenced in PRIMARY KEY", t));
+ if (type instanceof CollectionType)
+ throw new InvalidRequestException(String.format("Invalid collection type for PRIMARY KEY component %s", t));
columns.remove(t);
Boolean isReversed = definedOrdering.get(t);
return isReversed != null && isReversed ? ReversedType.getInstance(type) : type;
}
- public void addDefinition(ColumnIdentifier def, String type)
+ public void addDefinition(ColumnIdentifier def, ParsedType type)
{
definedNames.add(def);
definitions.put(def, type);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 2b3c114..eb96417 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -18,12 +18,8 @@
package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.config.CFMetaData;
@@ -33,9 +29,15 @@ import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.thrift.ThriftValidation;
+import org.apache.cassandra.utils.Pair;
/**
* A <code>DELETE</code> parsed from a CQL query statement.
@@ -43,20 +45,23 @@ import org.apache.cassandra.thrift.ThriftValidation;
public class DeleteStatement extends ModificationStatement
{
private CFDefinition cfDef;
- private final List<ColumnIdentifier> columns;
+ private final List<Selector> columns;
private final List<Relation> whereClause;
+ private final List<Pair<CFDefinition.Name, Term>> toRemove;
+
private final Map<ColumnIdentifier, List<Term>> processedKeys = new HashMap<ColumnIdentifier, List<Term>>();
- public DeleteStatement(CFName name, List<ColumnIdentifier> columns, List<Relation> whereClause, Attributes attrs)
+ public DeleteStatement(CFName name, List<Selector> columns, List<Relation> whereClause, Attributes attrs)
{
super(name, attrs);
this.columns = columns;
this.whereClause = whereClause;
+ this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size());
}
- public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws InvalidRequestException
+ public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws UnavailableException, TimeoutException, InvalidRequestException
{
// Check key
List<Term> keys = processedKeys.get(cfDef.key.name);
@@ -83,79 +88,109 @@ public class DeleteStatement extends ModificationStatement
}
}
- List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+ boolean fullKey = builder.componentCount() == cfDef.columns.size();
+ boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || toRemove.isEmpty());
- for (Term key : keys)
+ if (!toRemove.isEmpty() && isRange)
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", firstEmpty, toRemove.iterator().next().left));
+
+ // Lists DISCARD operation incurs a read. Do that now.
+ boolean needsReading = false;
+ for (Pair<CFDefinition.Name, Term> p : toRemove)
{
- ByteBuffer rawKey = key.getByteBuffer(cfDef.key.type, variables);
- rowMutations.add(mutationForKey(cfDef, clientState, rawKey, builder, firstEmpty, variables));
+ CFDefinition.Name name = p.left;
+ Term value = p.right;
+
+ if ((name.type instanceof ListType) && value != null)
+ {
+ needsReading = true;
+ break;
+ }
}
+ List<ByteBuffer> rawKeys = new ArrayList<ByteBuffer>(keys.size());
+ for (Term key: keys)
+ rawKeys.add(key.getByteBuffer(cfDef.key.type, variables));
+
+ Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(rawKeys, builder, (CompositeType)cfDef.cfm.comparator) : null;
+
+ List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+ UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1);
+
+ for (ByteBuffer key : rawKeys)
+ rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, rows == null ? null : rows.get(key)));
+
return rowMutations;
}
- public RowMutation mutationForKey(CFDefinition cfDef, ClientState clientState, ByteBuffer key, ColumnNameBuilder builder, CFDefinition.Name firstEmpty, List<ByteBuffer> variables)
+ public RowMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, boolean isRange, UpdateParameters params, ColumnGroupMap group)
throws InvalidRequestException
{
QueryProcessor.validateKey(key);
RowMutation rm = new RowMutation(cfDef.cfm.ksName, key);
ColumnFamily cf = rm.addOrGet(columnFamily());
- int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
if (columns.isEmpty() && builder.componentCount() == 0)
{
// No columns, delete the row
- cf.delete(new DeletionInfo(getTimestamp(clientState), localDeleteTime));
+ cf.delete(new DeletionInfo(params.timestamp, params.localDeletionTime));
}
else
{
- boolean fullKey = builder.componentCount() == cfDef.columns.size();
- boolean isRange = cfDef.isCompact ? !fullKey : (!fullKey || columns.isEmpty());
-
- if (!columns.isEmpty())
- {
- if (isRange)
- throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s since %s specified", firstEmpty, columns.iterator().next()));
-
- for (ColumnIdentifier column : columns)
- {
- CFDefinition.Name name = cfDef.get(column);
- if (name == null)
- throw new InvalidRequestException(String.format("Unknown identifier %s", column));
-
- // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
- // list. However, we support having the value name for coherence with the static/sparse case
- if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
- throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", column));
- }
- }
-
if (isRange)
{
ByteBuffer start = builder.copy().build();
ByteBuffer end = builder.buildAsEndOfRange();
QueryProcessor.validateColumnName(start); // If start is good, end is too
- cf.delete(new DeletionInfo(start, end, cfDef.cfm.comparator, getTimestamp(clientState), localDeleteTime));
+ cf.addAtom(params.makeRangeTombstone(start, end));
}
else
{
// Delete specific columns
if (cfDef.isCompact)
{
- ByteBuffer columnName = builder.build();
- QueryProcessor.validateColumnName(columnName);
- cf.addTombstone(columnName, localDeleteTime, getTimestamp(clientState));
+ ByteBuffer columnName = builder.build();
+ QueryProcessor.validateColumnName(columnName);
+ cf.addColumn(params.makeTombstone(columnName));
}
else
{
- Iterator<ColumnIdentifier> iter = columns.iterator();
+ Iterator<Pair<CFDefinition.Name, Term>> iter = toRemove.iterator();
while (iter.hasNext())
{
- ColumnIdentifier column = iter.next();
- ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
- ByteBuffer columnName = b.add(column.key).build();
- QueryProcessor.validateColumnName(columnName);
- cf.addTombstone(columnName, localDeleteTime, getTimestamp(clientState));
+ Pair<CFDefinition.Name, Term> p = iter.next();
+ CFDefinition.Name column = p.left;
+ if (column.type instanceof CollectionType)
+ {
+ Term keySelected = p.right;
+ if (keySelected == null)
+ {
+ // Delete the whole collection
+ ByteBuffer start = builder.copy().add(column.name.key).build();
+ QueryProcessor.validateColumnName(start);
+ ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
+ ByteBuffer end = b.add(column.name.key).buildAsEndOfRange();
+ cf.addAtom(params.makeRangeTombstone(start, end));
+ }
+ else
+ {
+ builder.add(column.name.key);
+ CollectionType.Function fct = CollectionType.Function.DISCARD_KEY;
+ List<Term> args = Collections.singletonList(keySelected);
+
+ if (column.type instanceof ListType)
+ ((ListType)column.type).execute(cf, builder, fct, args, params, group == null ? null : group.getCollection(column.name.key));
+ else
+ ((CollectionType)column.type).executeFunction(cf, builder, fct, args, params);
+ }
+ }
+ else
+ {
+ ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder;
+ ByteBuffer columnName = b.add(column.name.key).build();
+ QueryProcessor.validateColumnName(columnName);
+ cf.addColumn(params.makeTombstone(columnName));
+ }
}
}
}
@@ -169,6 +204,24 @@ public class DeleteStatement extends ModificationStatement
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
cfDef = metadata.getCfDef();
UpdateStatement.processKeys(cfDef, whereClause, processedKeys, boundNames);
+
+ for (Selector column : columns)
+ {
+ CFDefinition.Name name = cfDef.get(column.id());
+ if (name == null)
+ throw new InvalidRequestException(String.format("Unknown identifier %s", column));
+
+ // For compact, we only have one value except the key, so the only form of DELETE that make sense is without a column
+ // list. However, we support having the value name for coherence with the static/sparse case
+ if (name.kind != CFDefinition.Name.Kind.COLUMN_METADATA && name.kind != CFDefinition.Name.Kind.VALUE_ALIAS)
+ throw new InvalidRequestException(String.format("Invalid identifier %s for deletion (should not be a PRIMARY KEY part)", column));
+
+ if (column.key() != null && !(name.type instanceof ListType || name.type instanceof MapType))
+ throw new InvalidRequestException(String.format("Invalid selection %s since %s is neither a list or a map", column, column.id()));
+
+ toRemove.add(Pair.create(name, column.key()));
+ }
+
return new ParsedStatement.Prepared(this, Arrays.<ColumnSpecification>asList(boundNames));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/91bdf7fb/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 34fffd4..353115a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -17,15 +17,19 @@
*/
package org.apache.cassandra.cql3.statements;
+import java.io.IOError;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.transport.messages.ResultMessage;
-import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.ConsistencyLevel;
@@ -75,8 +79,15 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws InvalidRequestException, UnavailableException, TimedOutException
{
- StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel());
- return null;
+ try
+ {
+ StorageProxy.mutate(getMutations(state, variables), getConsistencyLevel());
+ return null;
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
}
public ConsistencyLevel getConsistencyLevel()
@@ -109,6 +120,46 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
return timeToLive;
}
+ public Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, CompositeType composite) throws UnavailableException, TimeoutException, InvalidRequestException
+ {
+ List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
+ for (ByteBuffer key : keys)
+ {
+ commands.add(new SliceFromReadCommand(keyspace(),
+ key,
+ new QueryPath(columnFamily()),
+ builder.copy().build(),
+ builder.copy().buildAsEndOfRange(),
+ false,
+ Integer.MAX_VALUE));
+ }
+
+ try
+ {
+ List<Row> rows = StorageProxy.read(commands, getConsistencyLevel());
+ Map<ByteBuffer, ColumnGroupMap> map = new HashMap<ByteBuffer, ColumnGroupMap>();
+ for (Row row : rows)
+ {
+ if (row.cf == null || row.cf.isEmpty())
+ continue;
+
+ ColumnGroupMap.Builder groupBuilder = new ColumnGroupMap.Builder(composite, true);
+ for (IColumn column : row.cf)
+ groupBuilder.add(column);
+
+ List<ColumnGroupMap> groups = groupBuilder.groups();
+ assert groups.isEmpty() || groups.size() == 1;
+ if (!groups.isEmpty())
+ map.put(row.key.key, groups.get(0));
+ }
+ return map;
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+
/**
* Convert statement into a list of mutations to apply on the server
*
@@ -118,7 +169,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- public abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws InvalidRequestException;
+ public abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables) throws UnavailableException, TimeoutException, InvalidRequestException;
public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException;
}