You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/02/05 17:02:07 UTC
[6/7] cassandra git commit: More fixes to the TokenAllocation
More fixes to the TokenAllocation
patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b6e83fc2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b6e83fc2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b6e83fc2
Branch: refs/heads/cassandra-3.11
Commit: b6e83fc200fa9e4c0e4f26491597188305cddd21
Parents: 67cda76
Author: Dikang Gu <di...@gmail.com>
Authored: Sat Dec 3 17:59:01 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:00:23 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dht/tokenallocator/TokenAllocation.java | 30 ++++++++++++
.../tokenallocator/TokenAllocatorFactory.java | 2 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 45 ++++++++++++++++++
.../cassandra/service/CassandraDaemon.java | 48 +-------------------
5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1851e62..65efebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* nodetool stopdaemon errors out (CASSANDRA-13030)
* Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
* Fix primary index calculation for SASI (CASSANDRA-12910)
+ * More fixes to the TokenAllocator (CASSANDRA-12990)
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 36824a1..15d7868 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -35,12 +35,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.utils.FBUtilities;
public class TokenAllocation
{
@@ -51,6 +53,9 @@ public class TokenAllocation
final InetAddress endpoint,
int numTokens)
{
+ if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+ Gossiper.waitToSettle();
+
TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
StrategyAdapter strategy = getStrategy(tokenMetadataCopy, rs, endpoint);
Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
@@ -198,6 +203,31 @@ public class TokenAllocation
final String dc = snitch.getDatacenter(endpoint);
final int replicas = rs.getReplicationFactor(dc);
+ if (replicas == 0 || replicas == 1)
+ {
+ // No replication, each node is treated as separate.
+ return new StrategyAdapter()
+ {
+ @Override
+ public int replicas()
+ {
+ return 1;
+ }
+
+ @Override
+ public Object getGroup(InetAddress unit)
+ {
+ return unit;
+ }
+
+ @Override
+ public boolean inAllocationRing(InetAddress other)
+ {
+ return dc.equals(snitch.getDatacenter(other));
+ }
+ };
+ }
+
Topology topology = tokenMetadata.getTopology();
int racks = topology.getDatacenterRacks().get(dc).asMap().size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index d20de8f..58acb56 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -34,7 +34,7 @@ public class TokenAllocatorFactory
ReplicationStrategy<InetAddress> strategy,
IPartitioner partitioner)
{
- if(strategy.replicas() == 1 || strategy.replicas() == 0)
+ if(strategy.replicas() == 1)
{
logger.info("Using NoReplicationTokenAllocator.");
return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7f0f85b..ebfd66d 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1614,4 +1614,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return null;
}
+ public static void waitToSettle()
+ {
+ int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+ if (forceAfter == 0)
+ {
+ return;
+ }
+ final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+ final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+ final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+ logger.info("Waiting for gossip to settle...");
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+ int totalPolls = 0;
+ int numOkay = 0;
+ int epSize = Gossiper.instance.getEndpointStates().size();
+ while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ {
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ int currentSize = Gossiper.instance.getEndpointStates().size();
+ totalPolls++;
+ if (currentSize == epSize)
+ {
+ logger.debug("Gossip looks settled.");
+ numOkay++;
+ }
+ else
+ {
+ logger.info("Gossip not settled after {} polls.", totalPolls);
+ numOkay = 0;
+ }
+ epSize = currentSize;
+ if (forceAfter > 0 && totalPolls > forceAfter)
+ {
+ logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
+ totalPolls);
+ break;
+ }
+ }
+ if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+ else
+ logger.info("No gossip backlog; proceeding");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..851330b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -42,7 +42,6 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -412,7 +411,7 @@ public class CassandraDaemon
ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
- waitForGossipToSettle();
+ Gossiper.waitToSettle();
// schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
// due to scheduling errors or race conditions
@@ -680,51 +679,6 @@ public class CassandraDaemon
}
}
- private void waitForGossipToSettle()
- {
- int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
- if (forceAfter == 0)
- {
- return;
- }
- final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
- final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
- final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
-
- logger.info("Waiting for gossip to settle before accepting client requests...");
- Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
- int totalPolls = 0;
- int numOkay = 0;
- int epSize = Gossiper.instance.getEndpointStates().size();
- while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
- {
- Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
- int currentSize = Gossiper.instance.getEndpointStates().size();
- totalPolls++;
- if (currentSize == epSize)
- {
- logger.debug("Gossip looks settled.");
- numOkay++;
- }
- else
- {
- logger.info("Gossip not settled after {} polls.", totalPolls);
- numOkay = 0;
- }
- epSize = currentSize;
- if (forceAfter > 0 && totalPolls > forceAfter)
- {
- logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
- totalPolls);
- break;
- }
- }
- if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
- logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
- else
- logger.info("No gossip backlog; proceeding");
- }
-
public static void stop(String[] args)
{
instance.deactivate();