You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/06/09 23:42:39 UTC

[1/2] git commit: Fix native protocol CAS batches

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 9adb31ca9 -> 6d1f05e34


Fix native protocol CAS batches

patch by Sylvain Lebresne; reviewed by Aleksey Yeschenko for
CASSANDRA-7337


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa9894df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa9894df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa9894df

Branch: refs/heads/cassandra-2.1
Commit: aa9894df439eefd6af2732c9a49875c20a3d4902
Parents: 0c3424e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Tue Jun 10 00:27:42 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 10 00:27:42 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                                   | 1 +
 src/java/org/apache/cassandra/cql3/QueryProcessor.java        | 3 +--
 .../org/apache/cassandra/cql3/statements/BatchStatement.java  | 7 +------
 .../org/apache/cassandra/transport/messages/BatchMessage.java | 4 +++-
 4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa9894df/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac95e3f..97ac75e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.9
+ * Fix native protocol CAS batches (CASSANDRA-7337)
  * Add per-CF range read request latency metrics (CASSANDRA-7338)
  * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
  * Add conditional CREATE/DROP USER support (CASSANDRA-7264)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa9894df/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 3d9c5a8..a59fe9b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -316,8 +316,7 @@ public class QueryProcessor implements QueryHandler
         batch.checkAccess(clientState);
         batch.validate(clientState);
 
-        batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
-        return new ResultMessage.Void();
+        return batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
     }
 
     public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa9894df/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 875e41c..8fc1ecc 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -64,11 +64,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
      * @param statements a list of UpdateStatements
      * @param attrs additional attributes for statement (CL, timestamp, timeToLive)
      */
-    public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs)
-    {
-        this(boundTerms, type, statements, attrs, false);
-    }
-
     public BatchStatement(int boundTerms, Type type, List<ModificationStatement> statements, Attributes attrs, boolean hasConditions)
     {
         this.boundTerms = boundTerms;
@@ -254,7 +249,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             return executeWithConditions(variables, cl, serialCl, now);
 
         executeWithoutConditions(getMutations(variables, local, cl, now), cl);
-        return null;
+        return new ResultMessage.Void();
     }
 
     private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa9894df/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 221dcd9..34dd8fe 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -162,6 +162,7 @@ public class BatchMessage extends Message.Request
 
             QueryHandler handler = state.getClientState().getCQLQueryHandler();
             List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
+            boolean hasConditions = false;
             for (int i = 0; i < queryOrIdList.size(); i++)
             {
                 Object query = queryOrIdList.get(i);
@@ -186,6 +187,7 @@ public class BatchMessage extends Message.Request
                     throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
 
                 ModificationStatement mst = (ModificationStatement)statement;
+                hasConditions |= mst.hasConditions();
                 if (mst.isCounter())
                 {
                     if (type != BatchStatement.Type.COUNTER)
@@ -201,7 +203,7 @@ public class BatchMessage extends Message.Request
 
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
-            BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none());
+            BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none(), hasConditions);
             Message.Response response = handler.processBatch(batch, state, new BatchQueryOptions(consistency, values, queryOrIdList));
 
             if (tracingId != null)


[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/cql3/QueryProcessor.java
	src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
	src/java/org/apache/cassandra/transport/messages/BatchMessage.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d1f05e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d1f05e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d1f05e3

Branch: refs/heads/cassandra-2.1
Commit: 6d1f05e34cbee738e8bbe2967481516abbd254d4
Parents: 9adb31c aa9894d
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Jun 10 00:42:30 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Jun 10 00:42:30 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                                   | 1 +
 src/java/org/apache/cassandra/cql3/QueryProcessor.java        | 3 +--
 .../org/apache/cassandra/cql3/statements/BatchStatement.java  | 7 +------
 .../org/apache/cassandra/transport/messages/BatchMessage.java | 4 +++-
 4 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d1f05e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 784f9f9,97ac75e..b70782b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,17 +1,17 @@@
 -2.0.9
 +2.1.0
+  * Fix native protocol CAS batches (CASSANDRA-7337)
 + * Reduce likelihood of contention on local paxos locking (CASSANDRA-7359)
 + * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
 + * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
 + * Don't fail streams on failure detector downs (CASSANDRA-3569)
 + * Add optional keyspace to DROP INDEX statement (CASSANDRA-7314)
 + * Reduce run time for CQL tests (CASSANDRA-7327)
 + * Fix heap size calculation on Windows (CASSANDRA-7352)
 + * RefCount native frames from netty (CASSANDRA-7245)
 + * Use tarball dir instead of /var for default paths (CASSANDRA-7136)
 +Merged from 2.0:
   * Add per-CF range read request latency metrics (CASSANDRA-7338)
   * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
 - * Add conditional CREATE/DROP USER support (CASSANDRA-7264)
 - * Swap local and global default read repair chances (CASSANDRA-7320)
 - * Add missing iso8601 patterns for date strings (CASSANDRA-6973)
 - * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
 - * cqlsh: always emphasize the partition key in DESC output (CASSANDRA-7274)
 - * Copy compaction options to make sure they are reloaded (CASSANDRA-7290)
 - * Add option to do more aggressive tombstone compactions (CASSANDRA-6563)
 - * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
 - * Add authentication support to shuffle (CASSANDRA-6484)
 - * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
   * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
   * Fix infinite loop on exception while streaming (CASSANDRA-7330)
  Merged from 1.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d1f05e3/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f50bc81,a59fe9b..287a700
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -419,8 -315,8 +419,7 @@@ public class QueryProcessor implements 
          ClientState clientState = queryState.getClientState();
          batch.checkAccess(clientState);
          batch.validate(clientState);
-         batch.execute(queryState, options);
-         return new ResultMessage.Void();
 -
 -        return batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues());
++        return batch.execute(queryState, options);
      }
  
      public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d1f05e3/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 5b058f3,8fc1ecc..e513aef
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -219,27 -226,30 +214,27 @@@ public class BatchStatement implements 
  
      public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
      {
 -        if (options.getConsistency() == null)
 -            throw new InvalidRequestException("Invalid empty consistency level");
 -
 -        return execute(new PreparedBatchVariables(options.getValues()), false, options.getConsistency(), options.getSerialConsistency(), queryState.getTimestamp());
 +        return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
      }
  
 -    public ResultMessage executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException
 +    public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
      {
 -        if (cl == null)
 -            throw new InvalidRequestException("Invalid empty consistency level");
 -
 -        return execute(new BatchOfPreparedVariables(variables), false, cl, ConsistencyLevel.SERIAL, queryState.getTimestamp());
 +        return execute(options, false, options.getTimestamp(queryState));
      }
  
 -    public ResultMessage execute(BatchVariables variables, boolean local, ConsistencyLevel cl, ConsistencyLevel serialCl, long now)
 +    public ResultMessage execute(BatchQueryOptions options, boolean local, long now)
      throws RequestExecutionException, RequestValidationException
      {
 -        // TODO: we don't support a serial consistency for batches in the protocol so defaulting to SERIAL for now.
 -        // We'll need to fix that.
 +        if (options.getConsistency() == null)
 +            throw new InvalidRequestException("Invalid empty consistency level");
 +        if (options.getSerialConsistency() == null)
 +            throw new InvalidRequestException("Invalid empty serial consistency level");
 +
          if (hasConditions)
 -            return executeWithConditions(variables, cl, serialCl, now);
 +            return executeWithConditions(options, now);
  
 -        executeWithoutConditions(getMutations(variables, local, cl, now), cl);
 +        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
-         return null;
+         return new ResultMessage.Void();
      }
  
      private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d1f05e3/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index ec96ed1,34dd8fe..e2cb8a1
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@@ -170,7 -161,8 +170,8 @@@ public class BatchMessage extends Messa
              }
  
              QueryHandler handler = state.getClientState().getCQLQueryHandler();
 -            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
 +            List<ParsedStatement.Prepared> prepared = new ArrayList<>(queryOrIdList.size());
+             boolean hasConditions = false;
              for (int i = 0; i < queryOrIdList.size(); i++)
              {
                  Object query = queryOrIdList.get(i);
@@@ -222,8 -203,8 +224,8 @@@
  
              // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
              // (and no value would be really correct, so we prefer passing a clearly wrong one).
-             BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none());
+             BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none(), hasConditions);
 -            Message.Response response = handler.processBatch(batch, state, new BatchQueryOptions(consistency, values, queryOrIdList));
 +            Message.Response response = handler.processBatch(batch, state, batchOptions);
  
              if (tracingId != null)
                  response.setTracingId(tracingId);