You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 15:36:56 UTC
svn commit: r1132402 - in /cassandra/branches/cassandra-0.8: ./
drivers/py/cql/ src/java/org/apache/cassandra/cql/ test/system/
Author: jbellis
Date: Sun Jun 5 13:36:55 2011
New Revision: 1132402
URL: http://svn.apache.org/viewvc?rev=1132402&view=rev
Log:
add cql counter support
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2473
Added:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java
cassandra/branches/cassandra-0.8/test/system/test_cql.py
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Sun Jun 5 13:36:55 2011
@@ -1,11 +1,15 @@
0.8.1
- * add support for insert, delete in cql BATCH (CASSANDRA-2537)
- * add support for IN to cql SELECT, UPDATE (CASSANDRA-2553)
- * add timestamp support to cql INSERT, UPDATE, and BATCH (CASSANDRA-2555)
+ * CQL:
+ - support for insert, delete in BATCH (CASSANDRA-2537)
+ - support for IN to SELECT, UPDATE (CASSANDRA-2553)
+ - timestamp support for INSERT, UPDATE, and BATCH (CASSANDRA-2555)
+ - TTL support (CASSANDRA-2476)
+ - counter support (CASSANDRA-2473)
+ - improve JDBC spec compliance (CASSANDRA-2720)
+ - ALTER TABLE (CASSANDRA-1709)
* add support for comparator parameters and a generic ReverseType
(CASSANDRA-2355)
* add CompositeType and DynamicCompositeType (CASSANDRA-2231)
- * add CQL TTL support (CASSANDRA-2476)
* optimize batches containing multiple updates to the same row
(CASSANDRA-2583)
* adjust hinted handoff page size to avoid OOM with large columns
@@ -31,8 +35,6 @@
* Added statusthrift to nodetool to report if thrift server is running (CASSANDRA-2722)
* Fixed rows being cached if they do not exist (CASSANDRA-2723)
* fix truncate/compaction race (CASSANDRA-2673)
- * improve CQL JDBC spec compliance (CASSANDRA-2720)
- * add CQL ALTER TABLE (CASSANDRA-1709)
* Support passing tableName and cfName to RowCacheProviders (CASSANDRA-2702)
* workaround large resultsets causing large allocation retention
by nio sockets (CASSANDRA-2654)
Modified: cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py (original)
+++ cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py Sun Jun 5 13:36:55 2011
@@ -39,6 +39,7 @@ LONG_TYPE = "org.apache.cassandra.db.mar
UUID_TYPE = "org.apache.cassandra.db.marshal.UUIDType"
LEXICAL_UUID_TYPE = "org.apache.cassandra.db.marshal.LexicalType"
TIME_UUID_TYPE = "org.apache.cassandra.db.marshal.TimeUUIDType"
+COUNTER_COLUMN_TYPE = "org.apache.cassandra.db.marshal.CounterColumnType"
def prepare(query, params):
# For every match of the form ":param_name", call marshal
@@ -76,14 +77,15 @@ else:
def unmarshal_uuid(bytestr):
return UUID(bytes=bytestr)
-unmarshallers = {BYTES_TYPE: unmarshal_noop,
- ASCII_TYPE: unmarshal_noop,
- UTF8_TYPE: unmarshal_utf8,
- INTEGER_TYPE: unmarshal_int,
- LONG_TYPE: unmarshal_long,
- UUID_TYPE: unmarshal_uuid,
- LEXICAL_UUID_TYPE: unmarshal_uuid,
- TIME_UUID_TYPE: unmarshal_uuid}
+unmarshallers = {BYTES_TYPE: unmarshal_noop,
+ ASCII_TYPE: unmarshal_noop,
+ UTF8_TYPE: unmarshal_utf8,
+ INTEGER_TYPE: unmarshal_int,
+ LONG_TYPE: unmarshal_long,
+ UUID_TYPE: unmarshal_uuid,
+ LEXICAL_UUID_TYPE: unmarshal_uuid,
+ TIME_UUID_TYPE: unmarshal_uuid,
+ COUNTER_COLUMN_TYPE: unmarshal_long}
def decode_bigint(term):
val = int(term.encode('hex'), 16)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java Sun Jun 5 13:36:55 2011
@@ -20,7 +20,7 @@
*/
package org.apache.cassandra.cql;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
@@ -95,8 +95,8 @@ public abstract class AbstractModificati
*
* @throws InvalidRequestException on the wrong request
*/
- public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState)
- throws InvalidRequestException;
+ public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState)
+ throws org.apache.cassandra.thrift.InvalidRequestException;
/**
* Convert statement into a list of mutations to apply on the server
@@ -109,37 +109,6 @@ public abstract class AbstractModificati
*
* @throws InvalidRequestException on the wrong request
*/
- public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp)
- throws InvalidRequestException;
-
- /**
- * Compute a row mutation for a single key
- *
- * @param key The key for mutation
- * @param keyspace The keyspace
- * @param timestamp The global timestamp for mutation
- *
- * @return row mutation
- *
- * @throws InvalidRequestException on the wrong request
- */
- public abstract RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp)
- throws InvalidRequestException;
-
- /**
- * Compute a row mutation for a single key and add it to the given RowMutation object
- *
- * @param mutation The row mutation to add computed mutation into
- * @param keyspace The keyspace
- * @param timestamp The global timestamp for mutation
- *
- * @throws InvalidRequestException on the wrong request
- */
- public abstract void mutationForKey(RowMutation mutation, String keyspace, Long timestamp)
- throws InvalidRequestException;
-
- /**
- * @return a list of the keys associated with the statement
- */
- public abstract List<Term> getKeys();
+ public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp)
+ throws org.apache.cassandra.thrift.InvalidRequestException;
}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java Sun Jun 5 13:36:55 2011
@@ -20,23 +20,14 @@
*/
package org.apache.cassandra.cql;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
-import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
-
/**
* A <code>BATCH</code> statement parsed from a CQL query.
*
@@ -85,41 +76,12 @@ public class BatchStatement
return timeToLive;
}
- public List<RowMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ public List<IMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
{
- // To avoid unnecessary authorizations.
- List<String> seenColumnFamilies = new ArrayList<String>();
-
- List<RowMutation> batch = new LinkedList<RowMutation>();
-
- for (AbstractModification statement : statements)
- {
- final String columnFamily = statement.getColumnFamily();
-
- authorizeColumnFamily(keyspace, columnFamily, clientState, seenColumnFamilies);
-
- AbstractType<?> keyValidator = getKeyType(keyspace, columnFamily);
-
- for (Term rawKey : statement.getKeys()) // for each key of the statement
- {
- ByteBuffer key = rawKey.getByteBuffer(keyValidator);
+ List<IMutation> batch = new LinkedList<IMutation>();
- boolean found = false;
-
- for (RowMutation mutation : batch)
- {
- if (mutation.key().equals(key) && hasColumnFamily(mutation.getColumnFamilies(), columnFamily))
- {
- statement.mutationForKey(mutation, keyspace, timestamp);
-
- found = true;
- break;
- }
- }
-
- if (!found) // if mutation was not found we should add a new one
- batch.add(statement.mutationForKey(key, keyspace, timestamp));
- }
+ for (AbstractModification statement : statements) {
+ batch.addAll(statement.prepareRowMutations(keyspace, clientState, timestamp));
}
return batch;
@@ -130,34 +92,6 @@ public class BatchStatement
return timestamp != null;
}
- private boolean hasColumnFamily(Collection<ColumnFamily> columnFamilies, String columnFamily)
- {
- for (ColumnFamily cf : columnFamilies)
- {
- if (cf.metadata().cfName.equals(columnFamily))
- return true;
- }
-
- return false;
- }
-
- private void authorizeColumnFamily(String keyspace, String columnFamily, ClientState state, List<String> seenCFs)
- throws InvalidRequestException
- {
- validateColumnFamily(keyspace, columnFamily, false);
-
- if (!seenCFs.contains(columnFamily))
- {
- state.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
- seenCFs.add(columnFamily);
- }
- }
-
- public AbstractType<?> getKeyType(String keyspace, String columnFamily)
- {
- return DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
- }
-
public String toString()
{
return String.format("BatchStatement(statements=%s, consistency=%s)", statements, consistency);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g Sun Jun 5 13:36:55 2011
@@ -291,12 +291,12 @@ batchStatementObjective returns [Abstrac
updateStatement returns [UpdateStatement expr]
: {
Attributes attrs = new Attributes();
- Map<Term, Term> columns = new HashMap<Term, Term>();
+ Map<Term, Operation> columns = new HashMap<Term, Operation>();
List<Term> keyList = null;
}
K_UPDATE columnFamily=( IDENT | STRING_LITERAL | INTEGER )
( usingClause[attrs] )?
- K_SET termPair[columns] (',' termPair[columns])*
+ K_SET termPairWithOperation[columns] (',' termPairWithOperation[columns])*
K_WHERE ( K_KEY '=' key=term { keyList = Collections.singletonList(key); }
|
K_KEY K_IN '(' keys=termList { keyList = $keys.items; } ')' )
@@ -416,7 +416,7 @@ dropColumnFamilyStatement returns [Strin
;
comparatorType
- : 'bytea' | 'ascii' | 'text' | 'varchar' | 'int' | 'varint' | 'bigint' | 'uuid'
+ : 'bytea' | 'ascii' | 'text' | 'varchar' | 'int' | 'varint' | 'bigint' | 'uuid' | 'counter'
;
term returns [Term item]
@@ -433,6 +433,12 @@ termPair[Map<Term, Term> columns]
: key=term '=' value=term { columns.put(key, value); }
;
+termPairWithOperation[Map<Term, Operation> columns] // parses one key = rhs pair and stores the resulting Operation in columns under key
+ : key=term '=' (value=term { columns.put(key, new Operation(value)); } // single term on the right: one-argument Operation
+ | c=term ( '+' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); } // c + v
+ | '-' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } )) // c - v
+ ;
+
// Note: ranges are inclusive so >= and >, and < and <= all have the same semantics.
relation returns [Relation rel]
: { Term entity = new Term("KEY", STRING_LITERAL); }
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java Sun Jun 5 13:36:55 2011
@@ -71,6 +71,7 @@ public class CreateColumnFamilyStatement
comparators.put("int", "LongType");
comparators.put("bigint", "LongType");
comparators.put("uuid", "UUIDType");
+ comparators.put("counter", "CounterColumnType");
keywords.add(KW_COMPARATOR);
keywords.add(KW_COMMENT);
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java Sun Jun 5 13:36:55 2011
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -66,18 +67,18 @@ public class DeleteStatement extends Abs
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
{
return prepareRowMutations(keyspace, clientState, null);
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
{
clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
- List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+ List<IMutation> rowMutations = new ArrayList<IMutation>();
for (Term key : keys)
{
@@ -100,7 +101,8 @@ public class DeleteStatement extends Abs
/** {@inheritDoc} */
public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
{
- CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily);
+
AbstractType comparator = metadata.getComparatorFor(null);
if (columns.size() < 1) // No columns, delete the row
Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java?rev=1132402&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java Sun Jun 5 13:36:55 2011
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+public class Operation // holds either a single term (a) or two terms joined by an OperationType (a, type, b)
+{
+ public static enum OperationType
+ { PLUS, MINUS }
+
+ public final OperationType type; // null when built with the single-term constructor
+ public final Term a, b; // b is null when built with the single-term constructor
+
+ // unary operation: wraps a single term; type and b are left null
+ public Operation(Term a)
+ {
+ this.a = a;
+ type = null;
+ b = null;
+ }
+
+ // binary operation: stores both operands and the operator between them
+ public Operation(Term a, OperationType type, Term b)
+ {
+ this.a = a;
+ this.type = type;
+ this.b = b;
+ }
+
+ public boolean isUnary() // true only when both type and b are null
+ {
+ return type == null && b == null;
+ }
+
+ public String toString() // renders as UnaryOperation(a) or BinaryOperation(a, type, b)
+ {
+ return (isUnary())
+ ? String.format("UnaryOperation(%s)", a)
+ : String.format("BinaryOperation(%s, %s, %s)", a, type, b);
+ }
+}
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Jun 5 13:36:55 2011
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeoutExcep
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
+import org.apache.cassandra.db.CounterColumn;
+import org.apache.cassandra.db.context.CounterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,7 +68,7 @@ public class QueryProcessor
throws InvalidRequestException, TimedOutException, UnavailableException
{
QueryPath queryPath = new QueryPath(select.getColumnFamily());
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
List<ReadCommand> commands = new ArrayList<ReadCommand>();
// ...of a list of column names
@@ -160,7 +162,7 @@ public class QueryProcessor
}
AbstractBounds bounds = new Bounds(startToken, finishToken);
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
// XXX: Our use of Thrift structs internally makes me Sad. :(
SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -213,7 +215,7 @@ public class QueryProcessor
private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select)
throws TimedOutException, UnavailableException, InvalidRequestException
{
- CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
// XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -260,7 +262,7 @@ public class QueryProcessor
throws InvalidRequestException, UnavailableException, TimedOutException
{
String keyspace = clientState.getKeyspace();
- List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+ List<IMutation> rowMutations = new ArrayList<IMutation>();
List<String> cfamsSeen = new ArrayList<String>();
for (UpdateStatement update : updateStatements)
@@ -490,7 +492,7 @@ public class QueryProcessor
case SELECT:
SelectStatement select = (SelectStatement)statement.statement;
clientState.hasColumnFamilyAccess(select.getColumnFamily(), Permission.READ);
- metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+ metadata = validateColumnFamily(keyspace, select.getColumnFamily());
validateSelect(keyspace, select);
List<org.apache.cassandra.db.Row> rows;
@@ -840,7 +842,7 @@ public class QueryProcessor
{
if (c.isMarkedForDelete())
continue;
- thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+ thriftColumns.add(thriftify(c));
}
}
else
@@ -867,16 +869,25 @@ public class QueryProcessor
{
throw new AssertionError(e);
}
+
IColumn c = row.cf.getColumn(name);
if (c == null || c.isMarkedForDelete())
thriftColumns.add(new Column().setName(name));
else
- thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+ thriftColumns.add(thriftify(c));
}
}
return thriftColumns;
}
+ private static Column thriftify(IColumn c) // builds a Thrift Column carrying c's name, a value and c's timestamp
+ {
+ ByteBuffer value = (c instanceof CounterColumn)
+ ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value())) // counter column: value is the re-encoded result of total() over the stored value
+ : c.value(); // any other column: stored value is passed through unchanged
+ return new Column(c.name()).setValue(value).setTimestamp(c.timestamp());
+ }
+
private static String getKeyString(CFMetaData metadata)
{
String keyString;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Jun 5 13:36:55 2011
@@ -26,6 +26,8 @@ import java.util.*;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -35,6 +37,7 @@ import org.apache.cassandra.thrift.Inval
import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
+import static org.apache.cassandra.cql.Operation.OperationType;
import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
/**
@@ -43,7 +46,7 @@ import static org.apache.cassandra.thrif
*/
public class UpdateStatement extends AbstractModification
{
- private Map<Term, Term> columns;
+ private Map<Term, Operation> columns;
private List<Term> columnNames, columnValues;
private List<Term> keys;
@@ -57,7 +60,7 @@ public class UpdateStatement extends Abs
* @param attrs additional attributes for statement (CL, timestamp, timeToLive)
*/
public UpdateStatement(String columnFamily,
- Map<Term, Term> columns,
+ Map<Term, Operation> columns,
List<Term> keys,
Attributes attrs)
{
@@ -113,17 +116,28 @@ public class UpdateStatement extends Abs
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
{
return prepareRowMutations(keyspace, clientState, null);
}
/** {@inheritDoc} */
- public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+ public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
{
List<String> cfamsSeen = new ArrayList<String>();
- CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+ boolean hasCommutativeOperation = false;
+
+ for (Map.Entry<Term, Operation> column : getColumns().entrySet())
+ {
+ if (!column.getValue().isUnary())
+ hasCommutativeOperation = true;
+
+ if (hasCommutativeOperation && column.getValue().isUnary())
+ throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+ }
+
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, hasCommutativeOperation);
// Avoid unnecessary authorizations.
if (!(cfamsSeen.contains(columnFamily)))
@@ -132,7 +146,7 @@ public class UpdateStatement extends Abs
cfamsSeen.add(columnFamily);
}
- List<RowMutation> rowMutations = new LinkedList<RowMutation>();
+ List<IMutation> rowMutations = new LinkedList<IMutation>();
for (Term key: keys)
{
@@ -154,43 +168,61 @@ public class UpdateStatement extends Abs
*
* @throws InvalidRequestException on the wrong request
*/
- private RowMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
- {
- RowMutation rm = new RowMutation(keyspace, key);
-
- mutationForKey(rm, keyspace, metadata, timestamp);
-
- return rm;
- }
-
- /** {@inheritDoc} */
- public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp) throws InvalidRequestException
- {
- return mutationForKey(keyspace, key, validateColumnFamily(keyspace, columnFamily, false), timestamp);
- }
-
- /** {@inheritDoc} */
- public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
- {
- mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, columnFamily, false), timestamp);
- }
-
- private void mutationForKey(RowMutation mutation, String keyspace, CFMetaData metadata, Long timestamp) throws InvalidRequestException
+ private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
{
AbstractType<?> comparator = getComparator(keyspace);
- for (Map.Entry<Term, Term> column : getColumns().entrySet())
+ // if true we need to wrap RowMutation into CounterMutation
+ boolean hasCounterColumn = false;
+ RowMutation rm = new RowMutation(keyspace, key);
+
+ for (Map.Entry<Term, Operation> column : getColumns().entrySet())
{
ByteBuffer colName = column.getKey().getByteBuffer(comparator);
- ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+ Operation op = column.getValue();
- validateColumn(metadata, colName, colValue);
+ if (op.isUnary())
+ {
+ if (hasCounterColumn)
+ throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+
+ ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName));
+
+ validateColumn(metadata, colName, colValue);
+ rm.add(new QueryPath(columnFamily, null, colName),
+ colValue,
+ (timestamp == null) ? getTimestamp() : timestamp,
+ getTimeToLive());
+ }
+ else
+ {
+ hasCounterColumn = true;
+
+ if (!column.getKey().getText().equals(op.a.getText()))
+ throw new InvalidRequestException("Only expressions like X = X + <long> are supported.");
+
+ long value;
+
+ try
+ {
+ value = Long.parseLong(op.b.getText());
+
+ if (op.type == OperationType.MINUS)
+ {
+ value *= -1;
+ }
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
+ op.b.getText()));
+ }
- mutation.add(new QueryPath(columnFamily, null, colName),
- colValue,
- (timestamp == null) ? getTimestamp() : timestamp,
- getTimeToLive());
+ rm.addCounter(new QueryPath(columnFamily, null, colName), value);
+ }
}
+
+ return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
}
public String getColumnFamily()
@@ -203,8 +235,8 @@ public class UpdateStatement extends Abs
{
return keys;
}
-
- public Map<Term, Term> getColumns() throws InvalidRequestException
+
+ public Map<Term, Operation> getColumns() throws InvalidRequestException
{
// Created from an UPDATE
if (columns != null)
@@ -218,11 +250,11 @@ public class UpdateStatement extends Abs
if (columnNames.size() < 1)
throw new InvalidRequestException("no columns specified for INSERT");
- columns = new HashMap<Term, Term>();
+ columns = new HashMap<Term, Operation>();
for (int i = 0; i < columnNames.size(); i++)
- columns.put(columnNames.get(i), columnValues.get(i));
-
+ columns.put(columnNames.get(i), new Operation(columnValues.get(i)));
+
return columns;
}
Modified: cassandra/branches/cassandra-0.8/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/system/test_cql.py?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/system/test_cql.py (original)
+++ cassandra/branches/cassandra-0.8/test/system/test_cql.py Sun Jun 5 13:36:55 2011
@@ -75,6 +75,10 @@ def load_sample(dbconn):
CREATE COLUMNFAMILY IndexedA (KEY text PRIMARY KEY, birthdate int)
WITH comparator = ascii AND default_validation = ascii;
""")
+ dbconn.execute("""
+ CREATE COLUMNFAMILY CounterCF (KEY text PRIMARY KEY, count_me counter)
+ WITH comparator = ascii AND default_validation = counter;
+ """)
dbconn.execute("CREATE INDEX ON IndexedA (birthdate)")
query = "UPDATE StandardString1 SET :c1 = :v1, :c2 = :v2 WHERE KEY = :key"
@@ -1081,3 +1085,76 @@ class TestCql(ThriftTester):
assert_raises(cql.ProgrammingError,
cursor.execute,
"ALTER TABLE NewCf1 DROP name")
+
+ def test_counter_column_support(self):
+ "update statement should be able to work with counter columns"
+ cursor = init()
+
+ # increment counter
+ cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 2, \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 4, \
+ "unrecognized value '%s'" % r[1]
+
+ # decrement counter
+ cursor.execute("UPDATE CounterCF SET count_me = count_me - 4 WHERE key = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 0, \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("SELECT * FROM CounterCF")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+
+ assert colnames[1] == "count_me", \
+ "unrecognized name '%s'" % colnames[1]
+
+ r = cursor.fetchone()
+ assert r[1] == 0, \
+ "unrecognized value '%s'" % r[1]
+
+ # deleting a counter column
+ cursor.execute("DELETE count_me FROM CounterCF WHERE KEY = 'counter1'")
+ cursor.execute("SELECT * FROM CounterCF")
+ assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+ assert len(colnames) == 1
+
+ r = cursor.fetchone()
+ assert len(r) == 1
+
+ # can't mix counter and normal statements
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "UPDATE CounterCF SET count_me = count_me + 2, x = 'a' WHERE key = 'counter1'")
+
+ # column names must match
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ "UPDATE CounterCF SET count_me = count_not_me + 2 WHERE key = 'counter1'")