You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/17 14:40:09 UTC
git commit: Fix count(*) queries in a mixed cluster
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 652ec6a5c -> 44cf4a66d
Fix count(*) queries in a mixed cluster
patch by Tyler Hobbs; reviewed by Piotr Kołaczkowski for CASSANDRA-6707
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44cf4a66
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44cf4a66
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44cf4a66
Branch: refs/heads/cassandra-2.0
Commit: 44cf4a66d157643297b7ab791a57f323432e28c5
Parents: 652ec6a
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Mon Feb 17 16:39:29 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Feb 17 16:39:29 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 3 ++-
.../cql3/statements/SelectStatement.java | 4 ++-
.../apache/cassandra/net/MessagingService.java | 26 +++++++++++++++++++-
3 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd3b1b7..c9fabd2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,8 @@
* Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
* Make commitlog failure handling configurable (CASSANDRA-6364)
* Avoid overlaps in LCS (CASSANDRA-6688)
- * improve support for paginating over composites (4851)
+ * Improve support for paginating over composites (CASSANDRA-4851)
+ * Fix count(*) queries in a mixed cluster (CASSANDRA-6707)
Merged from 1.2:
* Fix broken streams when replacing with same IP (CASSANDRA-6622)
* Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d42fd76..52a7c70 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
@@ -165,7 +166,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
int pageSize = options.getPageSize();
// A count query will never be paged for the user, but we always page it internally to avoid OOM.
// If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
- if (parameters.isCount && pageSize <= 0)
+ // Note that if there are some nodes in the cluster with a version less than 2.0, we can't use paging (CASSANDRA-6707).
+ if (parameters.isCount && pageSize <= 0 && MessagingService.instance().allNodesAtLeast20)
pageSize = DEFAULT_COUNT_PAGE_SIZE;
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
http://git-wip-us.apache.org/repos/asf/cassandra/blob/44cf4a66/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 232cf6a..ad86bbd 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -73,6 +73,8 @@ public final class MessagingService implements MessagingServiceMBean
public static final int VERSION_20 = 7;
public static final int current_version = VERSION_20;
+ public boolean allNodesAtLeast20 = true;
+
/**
* we preface every message with this number so the recipient can validate the sender is sane
*/
@@ -742,14 +744,36 @@ public final class MessagingService implements MessagingServiceMBean
public int setVersion(InetAddress endpoint, int version)
{
logger.debug("Setting version {} for {}", version, endpoint);
+ if (version < VERSION_20)
+ allNodesAtLeast20 = false;
Integer v = versions.put(endpoint, version);
+
+ // if the version was increased to 2.0 or later, see if all nodes are >= 2.0 now
+ if (v != null && v < VERSION_20 && version >= VERSION_20)
+ refreshAllNodesAtLeast20();
+
return v == null ? version : v;
}
public void resetVersion(InetAddress endpoint)
{
logger.debug("Reseting version for {}", endpoint);
- versions.remove(endpoint);
+ Integer removed = versions.remove(endpoint);
+ if (removed != null && removed <= VERSION_20)
+ refreshAllNodesAtLeast20();
+ }
+
+ private void refreshAllNodesAtLeast20()
+ {
+ for (Integer version: versions.values())
+ {
+ if (version < VERSION_20)
+ {
+ allNodesAtLeast20 = false;
+ return;
+ }
+ }
+ allNodesAtLeast20 = true;
}
public int getVersion(InetAddress endpoint)