You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/05 15:36:56 UTC

svn commit: r1132402 - in /cassandra/branches/cassandra-0.8: ./ drivers/py/cql/ src/java/org/apache/cassandra/cql/ test/system/

Author: jbellis
Date: Sun Jun  5 13:36:55 2011
New Revision: 1132402

URL: http://svn.apache.org/viewvc?rev=1132402&view=rev
Log:
add cql counter support
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2473

Added:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java
    cassandra/branches/cassandra-0.8/test/system/test_cql.py

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Sun Jun  5 13:36:55 2011
@@ -1,11 +1,15 @@
 0.8.1
- * add support for insert, delete in cql BATCH (CASSANDRA-2537)
- * add support for IN to cql SELECT, UPDATE (CASSANDRA-2553)
- * add timestamp support to cql INSERT, UPDATE, and BATCH (CASSANDRA-2555)
+ * CQL:
+   - support for insert, delete in BATCH (CASSANDRA-2537)
+   - support for IN to SELECT, UPDATE (CASSANDRA-2553)
+   - timestamp support for INSERT, UPDATE, and BATCH (CASSANDRA-2555)
+   - TTL support (CASSANDRA-2476)
+   - counter support (CASSANDRA-2473)
+   - improve JDBC spec compliance (CASSANDRA-2720)
+   - ALTER TABLE (CASSANDRA-1709)
  * add support for comparator parameters and a generic ReverseType
    (CASSANDRA-2355)
  * add CompositeType and DynamicCompositeType (CASSANDRA-2231)
- * add CQL TTL support (CASSANDRA-2476)
  * optimize batches containing multiple updates to the same row
    (CASSANDRA-2583)
  * adjust hinted handoff page size to avoid OOM with large columns 
@@ -31,8 +35,6 @@
  * Added statusthrift to nodetool to report if thrift server is running (CASSANDRA-2722)
  * Fixed rows being cached if they do not exist (CASSANDRA-2723)
  * fix truncate/compaction race (CASSANDRA-2673)
- * improve CQL JDBC spec compliance (CASSANDRA-2720)
- * add CQL ALTER TABLE (CASSANDRA-1709)
  * Support passing tableName and cfName to RowCacheProviders (CASSANDRA-2702)
  * workaround large resultsets causing large allocation retention
    by nio sockets (CASSANDRA-2654)

Modified: cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py (original)
+++ cassandra/branches/cassandra-0.8/drivers/py/cql/marshal.py Sun Jun  5 13:36:55 2011
@@ -39,6 +39,7 @@ LONG_TYPE = "org.apache.cassandra.db.mar
 UUID_TYPE = "org.apache.cassandra.db.marshal.UUIDType"
 LEXICAL_UUID_TYPE = "org.apache.cassandra.db.marshal.LexicalType"
 TIME_UUID_TYPE = "org.apache.cassandra.db.marshal.TimeUUIDType"
+COUNTER_COLUMN_TYPE = "org.apache.cassandra.db.marshal.CounterColumnType"
 
 def prepare(query, params):
     # For every match of the form ":param_name", call marshal
@@ -76,14 +77,15 @@ else:
 def unmarshal_uuid(bytestr):
     return UUID(bytes=bytestr)
 
-unmarshallers = {BYTES_TYPE:        unmarshal_noop,
-                 ASCII_TYPE:        unmarshal_noop,
-                 UTF8_TYPE:         unmarshal_utf8,
-                 INTEGER_TYPE:      unmarshal_int,
-                 LONG_TYPE:         unmarshal_long,
-                 UUID_TYPE:         unmarshal_uuid,
-                 LEXICAL_UUID_TYPE: unmarshal_uuid,
-                 TIME_UUID_TYPE:    unmarshal_uuid}
+unmarshallers = {BYTES_TYPE:          unmarshal_noop,
+                 ASCII_TYPE:          unmarshal_noop,
+                 UTF8_TYPE:           unmarshal_utf8,
+                 INTEGER_TYPE:        unmarshal_int,
+                 LONG_TYPE:           unmarshal_long,
+                 UUID_TYPE:           unmarshal_uuid,
+                 LEXICAL_UUID_TYPE:   unmarshal_uuid,
+                 TIME_UUID_TYPE:      unmarshal_uuid,
+                 COUNTER_COLUMN_TYPE: unmarshal_long}
 
 def decode_bigint(term):
     val = int(term.encode('hex'), 16)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/AbstractModification.java Sun Jun  5 13:36:55 2011
@@ -20,7 +20,7 @@
  */
 package org.apache.cassandra.cql;
 
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
@@ -95,8 +95,8 @@ public abstract class AbstractModificati
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState)
-            throws InvalidRequestException;
+    public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState)
+            throws org.apache.cassandra.thrift.InvalidRequestException;
 
     /**
      * Convert statement into a list of mutations to apply on the server
@@ -109,37 +109,6 @@ public abstract class AbstractModificati
      *
      * @throws InvalidRequestException on the wrong request
      */
-    public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp)
-            throws InvalidRequestException;
-
-    /**
-     * Compute a row mutation for a single key
-     *
-     * @param key The key for mutation
-     * @param keyspace The keyspace
-     * @param timestamp The global timestamp for mutation
-     *
-     * @return row mutation
-     *
-     * @throws InvalidRequestException on the wrong request
-     */
-    public abstract RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp)
-        throws InvalidRequestException;
-
-    /**
-     * Compute a row mutation for a single key and add it to the given RowMutation object
-     *
-     * @param mutation The row mutation to add computed mutation into
-     * @param keyspace The keyspace
-     * @param timestamp The global timestamp for mutation
-     *
-     * @throws InvalidRequestException on the wrong request
-     */
-    public abstract void mutationForKey(RowMutation mutation, String keyspace, Long timestamp)
-            throws InvalidRequestException;
-
-    /**
-     * @return a list of the keys associated with the statement
-     */
-    public abstract List<Term> getKeys();
+    public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp)
+            throws org.apache.cassandra.thrift.InvalidRequestException;
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/BatchStatement.java Sun Jun  5 13:36:55 2011
@@ -20,23 +20,14 @@
  */
 package org.apache.cassandra.cql;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.cassandra.auth.Permission;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
-import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
-
 /**
  * A <code>BATCH</code> statement parsed from a CQL query.
  *
@@ -85,41 +76,12 @@ public class BatchStatement
         return timeToLive;
     }
 
-    public List<RowMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    public List<IMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
     {
-        // To avoid unnecessary authorizations.
-        List<String> seenColumnFamilies = new ArrayList<String>();
-
-        List<RowMutation> batch = new LinkedList<RowMutation>();
-
-        for (AbstractModification statement : statements)
-        {
-            final String columnFamily = statement.getColumnFamily();
-
-            authorizeColumnFamily(keyspace, columnFamily, clientState, seenColumnFamilies);
-
-            AbstractType<?> keyValidator = getKeyType(keyspace, columnFamily);
-
-            for (Term rawKey : statement.getKeys()) // for each key of the statement
-            {
-                ByteBuffer key = rawKey.getByteBuffer(keyValidator);
+        List<IMutation> batch = new LinkedList<IMutation>();
 
-                boolean found = false;
-
-                for (RowMutation mutation : batch)
-                {
-                    if (mutation.key().equals(key) && hasColumnFamily(mutation.getColumnFamilies(), columnFamily))
-                    {
-                        statement.mutationForKey(mutation, keyspace, timestamp);
-
-                        found = true;
-                        break;
-                    }
-                }
-
-                if (!found) // if mutation was not found we should add a new one
-                    batch.add(statement.mutationForKey(key, keyspace, timestamp));
-            }
+        for (AbstractModification statement : statements) {
+            batch.addAll(statement.prepareRowMutations(keyspace, clientState, timestamp));
         }
 
         return batch;
@@ -130,34 +92,6 @@ public class BatchStatement
         return timestamp != null;
     }
 
-    private boolean hasColumnFamily(Collection<ColumnFamily> columnFamilies, String columnFamily)
-    {
-        for (ColumnFamily cf : columnFamilies)
-        {
-            if (cf.metadata().cfName.equals(columnFamily))
-                return true;
-        }
-
-        return false;
-    }
-
-    private void authorizeColumnFamily(String keyspace, String columnFamily, ClientState state, List<String> seenCFs)
-    throws InvalidRequestException
-    {
-        validateColumnFamily(keyspace, columnFamily, false);
-
-        if (!seenCFs.contains(columnFamily))
-        {
-            state.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
-            seenCFs.add(columnFamily);
-        }
-    }
-
-    public AbstractType<?> getKeyType(String keyspace, String columnFamily)
-    {
-        return DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
-    }
-
     public String toString()
     {
         return String.format("BatchStatement(statements=%s, consistency=%s)", statements, consistency);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Cql.g Sun Jun  5 13:36:55 2011
@@ -291,12 +291,12 @@ batchStatementObjective returns [Abstrac
 updateStatement returns [UpdateStatement expr]
     : {
           Attributes attrs = new Attributes();
-          Map<Term, Term> columns = new HashMap<Term, Term>();
+          Map<Term, Operation> columns = new HashMap<Term, Operation>();
           List<Term> keyList = null;
       }
       K_UPDATE columnFamily=( IDENT | STRING_LITERAL | INTEGER )
           ( usingClause[attrs] )?
-          K_SET termPair[columns] (',' termPair[columns])*
+          K_SET termPairWithOperation[columns] (',' termPairWithOperation[columns])*
           K_WHERE ( K_KEY '=' key=term { keyList = Collections.singletonList(key); }
                     |
                     K_KEY K_IN '(' keys=termList { keyList = $keys.items; } ')' )
@@ -416,7 +416,7 @@ dropColumnFamilyStatement returns [Strin
     ;
 
 comparatorType
-    : 'bytea' | 'ascii' | 'text' | 'varchar' | 'int' | 'varint' | 'bigint' | 'uuid'
+    : 'bytea' | 'ascii' | 'text' | 'varchar' | 'int' | 'varint' | 'bigint' | 'uuid' | 'counter'
     ;
 
 term returns [Term item]
@@ -433,6 +433,12 @@ termPair[Map<Term, Term> columns]
     :   key=term '=' value=term { columns.put(key, value); }
     ;
 
+termPairWithOperation[Map<Term, Operation> columns]
+    : key=term '=' (value=term { columns.put(key, new Operation(value)); }
+		    | c=term ( '+' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.PLUS, v)); }
+                            | '-' v=term { columns.put(key, new Operation(c, org.apache.cassandra.cql.Operation.OperationType.MINUS, v)); } ))
+    ;
+
 // Note: ranges are inclusive so >= and >, and < and <= all have the same semantics.  
 relation returns [Relation rel]
     : { Term entity = new Term("KEY", STRING_LITERAL); }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java Sun Jun  5 13:36:55 2011
@@ -71,6 +71,7 @@ public class CreateColumnFamilyStatement
         comparators.put("int", "LongType");
         comparators.put("bigint", "LongType");
         comparators.put("uuid", "UUIDType");
+        comparators.put("counter", "CounterColumnType");
 
         keywords.add(KW_COMPARATOR);
         keywords.add(KW_COMMENT);

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/DeleteStatement.java Sun Jun  5 13:36:55 2011
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -66,18 +67,18 @@ public class DeleteStatement extends Abs
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
     {
         return prepareRowMutations(keyspace, clientState, null);
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
     {
         clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
         AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
 
-        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        List<IMutation> rowMutations = new ArrayList<IMutation>();
 
         for (Term key : keys)
         {
@@ -100,7 +101,8 @@ public class DeleteStatement extends Abs
     /** {@inheritDoc} */
     public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
     {
-        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily);
+
         AbstractType comparator = metadata.getComparatorFor(null);
 
         if (columns.size() < 1) // No columns, delete the row

Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java?rev=1132402&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java (added)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/Operation.java Sun Jun  5 13:36:55 2011
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+public class Operation
+{
+    public static enum OperationType
+    { PLUS, MINUS }
+
+    public final OperationType type;
+    public final Term a, b;
+
+    // unary operation
+    public Operation(Term a)
+    {
+        this.a = a;
+        type = null;
+        b = null;
+    }
+
+    // binary operation
+    public Operation(Term a, OperationType type, Term b)
+    {
+        this.a = a;
+        this.type = type;
+        this.b = b;
+    }
+
+    public boolean isUnary()
+    {
+        return type == null && b == null;
+    }
+
+    public String toString()
+    {
+        return (isUnary())
+                ? String.format("UnaryOperation(%s)", a)
+                : String.format("BinaryOperation(%s, %s, %s)", a, type, b);
+    }
+}

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Jun  5 13:36:55 2011
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeoutExcep
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
+import org.apache.cassandra.db.CounterColumn;
+import org.apache.cassandra.db.context.CounterContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +68,7 @@ public class QueryProcessor
     throws InvalidRequestException, TimedOutException, UnavailableException
     {
         QueryPath queryPath = new QueryPath(select.getColumnFamily());
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
 
         // ...of a list of column names
@@ -160,7 +162,7 @@ public class QueryProcessor
         }
         AbstractBounds bounds = new Bounds(startToken, finishToken);
         
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         // XXX: Our use of Thrift structs internally makes me Sad. :(
         SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
         validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -213,7 +215,7 @@ public class QueryProcessor
     private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select)
     throws TimedOutException, UnavailableException, InvalidRequestException
     {
-        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+        CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily());
         // XXX: Our use of Thrift structs internally (still) makes me Sad. :~(
         SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata);
         validateSlicePredicate(metadata, thriftSlicePredicate);
@@ -260,7 +262,7 @@ public class QueryProcessor
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         String keyspace = clientState.getKeyspace();
-        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        List<IMutation> rowMutations = new ArrayList<IMutation>();
         List<String> cfamsSeen = new ArrayList<String>();
 
         for (UpdateStatement update : updateStatements)
@@ -490,7 +492,7 @@ public class QueryProcessor
             case SELECT:
                 SelectStatement select = (SelectStatement)statement.statement;
                 clientState.hasColumnFamilyAccess(select.getColumnFamily(), Permission.READ);
-                metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false);
+                metadata = validateColumnFamily(keyspace, select.getColumnFamily());
                 validateSelect(keyspace, select);
                 
                 List<org.apache.cassandra.db.Row> rows;
@@ -840,7 +842,7 @@ public class QueryProcessor
             {
                 if (c.isMarkedForDelete())
                     continue;
-                thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+                thriftColumns.add(thriftify(c));
             }
         }
         else
@@ -867,16 +869,25 @@ public class QueryProcessor
                 {
                     throw new AssertionError(e);
                 }
+
                 IColumn c = row.cf.getColumn(name);
                 if (c == null || c.isMarkedForDelete())
                     thriftColumns.add(new Column().setName(name));
                 else
-                    thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp()));
+                    thriftColumns.add(thriftify(c));
             }
         }
         return thriftColumns;
     }
 
+    private static Column thriftify(IColumn c)
+    {
+        ByteBuffer value = (c instanceof CounterColumn)
+                           ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value()))
+                           : c.value();
+        return new Column(c.name()).setValue(value).setTimestamp(c.timestamp());
+    }
+
     private static String getKeyString(CFMetaData metadata)
     {
         String keyString;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Jun  5 13:36:55 2011
@@ -26,6 +26,8 @@ import java.util.*;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -35,6 +37,7 @@ import org.apache.cassandra.thrift.Inval
 
 import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
 
+import static org.apache.cassandra.cql.Operation.OperationType;
 import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
 
 /**
@@ -43,7 +46,7 @@ import static org.apache.cassandra.thrif
  */
 public class UpdateStatement extends AbstractModification
 {
-    private Map<Term, Term> columns;
+    private Map<Term, Operation> columns;
     private List<Term> columnNames, columnValues;
     private List<Term> keys;
     
@@ -57,7 +60,7 @@ public class UpdateStatement extends Abs
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
     public UpdateStatement(String columnFamily,
-                           Map<Term, Term> columns,
+                           Map<Term, Operation> columns,
                            List<Term> keys,
                            Attributes attrs)
     {
@@ -113,17 +116,28 @@ public class UpdateStatement extends Abs
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
     {
         return prepareRowMutations(keyspace, clientState, null);
     }
 
     /** {@inheritDoc} */
-    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
+    public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException
     {
         List<String> cfamsSeen = new ArrayList<String>();
 
-        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+        boolean hasCommutativeOperation = false;
+
+        for (Map.Entry<Term, Operation> column : getColumns().entrySet())
+        {
+            if (!column.getValue().isUnary())
+                hasCommutativeOperation = true;
+
+            if (hasCommutativeOperation && column.getValue().isUnary())
+                throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+        }
+
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, hasCommutativeOperation);
 
         // Avoid unnecessary authorizations.
         if (!(cfamsSeen.contains(columnFamily)))
@@ -132,7 +146,7 @@ public class UpdateStatement extends Abs
             cfamsSeen.add(columnFamily);
         }
 
-        List<RowMutation> rowMutations = new LinkedList<RowMutation>();
+        List<IMutation> rowMutations = new LinkedList<IMutation>();
 
         for (Term key: keys)
         {
@@ -154,43 +168,61 @@ public class UpdateStatement extends Abs
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private RowMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
-    {
-        RowMutation rm = new RowMutation(keyspace, key);
-
-        mutationForKey(rm, keyspace, metadata, timestamp);
-
-        return rm;
-    }
-
-    /** {@inheritDoc} */
-    public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp) throws InvalidRequestException
-    {
-        return mutationForKey(keyspace, key, validateColumnFamily(keyspace, columnFamily, false), timestamp);
-    }
-
-    /** {@inheritDoc} */
-    public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException
-    {
-        mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, columnFamily, false), timestamp);
-    }
-
-    private void mutationForKey(RowMutation mutation, String keyspace, CFMetaData metadata, Long timestamp) throws InvalidRequestException
+    private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException
     {
         AbstractType<?> comparator = getComparator(keyspace);
 
-        for (Map.Entry<Term, Term> column : getColumns().entrySet())
+        // if true we need to wrap RowMutation into CounterMutation
+        boolean hasCounterColumn = false;
+        RowMutation rm = new RowMutation(keyspace, key);
+
+        for (Map.Entry<Term, Operation> column : getColumns().entrySet())
         {
             ByteBuffer colName = column.getKey().getByteBuffer(comparator);
-            ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+            Operation op = column.getValue();
 
-            validateColumn(metadata, colName, colValue);
+            if (op.isUnary())
+            {
+                if (hasCounterColumn)
+                    throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed.");
+
+                ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName));
+
+                validateColumn(metadata, colName, colValue);
+                rm.add(new QueryPath(columnFamily, null, colName),
+                       colValue,
+                       (timestamp == null) ? getTimestamp() : timestamp,
+                       getTimeToLive());
+            }
+            else
+            {
+                hasCounterColumn = true;
+
+                if (!column.getKey().getText().equals(op.a.getText()))
+                    throw new InvalidRequestException("Only expressions like X = X + <long> are supported.");
+
+                long value;
+
+                try
+                {
+                    value = Long.parseLong(op.b.getText());
+
+                    if (op.type == OperationType.MINUS)
+                    {
+                        value *= -1;
+                    }
+                }
+                catch (NumberFormatException e)
+                {
+                    throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.",
+                                                      op.b.getText()));
+                }
 
-            mutation.add(new QueryPath(columnFamily, null, colName),
-                         colValue,
-                         (timestamp == null) ? getTimestamp() : timestamp,
-                         getTimeToLive());
+                rm.addCounter(new QueryPath(columnFamily, null, colName), value);
+            }
         }
+
+        return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
     }
 
     public String getColumnFamily()
@@ -203,8 +235,8 @@ public class UpdateStatement extends Abs
     {
         return keys;
     }
-
-    public Map<Term, Term> getColumns() throws InvalidRequestException
+    
+    public Map<Term, Operation> getColumns() throws InvalidRequestException
     {
         // Created from an UPDATE
         if (columns != null)
@@ -218,11 +250,11 @@ public class UpdateStatement extends Abs
         if (columnNames.size() < 1)
             throw new InvalidRequestException("no columns specified for INSERT");
         
-        columns = new HashMap<Term, Term>();
+        columns = new HashMap<Term, Operation>();
         
         for (int i = 0; i < columnNames.size(); i++)
-            columns.put(columnNames.get(i), columnValues.get(i));
-        
+            columns.put(columnNames.get(i), new Operation(columnValues.get(i)));
+
         return columns;
     }
     

Modified: cassandra/branches/cassandra-0.8/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/system/test_cql.py?rev=1132402&r1=1132401&r2=1132402&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/system/test_cql.py (original)
+++ cassandra/branches/cassandra-0.8/test/system/test_cql.py Sun Jun  5 13:36:55 2011
@@ -75,6 +75,10 @@ def load_sample(dbconn):
         CREATE COLUMNFAMILY IndexedA (KEY text PRIMARY KEY, birthdate int)
             WITH comparator = ascii AND default_validation = ascii;
     """)
+    dbconn.execute("""
+        CREATE COLUMNFAMILY CounterCF (KEY text PRIMARY KEY, count_me counter)
+            WITH comparator = ascii AND default_validation = counter;
+    """)
     dbconn.execute("CREATE INDEX ON IndexedA (birthdate)")
 
     query = "UPDATE StandardString1 SET :c1 = :v1, :c2 = :v2 WHERE KEY = :key"
@@ -1081,3 +1085,76 @@ class TestCql(ThriftTester):
         assert_raises(cql.ProgrammingError,
                       cursor.execute,
                       "ALTER TABLE NewCf1 DROP name")
+    
+    def test_counter_column_support(self):
+        "update statement should be able to work with counter columns"
+        cursor = init()
+
+        # increment counter
+        cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 2, \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 4, \
+               "unrecognized value '%s'" % r[1]
+
+        # decrement counter
+        cursor.execute("UPDATE CounterCF SET count_me = count_me - 4 WHERE key = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 0, \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("SELECT * FROM CounterCF")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+
+        assert colnames[1] == "count_me", \
+               "unrecognized name '%s'" % colnames[1]
+
+        r = cursor.fetchone()
+        assert r[1] == 0, \
+               "unrecognized value '%s'" % r[1]
+
+        # deleting a counter column
+        cursor.execute("DELETE count_me FROM CounterCF WHERE KEY = 'counter1'")
+        cursor.execute("SELECT * FROM CounterCF")
+        assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+        assert len(colnames) == 1
+
+        r = cursor.fetchone()
+        assert len(r) == 1
+
+        # can't mix counter and normal statements
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "UPDATE CounterCF SET count_me = count_me + 2, x = 'a' WHERE key = 'counter1'")
+
+        # column names must match
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      "UPDATE CounterCF SET count_me = count_not_me + 2 WHERE key = 'counter1'")