You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/17 14:40:09 UTC

git commit: Fix count(*) queries in a mixed cluster

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 652ec6a5c -> 44cf4a66d


Fix count(*) queries in a mixed cluster

patch by Tyler Hobbs; reviewed by Piotr Kołaczkowski for CASSANDRA-6707


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44cf4a66
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44cf4a66
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44cf4a66

Branch: refs/heads/cassandra-2.0
Commit: 44cf4a66d157643297b7ab791a57f323432e28c5
Parents: 652ec6a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Feb 17 16:39:29 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Feb 17 16:39:29 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++-
 .../cql3/statements/SelectStatement.java        |  4 ++-
 .../apache/cassandra/net/MessagingService.java  | 26 +++++++++++++++++++-
 3 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd3b1b7..c9fabd2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,8 @@
  * Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
  * Make commitlog failure handling configurable (CASSANDRA-6364)
  * Avoid overlaps in LCS (CASSANDRA-6688)
- * improve support for paginating over composites (4851)
+ * Improve support for paginating over composites (CASSANDRA-4851)
+ * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
 Merged from 1.2:
  * Fix broken streams when replacing with same IP (CASSANDRA-6622)
  * Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d42fd76..52a7c70 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -165,7 +166,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         int pageSize = options.getPageSize();
         // A count query will never be paged for the user, but we always page it internally to avoid OOM.
         // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
-        if (parameters.isCount && pageSize <= 0)
+        // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
+        if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20)
             pageSize = DEFAULT_COUNT_PAGE_SIZE;
 
         if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 232cf6a..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -73,6 +73,8 @@ public final class MessagingService implements MessagingServiceMBean
     public static final int VERSION_20  = 7;
     public static final int current_version = VERSION_20;
 
+    public boolean allNodesAtLeast20 = true;
+
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
@@ -742,14 +744,36 @@ public final class MessagingService implements MessagingServiceMBean
     public int setVersion(InetAddress endpoint, int version)
     {
         logger.debug("Setting version {} for {}", version, endpoint);
+        if (version < VERSION_20)
+            allNodesAtLeast20 = false;
         Integer v = versions.put(endpoint, version);
+
+        // if the version was increased to 2.0 or later, see if all nodes are >= 2.0 now
+        if (v != null && v < VERSION_20 && version >= VERSION_20)
+            refreshAllNodesAtLeast20();
+
         return v == null ? version : v;
     }
 
     public void resetVersion(InetAddress endpoint)
     {
         logger.debug("Reseting version for {}", endpoint);
-        versions.remove(endpoint);
+        Integer removed = versions.remove(endpoint);
+        if (removed != null && removed <= VERSION_20)
+            refreshAllNodesAtLeast20();
+    }
+
+    private void refreshAllNodesAtLeast20()
+    {
+        for (Integer version: versions.values())
+        {
+            if (version < VERSION_20)
+            {
+                allNodesAtLeast20 = false;
+                return;
+            }
+        }
+        allNodesAtLeast20 = true;
     }
 
     public int getVersion(InetAddress endpoint)