You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/11/17 17:57:35 UTC

svn commit: r1036112 - in /cassandra/trunk: src/java/org/apache/cassandra/cql/ test/system/

Author: eevans
Date: Wed Nov 17 16:57:35 2010
New Revision: 1036112

URL: http://svn.apache.org/viewvc?rev=1036112&view=rev
Log:
batched UPDATEs

Patch by eevans

Added:
    cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
    cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
    cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
    cassandra/trunk/test/system/test_cql.py

Added: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java?rev=1036112&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/BatchUpdateStatement.java Wed Nov 17 16:57:35 2010
@@ -0,0 +1,65 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.cassandra.cql;
+
+import java.util.List;
+
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+/**
+ * A <code>BATCH UPDATE</code> statement parsed from a CQL query: a list of
+ * UpdateStatements plus a single consistency level held for the whole batch.
+ */
+public class BatchUpdateStatement
+{
+    private ConsistencyLevel consistency;
+    private List<UpdateStatement> updates;
+    
+    /**
+     * Creates a new BatchUpdateStatement from a list of UpdateStatements and a
+     * Thrift consistency level.
+     * 
+     * @param updates a list of UpdateStatements
+     * @param consistency Thrift consistency level enum
+     */
+    public BatchUpdateStatement(List<UpdateStatement> updates, ConsistencyLevel consistency)
+    {
+        this.updates = updates;
+        this.consistency = consistency;
+    }
+    /** Returns the consistency level this batch was constructed with. */
+    public ConsistencyLevel getConsistencyLevel()
+    {
+        return consistency;
+    }
+    /** Returns the list of UpdateStatements passed to the constructor (the same list, not a copy). */
+    public List<UpdateStatement> getUpdates()
+    {
+        return updates;
+    }
+    /** Returns a string showing the updates list and the consistency level. */
+    public String toString()
+    {
+        return String.format("BatchUpdateStatement(updates=%s, consistency=%s)",
+                             updates,
+                             consistency);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1036112&r1=1036111&r2=1036112&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Wed Nov 17 16:57:35 2010
@@ -45,6 +45,7 @@ options {
 query returns [CQLStatement stmnt]
     : selectStatement { $stmnt = new CQLStatement(StatementType.SELECT, $selectStatement.expr); }
     | updateStatement { $stmnt = new CQLStatement(StatementType.UPDATE, $updateStatement.expr); }
+    | batchUpdateStatement { $stmnt = new CQLStatement(StatementType.BATCH_UPDATE, $batchUpdateStatement.expr); }
     | useStatement    { $stmnt = new CQLStatement(StatementType.USE, $useStatement.keyspace); }
     ;
 
@@ -90,6 +91,26 @@ selectStatement returns [SelectStatement
     ;
 
 /**
+ * BEGIN BATCH [USING CONSISTENCY.<LVL>]
+ * UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
+ * UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
+ * UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
+ * APPLY BATCH
+ */
+batchUpdateStatement returns [BatchUpdateStatement expr]
+    : {
+          ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+          List<UpdateStatement> updates = new ArrayList<UpdateStatement>();
+      }
+      K_BEGIN K_BATCH ( K_USING K_CONSISTENCY '.' K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )?
+          u1=updateStatement { updates.add(u1); } ( uN=updateStatement { updates.add(uN); } )*
+      K_APPLY K_BATCH EOF
+      {
+          return new BatchUpdateStatement(updates, cLevel);
+      }
+    ;
+
+/**
  * UPDATE
  *     <CF>
  * USING
@@ -102,7 +123,7 @@ selectStatement returns [SelectStatement
  */
 updateStatement returns [UpdateStatement expr]
     : {
-          ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+          ConsistencyLevel cLevel = null;
           Map<Term, Term> columns = new HashMap<Term, Term>();
       }
       K_UPDATE columnFamily=IDENT
@@ -177,6 +198,9 @@ K_FIRST:       F I R S T;
 K_REVERSED:    R E V E R S E D;
 K_COUNT:       C O U N T;
 K_SET:         S E T;
+K_BEGIN:       B E G I N;
+K_APPLY:       A P P L Y;
+K_BATCH:       B A T C H;
 
 // Case-insensitive alpha characters
 fragment A: ('a'|'A');

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1036112&r1=1036111&r2=1036112&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Wed Nov 17 16:57:35 2010
@@ -24,7 +24,6 @@ package org.apache.cassandra.cql;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -211,10 +210,9 @@ public class QueryProcessor
         return rows;
     }
     
-    private static void batchUpdate(String keyspace, List<UpdateStatement> updateStatements)
+    private static void batchUpdate(String keyspace, List<UpdateStatement> updateStatements, ConsistencyLevel consistency)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        ConsistencyLevel consistency = updateStatements.get(0).getConsistencyLevel();
         List<RowMutation> rowMutations = new ArrayList<RowMutation>();
 
         for (UpdateStatement update : updateStatements)
@@ -392,7 +390,19 @@ public class QueryProcessor
                 
             case UPDATE:
                 UpdateStatement update = (UpdateStatement)statement.statement;
-                batchUpdate(keyspace, Collections.singletonList(update));
+                batchUpdate(keyspace, Collections.singletonList(update), update.getConsistencyLevel());
+                avroResult.type = CqlResultType.VOID;
+                return avroResult;
+                
+            case BATCH_UPDATE:
+                BatchUpdateStatement batch = (BatchUpdateStatement)statement.statement;
+                
+                for (UpdateStatement up : batch.getUpdates())
+                    if (up.isSetConsistencyLevel())
+                        throw newInvalidRequestException(
+                                "Consistency level must be set on the BATCH, not individual UPDATE statements");
+                
+                batchUpdate(keyspace, batch.getUpdates(), batch.getConsistencyLevel());
                 avroResult.type = CqlResultType.VOID;
                 return avroResult;
                 

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1036112&r1=1036111&r2=1036112&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Wed Nov 17 16:57:35 2010
@@ -22,5 +22,5 @@ package org.apache.cassandra.cql;
 
 public enum StatementType
 {
-    SELECT, UPDATE, USE;
+    SELECT, UPDATE, BATCH_UPDATE, USE;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1036112&r1=1036111&r2=1036112&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Wed Nov 17 16:57:35 2010
@@ -30,8 +30,9 @@ import org.apache.cassandra.thrift.Consi
  */
 public class UpdateStatement
 {
+    public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
     private String columnFamily;
-    private ConsistencyLevel cLevel;
+    private ConsistencyLevel cLevel = null;
     private Map<Term, Term> columns;
     private Term key;
     
@@ -52,9 +53,38 @@ public class UpdateStatement
         this.key = key;
     }
 
+    /**
+     * Creates a new UpdateStatement from a column family name, columns map,
+     * and key term.
+     * 
+     * @param columnFamily column family name
+     * @param columns a map of column name/value pairs
+     * @param key the key name
+     */
+    public UpdateStatement(String columnFamily, Map<Term, Term> columns, Term key)
+    {
+        this(columnFamily, null, columns, key);
+    }
+
+    /**
+     * Returns the consistency level of this <code>UPDATE</code> statement, either
+     * one parsed from the CQL statement, or the default level otherwise.
+     * 
+     * @return the consistency level as a Thrift enum.
+     */
     public ConsistencyLevel getConsistencyLevel()
     {
-        return cLevel;
+        return (cLevel != null) ? cLevel : defaultConsistency;
+    }
+    
+    /**
+     * True if an explicit consistency level was parsed from the statement.
+     * 
+     * @return true if a consistency level was parsed, false otherwise.
+     */
+    public boolean isSetConsistencyLevel()
+    {
+        return (cLevel != null);
     }
 
     public String getColumnFamily()

Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1036112&r1=1036111&r2=1036112&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Wed Nov 17 16:57:35 2010
@@ -23,53 +23,25 @@ def load_sample(dbconn):
     """)
 
     dbconn.execute("""
-        UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4"
-                WHERE KEY = "aa";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 5L = "5", 6L = "6", 7L = "8", 9L = "9"
-                WHERE KEY = "ab";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 9L = "9", 8L = "8", 7L = "7", 6L = "6"
-                WHERE KEY = "ac";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 5L = "5", 4L = "4", 3L = "3", 2L = "2"
-                WHERE KEY = "ad";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4"
-                WHERE KEY = "ae";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 1L = "1", 2L = "2", 3L = "3", 4L = "4"
-                WHERE KEY = "af";
-    """)
-    dbconn.execute("""
-        UPDATE StandardLong1 SET 5L = "5", 6L = "6", 7L = "8", 9L = "9"
-                WHERE KEY = "ag";
+    BEGIN BATCH USING CONSISTENCY.ONE
+     UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="aa";
+     UPDATE StandardLong1 SET 5L="5", 6L="6", 7L="8", 9L="9" WHERE KEY="ab";
+     UPDATE StandardLong1 SET 9L="9", 8L="8", 7L="7", 6L="6" WHERE KEY="ac";
+     UPDATE StandardLong1 SET 5L="5", 4L="4", 3L="3", 2L="2" WHERE KEY="ad";
+     UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="ae";
+     UPDATE StandardLong1 SET 1L="1", 2L="2", 3L="3", 4L="4" WHERE KEY="af";
+     UPDATE StandardLong1 SET 5L="5", 6L="6", 7L="8", 9L="9" WHERE KEY="ag";
+    APPLY BATCH
     """)
 
     dbconn.execute("""
-        UPDATE Indexed1 SET "birthdate" = 100L, "unindexed" = 250L
-                WHERE KEY = "asmith";
-    """)
-    dbconn.execute("""
-        UPDATE Indexed1 SET "birthdate" = 100L, "unindexed" = 200L
-                WHERE KEY = "dozer";
-    """)
-    dbconn.execute("""
-        UPDATE Indexed1 SET "birthdate" = 175L, "unindexed" = 200L
-                WHERE KEY = "morpheus";
-    """)
-    dbconn.execute("""
-        UPDATE Indexed1 SET "birthdate" = 150L, "unindexed" = 250L
-                WHERE KEY = "neo";
-    """)
-    dbconn.execute("""
-        UPDATE Indexed1 SET "birthdate" = 125L, "unindexed" = 200L
-                WHERE KEY = "trinity";
+    BEGIN BATCH
+    UPDATE Indexed1 SET "birthdate"=100L, "unindexed"=250L WHERE KEY="asmith";
+    UPDATE Indexed1 SET "birthdate"=100L, "unindexed"=200L WHERE KEY="dozer";
+    UPDATE Indexed1 SET "birthdate"=175L, "unindexed"=200L WHERE KEY="morpheus";
+    UPDATE Indexed1 SET "birthdate"=150L, "unindexed"=250L WHERE KEY="neo";
+    UPDATE Indexed1 SET "birthdate"=125L, "unindexed"=200L WHERE KEY="trinity";
+    APPLY BATCH
     """)
 
 def init(keyspace="Keyspace1"):