You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/26 16:53:54 UTC
svn commit: r1096771 - in /cassandra/trunk: ./ doc/cql/
src/java/org/apache/cassandra/cql/ test/system/
Author: jbellis
Date: Tue Apr 26 14:53:53 2011
New Revision: 1096771
URL: http://svn.apache.org/viewvc?rev=1096771&view=rev
Log:
add support for insert, delete in cql BATCH
patch by Pavel Yaskevich; reviewed by jbellis for CASSANDRA-2537
Added:
cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java
cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/doc/cql/CQL.textile
cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java
cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
cassandra/trunk/test/system/test_cql.py
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Apr 26 14:53:53 2011
@@ -3,6 +3,7 @@
* Remove checking all column families on startup for compaction candidates (CASSANDRA-2444)
* validate CQL create keyspace options (CASSANDRA-2525)
* fix nodetool setcompactionthroughput (CASSANDRA-2550)
+ * add support for insert, delete in cql BATCH (CASSANDRA-2537)
0.8.0-beta1
Modified: cassandra/trunk/doc/cql/CQL.textile
URL: http://svn.apache.org/viewvc/cassandra/trunk/doc/cql/CQL.textile?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/doc/cql/CQL.textile (original)
+++ cassandra/trunk/doc/cql/CQL.textile Tue Apr 26 14:53:53 2011
@@ -99,19 +99,6 @@ UPDATE ... SET name1 = value1, name2 = v
Rows are created or updated by supplying column names and values in term assignment format. Multiple columns can be set by separating the name/value pairs using commas. Each update statement requires exactly one key to be specified using a WHERE clause and the @KEY@ keyword.
-Additionally, it is also possible to send multiple UPDATES to a node at once using a batch syntax:
-
-bc.
-BEGIN BATCH [USING <CONSISTENCY>]
-UPDATE CF1 SET name1 = value1, name2 = value2 WHERE KEY = keyname1;
-UPDATE CF1 SET name3 = value3 WHERE KEY = keyname2;
-UPDATE CF2 SET name4 = value4, name5 = value5 WHERE KEY = keyname3;
-APPLY BATCH
-
-When batching UPDATEs, a single consistency level is used for the entire batch, it appears after the @BEGIN BATCH@ statement, and uses the standard "consistency level specification":#consistency. Batch UPDATEs default to @CONSISTENCY.ONE@ when left unspecified.
-
-_NOTE: While there are no isolation guarantees, @UPDATE@ queries are atomic within a give record._
-
h2. DELETE
_Synopsis:_
@@ -150,6 +137,30 @@ UPDATE ... WHERE KEY = keyname1
UPDATE ... WHERE KEY IN (keyname1, keyname2)
The @WHERE@ clause is used to determine which row(s) a @DELETE@ applies to. The first form allows the specification of a single keyname using the @KEY@ keyword and the @=@ operator. The second form allows a list of keyname terms to be specified using the @IN@ notation and a parenthesized list of comma-delimited keyname terms.
+
+h2. BATCH
+
+_Synopsis:_
+
+bc.
+BEGIN BATCH [USING CONSISTENCY <LEVEL>]
+ INSERT or UPDATE or DELETE statements separated by semicolon or "end of line"
+APPLY BATCH
+
+A single consistency level is used for the entire batch; it appears after the @BEGIN BATCH@ statement and uses the standard "consistency level specification":#consistency. Batches default to @CONSISTENCY.ONE@ when left unspecified.
+
+_NOTE: While there are no isolation guarantees, @UPDATE@ queries are atomic within a given record._
+
+_Example:_
+
+bc.
+BEGIN BATCH USING CONSISTENCY QUORUM
+ INSERT INTO users (KEY, password, name) VALUES ('user2', 'ch@ngem3b', 'second user')
+ UPDATE users SET password = 'ps22dhds' WHERE KEY = 'user2'
+ INSERT INTO users (KEY, password) VALUES ('user3', 'ch@ngem3c')
+ DELETE name FROM users WHERE key = 'user2'
+ INSERT INTO users (KEY, password, name) VALUES ('user4', 'ch@ngem3c', 'Andrew')
+APPLY BATCH
h2. TRUNCATE
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1096771&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java Tue Apr 26 14:53:53 2011
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+import java.util.List;
+
+public abstract class AbstractModification
+{
+ public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
+
+ protected final String columnFamily;
+ protected final ConsistencyLevel cLevel;
+
+ public AbstractModification(String columnFamily, ConsistencyLevel cLevel)
+ {
+ this.columnFamily = columnFamily;
+ this.cLevel = cLevel;
+ }
+
+ public String getColumnFamily()
+ {
+ return columnFamily;
+ }
+
+ public ConsistencyLevel getConsistencyLevel()
+ {
+ return (cLevel != null) ? cLevel : defaultConsistency;
+ }
+
+ /**
+ * True if an explicit consistency level was parsed from the statement.
+ *
+ * @return true if a consistency was parsed, false otherwise.
+ */
+ public boolean isSetConsistencyLevel()
+ {
+ return cLevel != null;
+ }
+
+ /**
+ * Convert statement into a list of mutations to apply on the server
+ *
+ * @param keyspace The working keyspace
+ * @param clientState current client status
+ *
+ * @return list of the mutations
+ *
+ * @throws org.apache.cassandra.thrift.InvalidRequestException if the request is invalid
+ */
+ public abstract List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState)
+ throws org.apache.cassandra.thrift.InvalidRequestException;
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1096771&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java Tue Apr 26 14:53:53 2011
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.cql;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+/**
+ * A <code>BATCH</code> statement parsed from a CQL query.
+ *
+ */
+public class BatchStatement
+{
+ // statements to execute
+ protected final List<AbstractModification> statements;
+
+ // global consistency level
+ protected final ConsistencyLevel consistency;
+
+
+ /**
+ * Creates a new BatchStatement from a list of statements and a
+ * Thrift consistency level.
+ *
+ * @param statements a list of modification statements (INSERT, UPDATE or DELETE)
+ * @param level Thrift consistency level enum
+ */
+ public BatchStatement(List<AbstractModification> statements, ConsistencyLevel level)
+ {
+ this.statements = statements;
+ consistency = level;
+ }
+
+ public List<AbstractModification> getStatements()
+ {
+ return statements;
+ }
+
+ public ConsistencyLevel getConsistencyLevel()
+ {
+ return consistency;
+ }
+
+ public List<RowMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ {
+ List<RowMutation> batch = new LinkedList<RowMutation>();
+
+ for (AbstractModification statement : statements)
+ {
+ batch.addAll(statement.prepareRowMutations(keyspace, clientState));
+ }
+
+ return batch;
+ }
+
+
+ public String toString()
+ {
+ return String.format("BatchStatement(statements=%s, consistency=%s)", statements, consistency);
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Tue Apr 26 14:53:53 2011
@@ -102,12 +102,12 @@ options {
query returns [CQLStatement stmnt]
: selectStatement { $stmnt = new CQLStatement(StatementType.SELECT, $selectStatement.expr); }
- | insertStatement { $stmnt = new CQLStatement(StatementType.INSERT, $insertStatement.expr); }
+ | insertStatement endStmnt { $stmnt = new CQLStatement(StatementType.INSERT, $insertStatement.expr); }
| updateStatement endStmnt { $stmnt = new CQLStatement(StatementType.UPDATE, $updateStatement.expr); }
- | batchUpdateStatement { $stmnt = new CQLStatement(StatementType.BATCH_UPDATE, $batchUpdateStatement.expr); }
+ | batchStatement { $stmnt = new CQLStatement(StatementType.BATCH, $batchStatement.expr); }
| useStatement { $stmnt = new CQLStatement(StatementType.USE, $useStatement.keyspace); }
| truncateStatement { $stmnt = new CQLStatement(StatementType.TRUNCATE, $truncateStatement.cfam); }
- | deleteStatement { $stmnt = new CQLStatement(StatementType.DELETE, $deleteStatement.expr); }
+ | deleteStatement endStmnt { $stmnt = new CQLStatement(StatementType.DELETE, $deleteStatement.expr); }
| createKeyspaceStatement { $stmnt = new CQLStatement(StatementType.CREATE_KEYSPACE, $createKeyspaceStatement.expr); }
| createColumnFamilyStatement { $stmnt = new CQLStatement(StatementType.CREATE_COLUMNFAMILY, $createColumnFamilyStatement.expr); }
| createIndexStatement { $stmnt = new CQLStatement(StatementType.CREATE_INDEX, $createIndexStatement.expr); }
@@ -191,7 +191,7 @@ whereClause returns [WhereClause clause]
*/
insertStatement returns [UpdateStatement expr]
: {
- ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+ ConsistencyLevel cLevel = null;
Map<Term, Term> columns = new HashMap<Term, Term>();
List<Term> columnNames = new ArrayList<Term>();
@@ -202,32 +202,54 @@ insertStatement returns [UpdateStatement
K_VALUES
'(' key=term ( ',' column_value=term { columnValues.add($column_value.item); })+ ')'
( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )?
- endStmnt
{
return new UpdateStatement($columnFamily.text, cLevel, columnNames, columnValues, key);
}
;
/**
- * BEGIN BATCH [USING CONSISTENCY.<LVL>]
- * UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
- * UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
- * UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * UPDATE <CF> SET name1 = value1 WHERE KEY = keyname1;
+ * UPDATE <CF> SET name2 = value2 WHERE KEY = keyname2;
+ * UPDATE <CF> SET name3 = value3 WHERE KEY = keyname3;
+ * ...
+ * APPLY BATCH
+ *
+ * OR
+ *
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
+ * INSERT INTO <CF> (KEY, <name>) VALUES ('<key>', '<value>');
+ * ...
+ * APPLY BATCH
+ *
+ * OR
+ *
+ * BEGIN BATCH [USING CONSISTENCY <LVL>]
+ * DELETE name1, name2 FROM <CF> WHERE key = <key>
+ * DELETE name3, name4 FROM <CF> WHERE key = <key>
+ * ...
* APPLY BATCH
*/
-batchUpdateStatement returns [BatchUpdateStatement expr]
+batchStatement returns [BatchStatement expr]
: {
ConsistencyLevel cLevel = ConsistencyLevel.ONE;
- List<UpdateStatement> updates = new ArrayList<UpdateStatement>();
+ List<AbstractModification> statements = new ArrayList<AbstractModification>();
}
K_BEGIN K_BATCH ( K_USING K_CONSISTENCY K_LEVEL { cLevel = ConsistencyLevel.valueOf($K_LEVEL.text); } )?
- u1=updateStatement ';'? { updates.add(u1); } ( uN=updateStatement ';'? { updates.add(uN); } )*
- K_APPLY K_BATCH EOF
+ s1=batchStatementObjective ';'? { statements.add(s1); } ( sN=batchStatementObjective ';'? { statements.add(sN); } )*
+ K_APPLY K_BATCH endStmnt
{
- return new BatchUpdateStatement(updates, cLevel);
+ return new BatchStatement(statements, cLevel);
}
;
+batchStatementObjective returns [AbstractModification statement]
+ : i=insertStatement { $statement = i; }
+ | u=updateStatement { $statement = u; }
+ | d=deleteStatement { $statement = d; }
+ ;
+
/**
* UPDATE
* <CF>
@@ -265,7 +287,7 @@ updateStatement returns [UpdateStatement
*/
deleteStatement returns [DeleteStatement expr]
: {
- ConsistencyLevel cLevel = ConsistencyLevel.ONE;
+ ConsistencyLevel cLevel = null;
List<Term> keyList = null;
List<Term> columnsList = Collections.emptyList();
}
@@ -274,7 +296,7 @@ deleteStatement returns [DeleteStatement
K_FROM columnFamily=( IDENT | STRING_LITERAL | INTEGER ) ( K_USING K_CONSISTENCY K_LEVEL )?
K_WHERE ( K_KEY '=' key=term { keyList = Collections.singletonList(key); }
| K_KEY K_IN '(' keys=termList { keyList = $keys.items; } ')'
- )? endStmnt
+ )?
{
return new DeleteStatement(columnsList, $columnFamily.text, cLevel, keyList);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java Tue Apr 26 14:53:53 2011
@@ -20,26 +20,37 @@
*/
package org.apache.cassandra.cql;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+import static org.apache.cassandra.cql.QueryProcessor.validateColumnName;
/**
* A <code>DELETE</code> parsed from a CQL query statement.
*
*/
-public class DeleteStatement
+public class DeleteStatement extends AbstractModification
{
private List<Term> columns;
- private String columnFamily;
- private ConsistencyLevel cLevel;
private List<Term> keys;
public DeleteStatement(List<Term> columns, String columnFamily, ConsistencyLevel cLevel, List<Term> keys)
{
+ super(columnFamily, cLevel);
+
this.columns = columns;
- this.columnFamily = columnFamily;
- this.cLevel = cLevel;
this.keys = keys;
}
@@ -48,21 +59,44 @@ public class DeleteStatement
return columns;
}
- public String getColumnFamily()
+ public List<Term> getKeys()
{
- return columnFamily;
+ return keys;
}
- public ConsistencyLevel getConsistencyLevel()
+ /** {@inheritDoc} */
+ public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
{
- return cLevel;
- }
+ clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
- public List<Term> getKeys()
- {
- return keys;
+ AbstractType comparator = metadata.getComparatorFor(null);
+ AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace, columnFamily).getKeyValidator();
+
+ List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+
+ for (Term key : keys)
+ {
+ RowMutation rm = new RowMutation(keyspace, key.getByteBuffer(keyType));
+
+ if (columns.size() < 1) // No columns, delete the row
+ rm.delete(new QueryPath(columnFamily), System.currentTimeMillis());
+ else // Delete specific columns
+ {
+ for (Term column : columns)
+ {
+ ByteBuffer columnName = column.getByteBuffer(comparator);
+ validateColumnName(columnName);
+ rm.delete(new QueryPath(columnFamily, null, columnName), System.currentTimeMillis());
+ }
+ }
+
+ rowMutations.add(rm);
+ }
+
+ return rowMutations;
}
-
+
public String toString()
{
return String.format("DeleteStatement(columns=%s, columnFamily=%s, consistency=%s keys=%s)",
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Tue Apr 26 14:53:53 2011
@@ -367,7 +367,7 @@ public class QueryProcessor
}
}
- private static void validateKey(ByteBuffer key) throws InvalidRequestException
+ public static void validateKey(ByteBuffer key) throws InvalidRequestException
{
if (key == null || key.remaining() == 0)
{
@@ -396,13 +396,13 @@ public class QueryProcessor
}
}
- private static void validateColumnName(ByteBuffer column)
+ public static void validateColumnName(ByteBuffer column)
throws InvalidRequestException
{
validateColumnNames(Arrays.asList(column));
}
- private static void validateColumn(CFMetaData metadata, ByteBuffer name, ByteBuffer value)
+ public static void validateColumn(CFMetaData metadata, ByteBuffer name, ByteBuffer value)
throws InvalidRequestException
{
validateColumnName(name);
@@ -543,15 +543,27 @@ public class QueryProcessor
result.type = CqlResultType.VOID;
return result;
- case BATCH_UPDATE:
- BatchUpdateStatement batch = (BatchUpdateStatement)statement.statement;
-
- for (UpdateStatement up : batch.getUpdates())
+ case BATCH:
+ BatchStatement batch = (BatchStatement) statement.statement;
+
+ for (AbstractModification up : batch.getStatements())
if (up.isSetConsistencyLevel())
throw new InvalidRequestException(
- "Consistency level must be set on the BATCH, not individual UPDATE statements");
-
- batchUpdate(clientState, batch.getUpdates(), batch.getConsistencyLevel());
+ "Consistency level must be set on the BATCH, not individual statements");
+
+ try
+ {
+ StorageProxy.mutate(batch.getMutations(keyspace, clientState), batch.getConsistencyLevel());
+ }
+ catch (org.apache.cassandra.thrift.UnavailableException e)
+ {
+ throw new UnavailableException();
+ }
+ catch (TimeoutException e)
+ {
+ throw new TimedOutException();
+ }
+
result.type = CqlResultType.VOID;
return result;
@@ -583,34 +595,10 @@ public class QueryProcessor
case DELETE:
DeleteStatement delete = (DeleteStatement)statement.statement;
- clientState.hasColumnFamilyAccess(delete.getColumnFamily(), Permission.WRITE);
- metadata = validateColumnFamily(keyspace, delete.getColumnFamily(), false);
- comparator = metadata.getComparatorFor(null);
- AbstractType<?> keyType = DatabaseDescriptor.getCFMetaData(keyspace,
- delete.getColumnFamily()).getKeyValidator();
-
- List<RowMutation> rowMutations = new ArrayList<RowMutation>();
- for (Term key : delete.getKeys())
- {
- RowMutation rm = new RowMutation(keyspace, key.getByteBuffer(keyType));
- if (delete.getColumns().size() < 1) // No columns, delete the row
- rm.delete(new QueryPath(delete.getColumnFamily()), System.currentTimeMillis());
- else // Delete specific columns
- {
- for (Term column : delete.getColumns())
- {
- ByteBuffer columnName = column.getByteBuffer(comparator);
- validateColumnName(columnName);
- rm.delete(new QueryPath(delete.getColumnFamily(), null, columnName),
- System.currentTimeMillis());
- }
- }
- rowMutations.add(rm);
- }
-
+
try
{
- StorageProxy.mutate(rowMutations, delete.getConsistencyLevel());
+ StorageProxy.mutate(delete.prepareRowMutations(keyspace, clientState), delete.getConsistencyLevel());
}
catch (TimeoutException e)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Tue Apr 26 14:53:53 2011
@@ -24,7 +24,7 @@ import java.util.EnumSet;
public enum StatementType
{
- SELECT, INSERT, UPDATE, BATCH_UPDATE, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
+ SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX,
DROP_KEYSPACE, DROP_COLUMNFAMILY;
// Statement types that don't require a keyspace to be set.
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Tue Apr 26 14:53:53 2011
@@ -21,24 +21,29 @@
package org.apache.cassandra.cql;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
+import static org.apache.cassandra.cql.QueryProcessor.validateKey;
+import static org.apache.cassandra.cql.QueryProcessor.validateColumn;
+
+import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
+
/**
* An <code>UPDATE</code> statement parsed from a CQL query statement.
*
*/
-public class UpdateStatement
+public class UpdateStatement extends AbstractModification
{
- public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE;
- private String columnFamily;
- private ConsistencyLevel cLevel = null;
private Map<Term, Term> columns;
private List<Term> columnNames, columnValues;
private Term key;
@@ -54,8 +59,8 @@ public class UpdateStatement
*/
public UpdateStatement(String columnFamily, ConsistencyLevel cLevel, Map<Term, Term> columns, Term key)
{
- this.columnFamily = columnFamily;
- this.cLevel = cLevel;
+ super(columnFamily, cLevel);
+
this.columns = columns;
this.key = key;
}
@@ -86,8 +91,8 @@ public class UpdateStatement
*/
public UpdateStatement(String columnFamily, ConsistencyLevel cLevel, List<Term> columnNames, List<Term> columnValues, Term key)
{
- this.columnFamily = columnFamily;
- this.cLevel = cLevel;
+ super(columnFamily, cLevel);
+
this.columnNames = columnNames;
this.columnValues = columnValues;
this.key = key;
@@ -114,6 +119,37 @@ public class UpdateStatement
return (cLevel != null);
}
+ /** {@inheritDoc} */
+ public List<RowMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException
+ {
+ List<String> cfamsSeen = new ArrayList<String>();
+
+ CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false);
+
+ // Avoid unnecessary authorizations.
+ if (!(cfamsSeen.contains(columnFamily)))
+ {
+ clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE);
+ cfamsSeen.add(columnFamily);
+ }
+
+ ByteBuffer key = this.key.getByteBuffer(getKeyType(keyspace));
+ validateKey(key);
+ AbstractType<?> comparator = getComparator(keyspace);
+
+ RowMutation rm = new RowMutation(keyspace, key);
+ for (Map.Entry<Term, Term> column : getColumns().entrySet())
+ {
+ ByteBuffer colName = column.getKey().getByteBuffer(comparator);
+ ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName));
+
+ validateColumn(metadata, colName, colValue);
+ rm.add(new QueryPath(columnFamily, null, colName), colValue, System.currentTimeMillis());
+ }
+
+ return Arrays.asList(rm);
+ }
+
public String getColumnFamily()
{
return columnFamily;
Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1096771&r1=1096770&r2=1096771&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Tue Apr 26 14:53:53 2011
@@ -699,3 +699,60 @@ class TestCql(ThriftTester):
r = cursor.fetchone()
assert r[1] == "some_value", \
"unrecognized value '%s'" % r[1]
+
+ def test_batch_with_mixed_statements(self):
+ "handle BATCH with INSERT/UPDATE/DELETE statements mixed in it"
+ cursor = init()
+ cursor.compression = 'NONE'
+ cursor.execute("""
+ BEGIN BATCH USING CONSISTENCY ONE
+ UPDATE StandardString1 SET :name = :val WHERE KEY = :key1
+ INSERT INTO StandardString1 (KEY, :col1) VALUES (:key2, :val)
+ INSERT INTO StandardString1 (KEY, :col2) VALUES (:key3, :val)
+ DELETE :col2 FROM StandardString1 WHERE key = :key3
+ APPLY BATCH
+ """,
+ dict(key1="bKey1", key2="bKey2", key3="bKey3", name="bName", col1="bCol1", col2="bCol2", val="bVal"))
+
+ cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+ dict(name="bName", key="bKey1"))
+
+ assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+ assert colnames[1] == "bName", \
+ "unrecognized name '%s'" % colnames[1]
+ r = cursor.fetchone()
+ assert r[1] == "bVal", \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+ dict(name="bCol2", key="bKey3"))
+
+ assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+ assert colnames[1] == "bCol2", \
+ "unrecognized name '%s'" % colnames[1]
+ # was deleted by DELETE statement
+ r = cursor.fetchone()
+ assert r[1] == None, \
+ "unrecognized value '%s'" % r[1]
+
+ cursor.execute("SELECT :name FROM StandardString1 WHERE KEY = :key",
+ dict(name="bCol1", key="bKey2"))
+
+ assert cursor.rowcount == 1, "expected 1 result, got %d" % cursor.rowcount
+ colnames = [col_d[0] for col_d in cursor.description]
+ assert colnames[1] == "bCol1", \
+ "unrecognized name '%s'" % colnames[1]
+ r = cursor.fetchone()
+ assert r[1] == "bVal", \
+ "unrecognized value '%s'" % r[1]
+
+ assert_raises(cql.ProgrammingError,
+ cursor.execute,
+ """
+ BEGIN BATCH USING CONSISTENCY ONE
+ UPDATE USING CONSISTENCY QUORUM StandardString1 SET 'name' = 'value' WHERE KEY = 'bKey4'
+ DELETE 'name' FROM StandardString1 WHERE KEY = 'bKey4'
+ APPLY BATCH
+ """)