You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/02/05 17:02:07 UTC

[6/7] cassandra git commit: More fixes to the TokenAllocation

More fixes to the TokenAllocation

patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12990


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b6e83fc2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b6e83fc2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b6e83fc2

Branch: refs/heads/cassandra-3.11
Commit: b6e83fc200fa9e4c0e4f26491597188305cddd21
Parents: 67cda76
Author: Dikang Gu <di...@gmail.com>
Authored: Sat Dec 3 17:59:01 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:00:23 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dht/tokenallocator/TokenAllocation.java     | 30 ++++++++++++
 .../tokenallocator/TokenAllocatorFactory.java   |  2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java | 45 ++++++++++++++++++
 .../cassandra/service/CassandraDaemon.java      | 48 +-------------------
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1851e62..65efebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * nodetool stopdaemon errors out (CASSANDRA-13030)
  * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
  * Fix primary index calculation for SASI (CASSANDRA-12910)
+ * More fixes to the TokenAllocator (CASSANDRA-12990)
  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 36824a1..15d7868 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -35,12 +35,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class TokenAllocation
 {
@@ -51,6 +53,9 @@ public class TokenAllocation
                                                    final InetAddress endpoint,
                                                    int numTokens)
     {
+        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+            Gossiper.waitToSettle();
+
         TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
         StrategyAdapter strategy = getStrategy(tokenMetadataCopy, rs, endpoint);
         Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
@@ -198,6 +203,31 @@ public class TokenAllocation
         final String dc = snitch.getDatacenter(endpoint);
         final int replicas = rs.getReplicationFactor(dc);
 
+        if (replicas == 0 || replicas == 1)
+        {
+            // No replication, each node is treated as separate.
+            return new StrategyAdapter()
+            {
+                @Override
+                public int replicas()
+                {
+                    return 1;
+                }
+
+                @Override
+                public Object getGroup(InetAddress unit)
+                {
+                    return unit;
+                }
+
+                @Override
+                public boolean inAllocationRing(InetAddress other)
+                {
+                    return dc.equals(snitch.getDatacenter(other));
+                }
+            };
+        }
+
         Topology topology = tokenMetadata.getTopology();
         int racks = topology.getDatacenterRacks().get(dc).asMap().size();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index d20de8f..58acb56 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -34,7 +34,7 @@ public class TokenAllocatorFactory
                                                      ReplicationStrategy<InetAddress> strategy,
                                                      IPartitioner partitioner)
     {
-        if(strategy.replicas() == 1 || strategy.replicas() == 0)
+        if(strategy.replicas() == 1)
         {
             logger.info("Using NoReplicationTokenAllocator.");
             return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7f0f85b..ebfd66d 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1614,4 +1614,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return null;
     }
 
+    public static void waitToSettle()
+    {
+        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+        if (forceAfter == 0)
+        {
+            return;
+        }
+        final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+        final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+        final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+        logger.info("Waiting for gossip to settle...");
+        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+        int totalPolls = 0;
+        int numOkay = 0;
+        int epSize = Gossiper.instance.getEndpointStates().size();
+        while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+        {
+            Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+            int currentSize = Gossiper.instance.getEndpointStates().size();
+            totalPolls++;
+            if (currentSize == epSize)
+            {
+                logger.debug("Gossip looks settled.");
+                numOkay++;
+            }
+            else
+            {
+                logger.info("Gossip not settled after {} polls.", totalPolls);
+                numOkay = 0;
+            }
+            epSize = currentSize;
+            if (forceAfter > 0 && totalPolls > forceAfter)
+            {
+                logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
+                            totalPolls);
+                break;
+            }
+        }
+        if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+            logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+        else
+            logger.info("No gossip backlog; proceeding");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..851330b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -42,7 +42,6 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -412,7 +411,7 @@ public class CassandraDaemon
         ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
 
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
-            waitForGossipToSettle();
+            Gossiper.waitToSettle();
 
         // schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
         // due to scheduling errors or race conditions
@@ -680,51 +679,6 @@ public class CassandraDaemon
         }
     }
 
-    private void waitForGossipToSettle()
-    {
-        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
-        if (forceAfter == 0)
-        {
-            return;
-        }
-        final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
-        final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
-        final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
-
-        logger.info("Waiting for gossip to settle before accepting client requests...");
-        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
-        int totalPolls = 0;
-        int numOkay = 0;
-        int epSize = Gossiper.instance.getEndpointStates().size();
-        while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
-        {
-            Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-            int currentSize = Gossiper.instance.getEndpointStates().size();
-            totalPolls++;
-            if (currentSize == epSize)
-            {
-                logger.debug("Gossip looks settled.");
-                numOkay++;
-            }
-            else
-            {
-                logger.info("Gossip not settled after {} polls.", totalPolls);
-                numOkay = 0;
-            }
-            epSize = currentSize;
-            if (forceAfter > 0 && totalPolls > forceAfter)
-            {
-                logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
-                            totalPolls);
-                break;
-            }
-        }
-        if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
-            logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
-        else
-            logger.info("No gossip backlog; proceeding");
-    }
-
     public static void stop(String[] args)
     {
         instance.deactivate();