You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/26 16:53:54 UTC

svn commit: r1096771 - in /cassandra/trunk: ./ doc/cql/ src/java/org/apache/cassandra/cql/ test/system/

Author: jbellis
Date: Tue Apr 26 14:53:53 2011
New Revision: 1096771

URL: http://svn.apache.org/viewvc?rev=1096771&view=rev
Log:
add support for insert, delete in cql BATCH
patch by Pavel Yaskevich; reviewed by jbellis for CASSANDRA-2537

Added:
    cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/doc/cql/CQL.textile
    cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
    cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
    cassandra/trunk/test/system/test_cql.py

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Apr 26 14:53:53 2011
@@ -3,6 +3,7 @@
  * Remove checking all column families on startup for compaction candidates (CASSANDRA-2444)
  * validate CQL create keyspace options (CASSANDRA-2525)
  * fix nodetool setcompactionthroughput (CASSANDRA-2550)
+ * add support for insert, delete in cql BATCH (CASSANDRA-2537)
 
 
 0.8.0-beta1

Modified: cassandra/trunk/doc/cql/CQL.textile
URL: http://svn.apache.org/viewvc/cassandra/trunk/doc/cql/CQL.textile?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/doc/cql/CQL.textile (original)
+++ cassandra/trunk/doc/cql/CQL.textile Tue Apr 26 14:53:53 2011
@@ -99,19 +99,6 @@ UPDATE ... SET name1 = value1, name2 = v
 
 Rows are created or updated by supplying column names and values in term assignment format. Multiple columns can be set by separating the name/value pairs using commas.  Each update statement requires exactly one key to be specified using a WHERE clause and the @KEY@ keyword.
 
-Additionally, it is also possible to send multiple UPDATES to a node at once using a batch syntax:
-
-bc. 
-BEGIN BATCH [USING <CONSISTENCY>]
-UPDATE CF1 SET name1 = value1, name2 = value2 WHERE KEY = keyname1;
-UPDATE CF1 SET name3 = value3 WHERE KEY = keyname2;
-UPDATE CF2 SET name4 = value4, name5 = value5 WHERE KEY = keyname3;
-APPLY BATCH
-
-When batching UPDATEs, a single consistency level is used for the entire batch, it appears after the @BEGIN BATCH@ statement, and uses the standard "consistency level specification":#consistency. Batch UPDATEs default to @CONSISTENCY.ONE@ when left unspecified.
-
-_NOTE: While there are no isolation guarantees,  @UPDATE@ queries are atomic within a give record._
-
 h2. DELETE
 
 _Synopsis:_
@@ -150,6 +137,30 @@ UPDATE ... WHERE KEY = keyname1
 UPDATE ... WHERE KEY IN (keyname1, keyname2)
 
 The @WHERE@ clause is used to determine which row(s) a @DELETE@ applies to.  The first form allows the specification of a single keyname using the @KEY@ keyword and the @=@ operator.  The second form allows a list of keyname terms to be specified using the @IN@ notation and a parenthesized list of comma-delimited keyname terms.
+
+h2. BATCH
+
+_Synopsis:_
+
+bc.
+BATCH BEGIN BATCH [USING CONSISTENCY <LEVEL>]
+    INSERT or UPDATE or DELETE statements separated by semicolon or "end of line"
+APPLY BATCH
+
+A single consistency level is used for the entire batch, it appears after the @BEGIN BATCH@ statement, and uses the standard "consistency level specification":#consistency. Batch default to @CONSISTENCY.ONE@ when left unspecified.
+
+_NOTE: While there are no isolation guarantees,  @UPDATE@ queries are atomic within a give record._
+
+_Example:_
+
+bc.
+BEGIN BATCH USING CONSISTENCY QUORUM
+  INSERT INTO users (KEY, password, name) VALUES ('user2', 'ch@ngem3b', 'second user')
+  UPDATE users SET password = 'ps22dhds' WHERE KEY = 'user2'
+  INSERT INTO users (KEY, password) VALUES ('user3', 'ch@ngem3c')
+  DELETE name FROM users WHERE key = 'user2'
+  INSERT INTO users (KEY, password, name) VALUES ('user4', 'ch@ngem3c', 'Andrew')
+APPLY BATCH
      
 h2. TRUNCATE
 

Added: cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1096771&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java Tue Apr 26 14:53:53 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+import java.util.List;
+
+public abstract class AbstractModification
+{
+    public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
+
+    protected final String columnFamily;
+    protected final ConsistencyLevel cLevel;
+
+    public AbstractModification(String columnFamily, ConsistencyLevel cLevel)
+    {
+        this.columnFamily = columnFamily;
+        this.cLevel = cLevel;
+    }
+
+    public String getColumnFamily()
+    {
+        return columnFamily;
+    }
+
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        return (cLevel != null) ? cLevel : defaultConsistency;
+    }
+
+    /**
+     * True if an explicit consistency level was parsed from the statement.
+     *
+     * @return true if a consistency was parsed, false otherwise.
+     */
+    public boolean isSetConsistencyLevel()
+    {
+        return cLevel != null;
+    }
+
+    /**
+     * Convert statement into a list of mutations to apply on the server
+     *
+     * @param keyspace The working keyspace
+     * @param clientState current client status
+     *
+     * @return list of the mutations
+     *
+     * @throws org.apache.cassandra.thrift.InvalidRequestException on the wrong request
+     */
+    public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState)
+            throws org.apache.cassandra.thrift.InvalidRequestException;
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1096771&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java Tue Apr 26 14:53:53 2011
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+/**
+ * A <code>BATCH</code> statement parsed from a CQL query.
+ *
+ */
+public class BatchStatement
+{
+    // statements to execute
+    protected final List<AbstractModification> statements;
+
+    // global consistency level
+    protected final ConsistencyLevel consistency;
+
+
+    /**
+     * Creates a new BatchStatement from a list of statements and a
+     * Thrift consistency level.
+     *
+     * @param statements a list of UpdateStatements
+     * @param level Thrift consistency level enum
+     */
+    public BatchStatement(List<AbstractModification> statements, ConsistencyLevel level)
+    {
+        this.statements = statements;
+        consistency = level;
+    }
+
+    public List<AbstractModification> getStatements()
+    {
+        return statements;
+    }
+
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        return consistency;
+    }
+
+    public List<RowMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    {
+        List<RowMutation> batch = new LinkedList<RowMutation>();
+
+        for (AbstractModification statement : statements)
+        {
+            batch.addAll(statement.prepareRowMutations(keyspace, clientState));
+        }
+
+        return batch;
+    }
+
+
+    public String toString()
+    {
+        return String.format("BatchStatement(statements=%s, consistency=%s)", statements, consistency);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Tue Apr 26 14:53:53 2011
@@ -102,12 +102,12 @@ options {
 
 query returns [CQLStatement stmnt]
     : selectStatement   { $stmnt = new CQLStatement(StatementType.SELECT, $selectStatement.expr); }
-    | insertStatement   { $stmnt = new CQLStatement(StatementType.INSERT, $insertStatement.expr); }
+    | insertStatement endStmnt { $stmnt = new CQLStatement(StatementType.INSERT, $insertStatement.expr); }
     | updateStatement endStmnt { $stmnt = new CQLStatement(StatementType.UPDATE, $updateStatement.expr); }
-    | batchUpdateStatement { $stmnt = new CQLStatement(StatementType.BATCH_UPDATE, $batchUpdateStatement.expr); }
+    | batchStatement { $stmnt = new CQLStatement(StatementType.BATCH, $batchStatement.expr); }
     | useStatement      { $stmnt = new CQLStatement(StatementType.USE, $useStatement.keyspace); }
     | truncateStatement { $stmnt = new CQLStatement(StatementType.TRUNCATE, $truncateStatement.cfam); }
-    | deleteStatement   { $stmnt = new CQLStatement(StatementType.DELETE, $deleteStatement.expr); }
+    | deleteStatement endStmnt { $stmnt = new CQLStatement(StatementType.DELETE, $deleteStatement.expr); }
     | createKeyspaceStatement { $stmnt = new CQLStatement(StatementType.CREATE_KEYSPACE, $createKeyspaceStatement.expr); }
     | createColumnFamilyStatement { $stmnt = new CQLStatement(StatementType.CREATE_COLUMNFAMILY, $createColumnFamilyStatement.expr); }
     | createIndexStatement { $stmnt = new CQLStatement(StatementType.CREATE_INDEX, $createIndexStatement.expr); }
@@ -191,7 +191,7 @@ whereClause returns [WhereClause clause]
  */
 insertStatement returns [UpdateStatement expr]
     : {
-          ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+          ConsistencyLevel cLevel = null;
           Map<Term, Term> columns = new HashMap<Term, Term>();
 
           List<Term> columnNames  = new ArrayList<Term>();
@@ -202,32 +202,54 @@ insertStatement returns [UpdateStatement
         K_VALUES
           '(' key=term ( ',' column_value=term { columnValues.add($column_value.item); })+ ')'
         ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )?
-      endStmnt
       {
           return new UpdateStatement($columnFamily.text, cLevel, columnNames, columnValues, key);
       }
     ;
 
 /**
- * BEGIN BATCH [USING CONSISTENCY.<LVL>]
- * UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
- * UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
- * UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ *   UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
+ *   UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
+ *   UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
+ *   ...
+ * APPLY BATCH
+ *
+ * OR
+ *
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ *   INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
+ *   INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
+ *   ...
+ * APPLY BATCH
+ *
+ * OR
+ *
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ *   DELETE name1, name2 FROM <CF> WHERE key = <key>
+ *   DELETE name3, name4 FROM <CF> WHERE key = <key>
+ *   ...
  * APPLY BATCH
  */
-batchUpdateStatement returns [BatchUpdateStatement expr]
+batchStatement returns [BatchStatement expr]
     : {
           ConsistencyLevel cLevel = ConsistencyLevel.ONE;
-          List<UpdateStatement> updates = new ArrayList<UpdateStatement>();
+          List<AbstractModification> statements = new ArrayList<AbstractModification>();
       }
       K_BEGIN K_BATCH ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )?
-          u1=updateStatement ';'? { updates.add(u1); } ( uN=updateStatement ';'? { updates.add(uN); } )*
-      K_APPLY K_BATCH EOF
+          s1=batchStatementObjective ';'? { statements.add(s1); } ( sN=batchStatementObjective ';'? { statements.add(sN); } )*
+      K_APPLY K_BATCH endStmnt
       {
-          return new BatchUpdateStatement(updates, cLevel);
+          return new BatchStatement(statements, cLevel);
       }
     ;
 
+batchStatementObjective returns [AbstractModification statement]
+    : i=insertStatement  { $statement = i; }
+    | u=updateStatement  { $statement = u; }
+    | d=deleteStatement  { $statement = d; }
+    ;
+
 /**
  * UPDATE
  *     <CF>
@@ -265,7 +287,7 @@ updateStatement returns [UpdateStatement
  */
 deleteStatement returns [DeleteStatement expr]
     : {
-          ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+          ConsistencyLevel cLevel = null;
           List<Term> keyList = null;
           List<Term> columnsList = Collections.emptyList();
       }
@@ -274,7 +296,7 @@ deleteStatement returns [DeleteStatement
           K_FROM columnFamily=( IDENT | STRING_LITERAL | INTEGER ) ( K_USING K_CONSISTENCY K_LEVEL )?
           K_WHERE ( K_KEY '=' key=term           { keyList = Collections.singletonList(key); }
                   | K_KEY K_IN '(' keys=termList { keyList = $keys.items; } ')'
-                  )? endStmnt
+                  )?
       {
           return new DeleteStatement(columnsList, $columnFamily.text, cLevel, keyList);
       }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java Tue Apr 26 14:53:53 2011
@@ -20,26 +20,37 @@
  */
 package org.apache.cassandra.cql;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+import static org.apache.cassandra.cql.QueryProcessor.validateColumnName;
 
 /**
  * A <code>DELETE</code> parsed from a CQL query statement.
  *
  */
-public class DeleteStatement
+public class DeleteStatement extends AbstractModification
 {
     private List<Term> columns;
-    private String columnFamily;
-    private ConsistencyLevel cLevel;
     private List<Term> keys;
     
     public DeleteStatement(List<Term> columns, String columnFamily, ConsistencyLevel cLevel, List<Term> keys)
     {
+        super(columnFamily, cLevel);
+
         this.columns = columns;
-        this.columnFamily = columnFamily;
-        this.cLevel = cLevel;
         this.keys = keys;
     }
 
@@ -48,21 +59,44 @@ public class DeleteStatement
         return columns;
     }
 
-    public String getColumnFamily()
+    public List<Term> getKeys()
     {
-        return columnFamily;
+        return keys;
     }
 
-    public ConsistencyLevel getConsistencyLevel()
+    /** {@inheritDoc} */
+    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
     {
-        return cLevel;
-    }
+        clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
 
-    public List<Term> getKeys()
-    {
-        return keys;
+        AbstractType comparator = metadata.getComparatorFor(null);
+        AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
+
+        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+
+        for (Term key : keys)
+        {
+            RowMutation rm = new RowMutation(keyspace, key.getByteBuffer(keyType));
+
+            if (columns.size() < 1) // No columns, delete the row
+                rm.delete(new QueryPath(columnFamily), System.currentTimeMillis());
+            else    // Delete specific columns
+            {
+                for (Term column : columns)
+                {
+                    ByteBuffer columnName = column.getByteBuffer(comparator);
+                    validateColumnName(columnName);
+                    rm.delete(new QueryPath(columnFamily, null, columnName), System.currentTimeMillis());
+                }
+            }
+
+            rowMutations.add(rm);
+        }
+
+        return rowMutations;
     }
-    
+
     public String toString()
     {
         return String.format("DeleteStatement(columns=%s, columnFamily=%s, consistency=%s keys=%s)",

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Tue Apr 26 14:53:53 2011
@@ -367,7 +367,7 @@ public class QueryProcessor
         }
     }
     
-    private static void validateKey(ByteBuffer key) throws InvalidRequestException
+    public static void validateKey(ByteBuffer key) throws InvalidRequestException
     {
         if (key == null || key.remaining() == 0)
         {
@@ -396,13 +396,13 @@ public class QueryProcessor
         }
     }
 
-    private static void validateColumnName(ByteBuffer column)
+    public static void validateColumnName(ByteBuffer column)
     throws InvalidRequestException
     {
         validateColumnNames(Arrays.asList(column));
     }
     
-    private static void validateColumn(CFMetaData metadata, ByteBuffer name, ByteBuffer value)
+    public static void validateColumn(CFMetaData metadata, ByteBuffer name, ByteBuffer value)
     throws InvalidRequestException
     {
         validateColumnName(name);
@@ -543,15 +543,27 @@ public class QueryProcessor
                 result.type = CqlResultType.VOID;
                 return result;
                 
-            case BATCH_UPDATE:
-                BatchUpdateStatement batch = (BatchUpdateStatement)statement.statement;
-                
-                for (UpdateStatement up : batch.getUpdates())
+            case BATCH:
+                BatchStatement batch = (BatchStatement) statement.statement;
+
+                for (AbstractModification up : batch.getStatements())
                     if (up.isSetConsistencyLevel())
                         throw new InvalidRequestException(
-                                "Consistency level must be set on the BATCH, not individual UPDATE statements");
-                
-                batchUpdate(clientState, batch.getUpdates(), batch.getConsistencyLevel());
+                                "Consistency level must be set on the BATCH, not individual statements");
+
+                try
+                {
+                    StorageProxy.mutate(batch.getMutations(keyspace, clientState), batch.getConsistencyLevel());
+                }
+                catch (org.apache.cassandra.thrift.UnavailableException e)
+                {
+                    throw new UnavailableException();
+                }
+                catch (TimeoutException e)
+                {
+                    throw new TimedOutException();
+                }
+
                 result.type = CqlResultType.VOID;
                 return result;
                 
@@ -583,34 +595,10 @@ public class QueryProcessor
             
             case DELETE:
                 DeleteStatement delete = (DeleteStatement)statement.statement;
-                clientState.hasColumnFamilyAccess(delete.getColumnFamily(), Permission.WRITE);
-                metadata = validateColumnFamily(keyspace, delete.getColumnFamily(), false);
-                comparator = metadata.getComparatorFor(null);
-                AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace,
-                                                                           delete.getColumnFamily()).getKeyValidator();
-                
-                List<RowMutation> rowMutations = new ArrayList<RowMutation>();
-                for (Term key : delete.getKeys())
-                {
-                    RowMutation rm = new RowMutation(keyspace, key.getByteBuffer(keyType));
-                    if (delete.getColumns().size() < 1)     // No columns, delete the row
-                        rm.delete(new QueryPath(delete.getColumnFamily()), System.currentTimeMillis());
-                    else    // Delete specific columns
-                    {
-                        for (Term column : delete.getColumns())
-                        {
-                            ByteBuffer columnName = column.getByteBuffer(comparator);
-                            validateColumnName(columnName);
-                            rm.delete(new QueryPath(delete.getColumnFamily(), null, columnName),
-                                      System.currentTimeMillis());
-                        }
-                    }
-                    rowMutations.add(rm);
-                }
-                
+
                 try
                 {
-                    StorageProxy.mutate(rowMutations, delete.getConsistencyLevel());
+                    StorageProxy.mutate(delete.prepareRowMutations(keyspace, clientState), delete.getConsistencyLevel());
                 }
                 catch (TimeoutException e)
                 {

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Tue Apr 26 14:53:53 2011
@@ -24,7 +24,7 @@ import java.util.EnumSet;
 
 public enum StatementType
 {
-    SELECT, INSERT, UPDATE, BATCH_UPDATE, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
+    SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
         DROP_KEYSPACE, DROP_COLUMNFAMILY;
     
     // Statement types that don't require a keyspace to be set.

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Tue Apr 26 14:53:53 2011
@@ -21,24 +21,29 @@
 package org.apache.cassandra.cql;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
 
+import static org.apache.cassandra.cql.QueryProcessor.validateKey;
+import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
+
+import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+
 /**
  * An <code>UPDATE</code> statement parsed from a CQL query statement.
  *
  */
-public class UpdateStatement
+public class UpdateStatement extends AbstractModification
 {
-    public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
-    private String columnFamily;
-    private ConsistencyLevel cLevel = null;
     private Map<Term, Term> columns;
     private List<Term> columnNames, columnValues;
     private Term key;
@@ -54,8 +59,8 @@ public class UpdateStatement
      */
     public UpdateStatement(String columnFamily, ConsistencyLevel cLevel, Map<Term, Term> columns, Term key)
     {
-        this.columnFamily = columnFamily;
-        this.cLevel = cLevel;
+        super(columnFamily, cLevel);
+
         this.columns = columns;
         this.key = key;
     }
@@ -86,8 +91,8 @@ public class UpdateStatement
      */
     public UpdateStatement(String columnFamily, ConsistencyLevel cLevel, List<Term> columnNames, List<Term> columnValues, Term key)
     {
-        this.columnFamily = columnFamily;
-        this.cLevel = cLevel;
+        super(columnFamily, cLevel);
+
         this.columnNames = columnNames;
         this.columnValues = columnValues;
         this.key = key;
@@ -114,6 +119,37 @@ public class UpdateStatement
         return (cLevel != null);
     }
 
+    /** {@inheritDoc} */
+    public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+    {
+        List<String> cfamsSeen = new ArrayList<String>();
+
+        CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+
+        // Avoid unnecessary authorizations.
+        if (!(cfamsSeen.contains(columnFamily)))
+        {
+            clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+            cfamsSeen.add(columnFamily);
+        }
+
+        ByteBuffer key = this.key.getByteBuffer(getKeyType(keyspace));
+        validateKey(key);
+        AbstractType<?> comparator = getComparator(keyspace);
+
+        RowMutation rm = new RowMutation(keyspace, key);
+        for (Map.Entry<Term, Term> column : getColumns().entrySet())
+        {
+            ByteBuffer colName = column.getKey().getByteBuffer(comparator);
+            ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+
+            validateColumn(metadata, colName, colValue);
+            rm.add(new QueryPath(columnFamily, null, colName), colValue, System.currentTimeMillis());
+        }
+
+        return Arrays.asList(rm);
+    }
+
     public String getColumnFamily()
     {
         return columnFamily;

Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Tue Apr 26 14:53:53 2011
@@ -699,3 +699,60 @@ class TestCql(ThriftTester):
         r = cursor.fetchone()
         assert r[1] == "some_value", \
                "unrecognized value '%s'" % r[1]
+
+    def test_batch_with_mixed_statements(self):
+        "handle BATCH with INSERT/UPDATE/DELETE statements mixed in it"
+        cursor = init()
+        cursor.compression = 'NONE'
+        cursor.execute("""
+          BEGIN BATCH USING CONSISTENCY ONE
+            UPDATE StandardString1 SET :name = :val WHERE KEY = :key1
+            INSERT INTO StandardString1 (KEY, :col1) VALUES (:key2, :val)
+            INSERT INTO StandardString1 (KEY, :col2) VALUES (:key3, :val)
+            DELETE :col2 FROM StandardString1 WHERE key = :key3
+          APPLY BATCH
+        """,
+        dict(key1="bKey1", key2="bKey2", key3="bKey3", name="bName", col1="bCol1", col2="bCol2", val="bVal"))
+
+        cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+                       dict(name="bName", key="bKey1"))
+
+        assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+        assert colnames[1] == "bName", \
+               "unrecognized name '%s'" % colnames[1]
+        r = cursor.fetchone()
+        assert r[1] == "bVal", \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+                       dict(name="bCol2", key="bKey3"))
+
+        assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+        assert colnames[1] == "bCol2", \
+               "unrecognized name '%s'" % colnames[1]
+        # was deleted by DELETE statement
+        r = cursor.fetchone()
+        assert r[1] == None, \
+               "unrecognized value '%s'" % r[1]
+
+        cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+                       dict(name="bCol1", key="bKey2"))
+
+        assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+        colnames = [col_d[0] for col_d in cursor.description]
+        assert colnames[1] == "bCol1", \
+               "unrecognized name '%s'" % colnames[1]
+        r = cursor.fetchone()
+        assert r[1] == "bVal", \
+               "unrecognized value '%s'" % r[1]
+
+        assert_raises(cql.ProgrammingError,
+                      cursor.execute,
+                      """
+                      BEGIN BATCH USING CONSISTENCY ONE
+                          UPDATE USING CONSISTENCY QUORUM StandardString1 SET 'name' = 'value' WHERE KEY = 'bKey4'
+                          DELETE 'name' FROM StandardString1 WHERE KEY = 'bKey4'
+                      APPLY BATCH
+                      """)