You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/13 17:27:32 UTC
git commit: Use real node messaging versions for schema exchange
decisions
Updated Branches:
refs/heads/cassandra-1.2 b2dfaed31 -> de72e7fc0
Use real node messaging versions for schema exchange decisions
patch by Aleksey Yeschenko; reviewed by Piotr Kołaczkowski for
CASSANDRA-6700
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de72e7fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de72e7fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de72e7fc
Branch: refs/heads/cassandra-1.2
Commit: de72e7fc0a750fdb2fcd752092e5a07a7f47046e
Parents: b2dfaed
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Feb 13 19:26:48 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Feb 13 19:26:48 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/net/IncomingTcpConnection.java | 4 +--
.../apache/cassandra/net/MessagingService.java | 28 +++++++++++++-------
.../cassandra/service/MigrationManager.java | 4 +--
4 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index de7c307..872934a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@
* Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
* Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
* Don't exchange schema between nodes with different versions (CASSANDRA-6695)
+ * Use real node messaging versions for schema exchange decisions (CASSANDRA-6700)
1.2.15
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index 3b24a7f..d0126c7 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -130,8 +130,8 @@ public class IncomingTcpConnection extends Thread
logger.info("Received messages from newer protocol version {}. Ignoring", version);
return;
}
- MessagingService.instance().setVersion(from, Math.min(MessagingService.current_version, maxVersion));
- logger.debug("set version for {} to {}", from, Math.min(MessagingService.current_version, maxVersion));
+ MessagingService.instance().setVersion(from, maxVersion);
+ logger.debug("Set version for {} to {} (will use {})", from, maxVersion, Math.min(MessagingService.current_version, maxVersion));
// outbound side will reconnect if necessary to upgrade version
while (true)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index bfc3957..09fa272 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -800,10 +800,10 @@ public final class MessagingService implements MessagingServiceMBean
/**
* @return the last version associated with address, or @param version if this is the first such version
*/
- public int setVersion(InetAddress address, int version)
+ public int setVersion(InetAddress endpoint, int version)
{
- logger.debug("Setting version {} for {}", version, address);
- Integer v = versions.put(address, version);
+ logger.debug("Setting version {} for {}", version, endpoint);
+ Integer v = versions.put(endpoint, version);
return v == null ? version : v;
}
@@ -813,27 +813,35 @@ public final class MessagingService implements MessagingServiceMBean
versions.remove(endpoint);
}
- public Integer getVersion(InetAddress address)
+ public int getVersion(InetAddress endpoint)
{
- Integer v = versions.get(address);
+ Integer v = versions.get(endpoint);
if (v == null)
{
// we don't know the version. assume current. we'll know soon enough if that was incorrect.
- logger.trace("Assuming current protocol version for {}", address);
+ logger.trace("Assuming current protocol version for {}", endpoint);
return MessagingService.current_version;
}
else
- return v;
+ return Math.min(v, MessagingService.current_version);
}
- public int getVersion(String address) throws UnknownHostException
+ public int getVersion(String endpoint) throws UnknownHostException
{
- return getVersion(InetAddress.getByName(address));
+ return getVersion(InetAddress.getByName(endpoint));
+ }
+
+ public int getRawVersion(InetAddress endpoint)
+ {
+ Integer v = versions.get(endpoint);
+ if (v == null)
+ throw new IllegalStateException("getRawVersion() was called without checking knowsVersion() result first");
+ return v;
}
public boolean knowsVersion(InetAddress endpoint)
{
- return versions.get(endpoint) != null;
+ return versions.containsKey(endpoint);
}
public void incrementDroppedMessages(Verb verb)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/de72e7fc/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 68d0bad..584415d 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -138,7 +138,7 @@ public class MigrationManager
* Don't request schema from fat clients
*/
return MessagingService.instance().knowsVersion(endpoint)
- && MessagingService.instance().getVersion(endpoint) == MessagingService.current_version
+ && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version
&& !Gossiper.instance.isFatClient(endpoint);
}
@@ -292,7 +292,7 @@ public class MigrationManager
// only push schema to nodes with known and equal versions
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
MessagingService.instance().knowsVersion(endpoint) &&
- MessagingService.instance().getVersion(endpoint) == MessagingService.current_version)
+ MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version)
pushSchemaMutation(endpoint, schema);
}