You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/06/21 00:23:56 UTC
[4/5] git commit: Allow counter mutations in UNLOGGED batches
Allow counter mutations in UNLOGGED batches
patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-7351
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee401cf8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee401cf8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee401cf8
Branch: refs/heads/trunk
Commit: ee401cf8131a779069805cbe9ef4ab05d4a63b9a
Parents: a7ad9d2
Author: Jake Luciani <ja...@apache.org>
Authored: Fri Jun 20 15:16:12 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jun 20 15:16:12 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 1 +
.../cql3/statements/BatchStatement.java | 75 ++++++----
.../transport/messages/BatchMessage.java | 11 +-
.../org/apache/cassandra/cql3/BatchTests.java | 145 +++++++++++++++++++
.../org/apache/cassandra/cql3/DeleteTest.java | 27 ++--
6 files changed, 216 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0d8e49..4f68cf7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.0-rc2
+ * Allow counter mutations in UNLOGGED batches (CASSANDRA-7351)
* Modify reconcile logic to always pick a tombstone over a counter cell
(CASSANDRA-7346)
* Avoid incremental compaction on Windows (CASSANDRA-7365)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 287a700..86362f7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -418,6 +418,7 @@ public class QueryProcessor implements QueryHandler
{
ClientState clientState = queryState.getClientState();
batch.checkAccess(clientState);
+ batch.validate();
batch.validate(clientState);
return batch.execute(queryState, options);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 3cec81b..b7d69cc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -95,7 +95,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
statement.checkAccess(state);
}
- public void validate(ClientState state) throws InvalidRequestException
+ // Validates a prepared batch statement without validating its nested statements.
+ public void validate() throws InvalidRequestException
{
if (attrs.isTimeToLiveSet())
throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
@@ -109,15 +110,54 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
throw new InvalidRequestException("Cannot provide custom timestamp for counter BATCH");
}
+ boolean hasCounters = false;
+ boolean hasNonCounters = false;
+
for (ModificationStatement statement : statements)
{
+ if (timestampSet && statement.isCounter())
+ throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
+
if (timestampSet && statement.isTimestampSet())
throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
- statement.validate(state);
+ if (type == Type.COUNTER && !statement.isCounter())
+ throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+
+ if (type == Type.LOGGED && statement.isCounter())
+ throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
+
+ if (statement.isCounter())
+ hasCounters = true;
+ else
+ hasNonCounters = true;
+ }
+
+ if (hasCounters && hasNonCounters)
+ throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch");
+
+ if (hasConditions)
+ {
+ String ksName = null;
+ String cfName = null;
+ for (ModificationStatement stmt : statements)
+ {
+ if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
+ throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
+ ksName = stmt.keyspace();
+ cfName = stmt.columnFamily();
+ }
}
}
+ // The batch itself will be validated in either Parsed#prepare() - for regular CQL3 batches,
+ // or in QueryProcessor.processBatch() - for native protocol batches.
+ public void validate(ClientState state) throws InvalidRequestException
+ {
+ for (ModificationStatement statement : statements)
+ statement.validate(state);
+ }
+
public List<ModificationStatement> getStatements()
{
return statements;
@@ -180,12 +220,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
{
mut = new Mutation(ksName, key);
mut.setSourceFrame(sourceFrame);
- mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
+ mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
ksMap.put(key, mutation);
}
else
{
- mut = type == Type.COUNTER ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
+ mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
}
statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
@@ -356,40 +396,25 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
{
VariableSpecifications boundNames = getBoundVariables();
- List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+ List<ModificationStatement> statements = new ArrayList<>(parsedStatements.size());
boolean hasConditions = false;
+
for (ModificationStatement.Parsed parsed : parsedStatements)
{
ModificationStatement stmt = parsed.prepare(boundNames);
if (stmt.hasConditions())
hasConditions = true;
- if (stmt.isCounter() && type != Type.COUNTER)
- throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
-
- if (!stmt.isCounter() && type == Type.COUNTER)
- throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
-
statements.add(stmt);
}
- if (hasConditions)
- {
- String ksName = null;
- String cfName = null;
- for (ModificationStatement stmt : statements)
- {
- if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
- throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
- ksName = stmt.keyspace();
- cfName = stmt.columnFamily();
- }
- }
-
Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
prepAttrs.collectMarkerSpecification(boundNames);
- return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames);
+ BatchStatement batchStatement = new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions);
+ batchStatement.validate();
+
+ return new ParsedStatement.Prepared(batchStatement, boundNames);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index e2cb8a1..c199a62 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -209,16 +209,7 @@ public class BatchMessage extends Message.Request
ModificationStatement mst = (ModificationStatement)statement;
hasConditions |= mst.hasConditions();
- if (mst.isCounter())
- {
- if (type != BatchStatement.Type.COUNTER)
- throw new InvalidRequestException("Cannot include counter statement in a non-counter batch");
- }
- else
- {
- if (type == BatchStatement.Type.COUNTER)
- throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
- }
+
statements.add(mst);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/BatchTests.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java b/test/unit/org/apache/cassandra/cql3/BatchTests.java
new file mode 100644
index 0000000..27d407e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class BatchTests
+{
+ private static EmbeddedCassandraService cassandra;
+
+ private static Cluster cluster;
+ private static Session session;
+
+
+ private static PreparedStatement counter;
+ private static PreparedStatement noncounter;
+
+ @BeforeClass()
+ public static void setup() throws ConfigurationException, IOException
+ {
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+ cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+ session = cluster.connect();
+
+ session.execute("drop keyspace if exists junit;");
+ session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+ session.execute("CREATE TABLE junit.noncounter (\n" +
+ " id int PRIMARY KEY,\n" +
+ " val text\n" +
+ ");");
+ session.execute("CREATE TABLE junit.counter (\n" +
+ " id int PRIMARY KEY,\n" +
+ " val counter,\n" +
+ ");");
+
+
+ noncounter = session.prepare("insert into junit.noncounter(id, val)values(?,?)");
+ counter = session.prepare("update junit.counter set val = val + ? where id = ?");
+ }
+
+ @Test(expected = InvalidQueryException.class)
+ public void testMixedInCounterBatch()
+ {
+ sendBatch(BatchStatement.Type.COUNTER, true, true);
+ }
+
+ @Test(expected = InvalidQueryException.class)
+ public void testMixedInLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.LOGGED, true, true);
+ }
+
+ @Test(expected = InvalidQueryException.class)
+ public void testMixedInUnLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.UNLOGGED, true, true);
+ }
+
+ @Test(expected = InvalidQueryException.class)
+ public void testNonCounterInCounterBatch()
+ {
+ sendBatch(BatchStatement.Type.COUNTER, false, true);
+ }
+
+ @Test
+ public void testNonCounterInLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.LOGGED, false, true);
+ }
+
+ @Test
+ public void testNonCounterInUnLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.UNLOGGED, false, true);
+ }
+
+ @Test
+ public void testCounterInCounterBatch()
+ {
+ sendBatch(BatchStatement.Type.COUNTER, true, false);
+ }
+
+ @Test
+ public void testCounterInUnLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.UNLOGGED, true, false);
+ }
+
+
+ @Test(expected = InvalidQueryException.class)
+ public void testCounterInLoggedBatch()
+ {
+ sendBatch(BatchStatement.Type.LOGGED, true, false);
+ }
+
+
+
+ public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter)
+ {
+
+ assert addCounter || addNonCounter;
+ BatchStatement b = new BatchStatement(type);
+
+ for (int i = 0; i < 10; i++)
+ {
+ if (addNonCounter)
+ b.add(noncounter.bind(i, "foo"));
+
+ if (addCounter)
+ b.add(counter.bind((long)i, i));
+ }
+
+ session.execute(b);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
index 3395dcc..d2dbc79 100644
--- a/test/unit/org/apache/cassandra/cql3/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.cassandra.cql3;
@@ -16,9 +33,8 @@ import org.junit.Test;
import java.io.IOException;
-public class DeleteTest extends SchemaLoader
+public class DeleteTest
{
-
private static EmbeddedCassandraService cassandra;
private static Cluster cluster;
@@ -35,8 +51,6 @@ public class DeleteTest extends SchemaLoader
@BeforeClass()
public static void setup() throws ConfigurationException, IOException
{
-
- Schema.instance.clear(); // Schema are now written on disk and will be reloaded
cassandra = new EmbeddedCassandraService();
cassandra.start();
@@ -80,11 +94,6 @@ public class DeleteTest extends SchemaLoader
" val text ,\n" +
" PRIMARY KEY ( ( id ), cid )\n" +
");");
- try {
- Thread.sleep(2000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
pstmtI = session.prepare("insert into junit.tpc_inherit_b ( id, cid, inh_b, val) values (?, ?, ?, ?)");
pstmtU = session.prepare("update junit.tpc_inherit_b set inh_b=?, val=? where id=? and cid=?");