You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/05/26 00:14:00 UTC

[04/15] cassandra git commit: Do not consider local node a valid source during replace

Do not consider local node a valid source during replace

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11848


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6100eb2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6100eb2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6100eb2c

Branch: refs/heads/cassandra-3.0
Commit: 6100eb2c1c73b197ea276e8ece232962a0e7b9d2
Parents: 675591d
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon May 23 19:52:44 2016 -0300
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 25 18:16:42 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java | 48 +++++++++++++++-----
 3 files changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6100eb2c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fcd7c3c..d914420 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Do not consider local node a valid source during replace (CASSANDRA-11848)
  * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
  * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
  * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6100eb2c/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index cbbd100..dfefbe9 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -65,6 +65,7 @@ public class BootStrapper
 
         RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap");
         streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+        streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
 
         for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6100eb2c/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index f8e29b6..121a351 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -105,6 +105,17 @@ public class RangeStreamer
         }
     }
 
+    /**
+     * Source filter which excludes the current node (the endpoint equal to FBUtilities.getBroadcastAddress()) from source calculations
+     */
+    public static class ExcludeLocalNodeFilter implements ISourceFilter
+    {
+        public boolean shouldInclude(InetAddress endpoint)
+        {
+            return !FBUtilities.getBroadcastAddress().equals(endpoint); // false only for this node's own broadcast address
+        }
+    }
+
     public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description)
     {
         this.metadata = metadata;
@@ -153,10 +164,12 @@ public class RangeStreamer
     private boolean useStrictSourcesForRanges(String keyspaceName)
     {
         AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy();
-        return !DatabaseDescriptor.isReplacing()
-                && useStrictConsistency
-                && tokens != null
-                && metadata.getAllEndpoints().size() != strat.getReplicationFactor();
+        return isNotReplacingAndUsesStrictConsistency() && tokens != null && metadata.getAllEndpoints().size() != strat.getReplicationFactor();
+    }
+
+    private static boolean isNotReplacingAndUsesStrictConsistency() // precondition shared by useStrictSourcesForRanges and the no-source-found handling
+    {
+        return !DatabaseDescriptor.isReplacing() && useStrictConsistency; // true only when isReplacing() is false AND useStrictConsistency is set
+    }
 
     /**
@@ -265,6 +278,12 @@ public class RangeStreamer
             outer:
             for (InetAddress address : rangesWithSources.get(range))
             {
+                for (ISourceFilter filter : sourceFilters)
+                {
+                    if (!filter.shouldInclude(address))
+                        continue outer;
+                }
+
                 if (address.equals(FBUtilities.getBroadcastAddress()))
                 {
                     // If localhost is a source, we have found one, but we don't add it to the map to avoid streaming locally
@@ -272,19 +291,26 @@ public class RangeStreamer
                     continue;
                 }
 
-                for (ISourceFilter filter : sourceFilters)
-                {
-                    if (!filter.shouldInclude(address))
-                        continue outer;
-                }
-
                 rangeFetchMapMap.put(address, range);
                 foundSource = true;
                 break; // ensure we only stream from one other node for each range
             }
 
             if (!foundSource)
-                throw new IllegalStateException("unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace);
+            {
+                AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy();
+                if (strat != null && strat.getReplicationFactor() == 1) // RF=1 keyspaces get a softer failure mode below
+                {
+                    if (isNotReplacingAndUsesStrictConsistency()) // not replacing and strict consistency requested: fail hard
+                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
+                                                        "If you want to ignore this, consider using system property -Dcassandra.consistent.rangemovement=false.");
+                    else // otherwise only warn; no source is recorded for this range
+                        logger.warn("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
+                                    "Keyspace might be missing data.");
+                }
+                else
+                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace);
+            }
         }
 
         return rangeFetchMapMap;