You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/06/21 00:23:56 UTC

[4/5] git commit: Allow counter mutations in UNLOGGED batches

Allow counter mutations in UNLOGGED batches

patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-7351


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee401cf8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee401cf8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee401cf8

Branch: refs/heads/trunk
Commit: ee401cf8131a779069805cbe9ef4ab05d4a63b9a
Parents: a7ad9d2
Author: Jake Luciani <ja...@apache.org>
Authored: Fri Jun 20 15:16:12 2014 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Jun 20 15:16:12 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cql3/QueryProcessor.java   |   1 +
 .../cql3/statements/BatchStatement.java         |  75 ++++++----
 .../transport/messages/BatchMessage.java        |  11 +-
 .../org/apache/cassandra/cql3/BatchTests.java   | 145 +++++++++++++++++++
 .../org/apache/cassandra/cql3/DeleteTest.java   |  27 ++--
 6 files changed, 216 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b0d8e49..4f68cf7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc2
+ * Allow counter mutations in UNLOGGED batches (CASSANDRA-7351)
  * Modify reconcile logic to always pick a tombstone over a counter cell
    (CASSANDRA-7346)
  * Avoid incremental compaction on Windows (CASSANDRA-7365)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 287a700..86362f7 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -418,6 +418,7 @@ public class QueryProcessor implements QueryHandler
     {
         ClientState clientState = queryState.getClientState();
         batch.checkAccess(clientState);
+        batch.validate();
         batch.validate(clientState);
         return batch.execute(queryState, options);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 3cec81b..b7d69cc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -95,7 +95,8 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             statement.checkAccess(state);
     }
 
-    public void validate(ClientState state) throws InvalidRequestException
+    // Validates a prepared batch statement without validating its nested statements.
+    public void validate() throws InvalidRequestException
     {
         if (attrs.isTimeToLiveSet())
             throw new InvalidRequestException("Global TTL on the BATCH statement is not supported.");
@@ -109,15 +110,54 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                 throw new InvalidRequestException("Cannot provide custom timestamp for counter BATCH");
         }
 
+        boolean hasCounters = false;
+        boolean hasNonCounters = false;
+
         for (ModificationStatement statement : statements)
         {
+            if (timestampSet && statement.isCounter())
+                throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters");
+
             if (timestampSet && statement.isTimestampSet())
                 throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements");
 
-            statement.validate(state);
+            if (type == Type.COUNTER && !statement.isCounter())
+                throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
+
+            if (type == Type.LOGGED && statement.isCounter())
+                throw new InvalidRequestException("Cannot include a counter statement in a logged batch");
+
+            if (statement.isCounter())
+                hasCounters = true;
+            else
+                hasNonCounters = true;
+        }
+
+        if (hasCounters && hasNonCounters)
+            throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch");
+
+        if (hasConditions)
+        {
+            String ksName = null;
+            String cfName = null;
+            for (ModificationStatement stmt : statements)
+            {
+                if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
+                    throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
+                ksName = stmt.keyspace();
+                cfName = stmt.columnFamily();
+            }
         }
     }
 
+    // Validates only the nested statements. The batch itself is validated by validate(), called either from
+    //   Parsed#prepare() - for regular CQL3 batches, or from QueryProcessor.processBatch() - for native protocol batches.
+    public void validate(ClientState state) throws InvalidRequestException
+    {
+        for (ModificationStatement statement : statements)
+            statement.validate(state);
+    }
+
     public List<ModificationStatement> getStatements()
     {
         return statements;
@@ -180,12 +220,12 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             {
                 mut = new Mutation(ksName, key);
                 mut.setSourceFrame(sourceFrame);
-                mutation = type == Type.COUNTER ? new CounterMutation(mut, options.getConsistency()) : mut;
+                mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
                 ksMap.put(key, mutation);
             }
             else
             {
-                mut = type == Type.COUNTER ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
+                mut = statement.cfm.isCounter() ? ((CounterMutation)mutation).getMutation() : (Mutation)mutation;
             }
 
             statement.addUpdateForKey(mut.addOrGet(statement.cfm), key, clusteringPrefix, params);
@@ -356,40 +396,25 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         {
             VariableSpecifications boundNames = getBoundVariables();
 
-            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
+            List<ModificationStatement> statements = new ArrayList<>(parsedStatements.size());
             boolean hasConditions = false;
+
             for (ModificationStatement.Parsed parsed : parsedStatements)
             {
                 ModificationStatement stmt = parsed.prepare(boundNames);
                 if (stmt.hasConditions())
                     hasConditions = true;
 
-                if (stmt.isCounter() && type != Type.COUNTER)
-                    throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
-
-                if (!stmt.isCounter() && type == Type.COUNTER)
-                    throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
-
                 statements.add(stmt);
             }
 
-            if (hasConditions)
-            {
-                String ksName = null;
-                String cfName = null;
-                for (ModificationStatement stmt : statements)
-                {
-                    if (ksName != null && (!stmt.keyspace().equals(ksName) || !stmt.columnFamily().equals(cfName)))
-                        throw new InvalidRequestException("Batch with conditions cannot span multiple tables");
-                    ksName = stmt.keyspace();
-                    cfName = stmt.columnFamily();
-                }
-            }
-
             Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
             prepAttrs.collectMarkerSpecification(boundNames);
 
-            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions), boundNames);
+            BatchStatement batchStatement = new BatchStatement(boundNames.size(), type, statements, prepAttrs, hasConditions);
+            batchStatement.validate();
+
+            return new ParsedStatement.Prepared(batchStatement, boundNames);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index e2cb8a1..c199a62 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -209,16 +209,7 @@ public class BatchMessage extends Message.Request
 
                 ModificationStatement mst = (ModificationStatement)statement;
                 hasConditions |= mst.hasConditions();
-                if (mst.isCounter())
-                {
-                    if (type != BatchStatement.Type.COUNTER)
-                        throw new InvalidRequestException("Cannot include counter statement in a non-counter batch");
-                }
-                else
-                {
-                    if (type == BatchStatement.Type.COUNTER)
-                        throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
-                }
+
                 statements.add(mst);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/BatchTests.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java b/test/unit/org/apache/cassandra/cql3/BatchTests.java
new file mode 100644
index 0000000..27d407e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class BatchTests
+{
+    private static EmbeddedCassandraService cassandra;
+
+    private static Cluster cluster;
+    private static Session session;
+
+
+    private static PreparedStatement counter;
+    private static PreparedStatement noncounter;
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("drop keyspace if exists junit;");
+        session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+        session.execute("CREATE TABLE junit.noncounter (\n" +
+                "  id int PRIMARY KEY,\n" +
+                "  val text\n" +
+                ");");
+        session.execute("CREATE TABLE junit.counter (\n" +
+                "  id int PRIMARY KEY,\n" +
+                "  val counter,\n" +
+                ");");
+
+
+        noncounter = session.prepare("insert into junit.noncounter(id, val)values(?,?)");
+        counter = session.prepare("update junit.counter set val = val + ? where id = ?");
+    }
+
+    @Test(expected = InvalidQueryException.class)
+    public void testMixedInCounterBatch()
+    {
+       sendBatch(BatchStatement.Type.COUNTER, true, true);
+    }
+
+    @Test(expected = InvalidQueryException.class)
+    public void testMixedInLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.LOGGED, true, true);
+    }
+
+    @Test(expected = InvalidQueryException.class)
+    public void testMixedInUnLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.UNLOGGED, true, true);
+    }
+
+    @Test(expected = InvalidQueryException.class)
+    public void testNonCounterInCounterBatch()
+    {
+        sendBatch(BatchStatement.Type.COUNTER, false, true);
+    }
+
+    @Test
+    public void testNonCounterInLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.LOGGED, false, true);
+    }
+
+    @Test
+    public void testNonCounterInUnLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.UNLOGGED, false, true);
+    }
+
+    @Test
+    public void testCounterInCounterBatch()
+    {
+        sendBatch(BatchStatement.Type.COUNTER, true, false);
+    }
+
+    @Test
+    public void testCounterInUnLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.UNLOGGED, true, false);
+    }
+
+
+    @Test(expected = InvalidQueryException.class)
+    public void testCounterInLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.LOGGED, true, false);
+    }
+
+
+
+    public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter)
+    {
+
+        assert addCounter || addNonCounter;
+        BatchStatement b = new BatchStatement(type);
+
+        for (int i = 0; i < 10; i++)
+        {
+            if (addNonCounter)
+                b.add(noncounter.bind(i, "foo"));
+
+            if (addCounter)
+                b.add(counter.bind((long)i, i));
+        }
+
+        session.execute(b);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee401cf8/test/unit/org/apache/cassandra/cql3/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
index 3395dcc..d2dbc79 100644
--- a/test/unit/org/apache/cassandra/cql3/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/DeleteTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.cassandra.cql3;
 
 
@@ -16,9 +33,8 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-public class DeleteTest extends SchemaLoader
+public class DeleteTest
 {
-
     private static EmbeddedCassandraService cassandra;
 
     private static Cluster cluster;
@@ -35,8 +51,6 @@ public class DeleteTest extends SchemaLoader
     @BeforeClass()
     public static void setup() throws ConfigurationException, IOException
     {
-
-        Schema.instance.clear(); // Schema are now written on disk and will be reloaded
         cassandra = new EmbeddedCassandraService();
         cassandra.start();
 
@@ -80,11 +94,6 @@ public class DeleteTest extends SchemaLoader
                 "  val text ,\n" +
                 "  PRIMARY KEY ( ( id ), cid )\n" +
                 ");");
-        try {
-            Thread.sleep(2000L);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
 
         pstmtI = session.prepare("insert into junit.tpc_inherit_b ( id, cid, inh_b, val) values (?, ?, ?, ?)");
         pstmtU = session.prepare("update junit.tpc_inherit_b set inh_b=?, val=? where id=? and cid=?");