You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2019/09/08 18:31:07 UTC

[cassandra] branch trunk updated: Add `allocate_tokens_for_local_rf` yaml option for token allocation that doesn't require keyspace knowledge/existence

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 068d2d3  Add `allocate_tokens_for_local_rf` yaml option for token allocation that doesn't require keyspace knowledge/existence
068d2d3 is described below

commit 068d2d37c6fbdb60546821c4d408a84161fd1cb6
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Mon Aug 5 00:06:30 2019 +0200

    Add `allocate_tokens_for_local_rf` yaml option for token allocation that doesn't require keyspace knowledge/existence
    
     patch by Mick Semb Wever; reviewed by Branimir Lambov for CASSANDRA-15260
---
 CHANGES.txt                                        |  3 +++
 conf/cassandra.yaml                                | 10 +++++++--
 src/java/org/apache/cassandra/config/Config.java   |  2 ++
 .../cassandra/config/DatabaseDescriptor.java       |  5 +++++
 .../org/apache/cassandra/dht/BootStrapper.java     | 22 +++++++++++++++++++
 .../apache/cassandra/dht/BootstrapDiagnostics.java | 16 ++++++++++++++
 .../org/apache/cassandra/dht/BootstrapEvent.java   |  6 +++++-
 .../dht/tokenallocator/TokenAllocation.java        | 25 ++++++++++++++++++++++
 .../org/apache/cassandra/dht/BootStrapperTest.java | 19 ++++++++++++++++
 9 files changed, 105 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6425b1f..709e436 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+4.0-alpha2
+ * Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260)
+
 4.0-alpha1
  * Inaccurate exception message with nodetool snapshot (CASSANDRA-15287)
  * Fix InternodeOutboundMetrics overloaded bytes/count mixup (CASSANDRA-15186)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e776a5a..f3e5c75 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -26,15 +26,21 @@ num_tokens: 256
 
 # Triggers automatic allocation of num_tokens tokens for this node. The allocation
 # algorithm attempts to choose tokens in a way that optimizes replicated load over
-# the nodes in the datacenter for the replication strategy used by the specified
-# keyspace.
+# the nodes in the datacenter for the replica factor.
 #
 # The load assigned to each node will be close to proportional to its number of
 # vnodes.
 #
 # Only supported with the Murmur3Partitioner.
+
+# Replica factor is determined via the replication strategy used by the specified
+# keyspace.
 # allocate_tokens_for_keyspace: KEYSPACE
 
+# Replica factor is explicitly set, regardless of keyspace or datacenter.
+# This is the replica factor within the datacenter, like NTS.
+# allocate_tokens_for_local_replication_factor: 3
+
 # initial_token allows you to specify tokens manually.  While you can use it with
 # vnodes (num_tokens > 1, above) -- in which case you should provide a 
 # comma-separated list -- it's primarily used when adding nodes to legacy clusters 
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a1fdfdc..b86b7c5 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -86,6 +86,8 @@ public class Config
     public int num_tokens = 1;
     /** Triggers automatic allocation of tokens if set, using the replication strategy of the referenced keyspace */
     public String allocate_tokens_for_keyspace = null;
+    /** Triggers automatic allocation of tokens if set, based on the provided replica count for a datacenter */
+    public Integer allocate_tokens_for_local_replication_factor = null;
 
     public long native_transport_idle_timeout_in_ms = 0L;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 43203e5..e4ea611 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1422,6 +1422,11 @@ public class DatabaseDescriptor
         return System.getProperty(Config.PROPERTY_PREFIX + "allocate_tokens_for_keyspace", conf.allocate_tokens_for_keyspace);
     }
 
+    public static Integer getAllocateTokensForLocalRf()
+    {
+        return conf.allocate_tokens_for_local_replication_factor;
+    }
+
     public static Collection<String> tokensFromString(String tokenString)
     {
         List<String> tokens = new ArrayList<String>();
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index f5c455c..94bf283 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.Keyspace;
@@ -32,6 +33,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.*;
@@ -153,6 +155,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
     public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddressAndPort address, int schemaWaitDelay) throws ConfigurationException
     {
         String allocationKeyspace = DatabaseDescriptor.getAllocateTokensForKeyspace();
+        Integer allocationLocalRf = DatabaseDescriptor.getAllocateTokensForLocalRf();
         Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
         if (initialTokens.size() > 0 && allocationKeyspace != null)
             logger.warn("manually specified tokens override automatic allocation");
@@ -172,6 +175,9 @@ public class BootStrapper extends ProgressEventNotifierSupport
         if (allocationKeyspace != null)
             return allocateTokens(metadata, address, allocationKeyspace, numTokens, schemaWaitDelay);
 
+        if (allocationLocalRf != null)
+            return allocateTokens(metadata, address, allocationLocalRf, numTokens, schemaWaitDelay);
+
         if (numTokens == 1)
             logger.warn("Picking random token for a single vnode.  You should probably add more vnodes and/or use the automatic token allocation mechanism.");
 
@@ -215,6 +221,22 @@ public class BootStrapper extends ProgressEventNotifierSupport
         return tokens;
     }
 
+
+    static Collection<Token> allocateTokens(final TokenMetadata metadata,
+                                            InetAddressAndPort address,
+                                            int rf,
+                                            int numTokens,
+                                            int schemaWaitDelay)
+    {
+        StorageService.instance.waitForSchema(schemaWaitDelay);
+        if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
+            Gossiper.waitToSettle();
+
+        Collection<Token> tokens = TokenAllocation.allocateTokens(metadata, rf, address, numTokens);
+        BootstrapDiagnostics.tokensAllocated(address, metadata, rf, numTokens, tokens);
+        return tokens;
+    }
+
     public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
     {
         Set<Token> tokens = new HashSet<>(numTokens);
diff --git a/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java b/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
index 5695532..5c2b46a 100644
--- a/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
+++ b/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
@@ -45,6 +45,7 @@ final class BootstrapDiagnostics
                                                address,
                                                null,
                                                allocationKeyspace,
+                                               null,
                                                numTokens,
                                                ImmutableList.copyOf(initialTokens)));
     }
@@ -56,6 +57,7 @@ final class BootstrapDiagnostics
                                                address,
                                                metadata.cloneOnlyTokenMap(),
                                                null,
+                                               null,
                                                numTokens,
                                                ImmutableList.copyOf(tokens)));
     }
@@ -68,6 +70,20 @@ final class BootstrapDiagnostics
                                                address,
                                                metadata.cloneOnlyTokenMap(),
                                                allocationKeyspace,
+                                               null,
+                                               numTokens,
+                                               ImmutableList.copyOf(tokens)));
+    }
+
+    static void tokensAllocated(InetAddressAndPort address, TokenMetadata metadata,
+                                int rf, int numTokens, Collection<Token> tokens)
+    {
+        if (isEnabled(BootstrapEventType.TOKENS_ALLOCATED))
+            service.publish(new BootstrapEvent(BootstrapEventType.TOKENS_ALLOCATED,
+                                               address,
+                                               metadata.cloneOnlyTokenMap(),
+                                               null,
+                                               rf,
                                                numTokens,
                                                ImmutableList.copyOf(tokens)));
     }
diff --git a/src/java/org/apache/cassandra/dht/BootstrapEvent.java b/src/java/org/apache/cassandra/dht/BootstrapEvent.java
index 5bad09a..4936c29 100644
--- a/src/java/org/apache/cassandra/dht/BootstrapEvent.java
+++ b/src/java/org/apache/cassandra/dht/BootstrapEvent.java
@@ -42,16 +42,19 @@ final class BootstrapEvent extends DiagnosticEvent
     private final InetAddressAndPort address;
     @Nullable
     private final String allocationKeyspace;
+    @Nullable
+    private final Integer rf;
     private final Integer numTokens;
     private final Collection<Token> tokens;
 
     BootstrapEvent(BootstrapEventType type, InetAddressAndPort address, @Nullable TokenMetadata tokenMetadata,
-                   @Nullable String allocationKeyspace, int numTokens, ImmutableCollection<Token> tokens)
+                   @Nullable String allocationKeyspace, @Nullable Integer rf, int numTokens, ImmutableCollection<Token> tokens)
     {
         this.type = type;
         this.address = address;
         this.tokenMetadata = tokenMetadata;
         this.allocationKeyspace = allocationKeyspace;
+        this.rf = rf;
         this.numTokens = numTokens;
         this.tokens = tokens;
     }
@@ -75,6 +78,7 @@ final class BootstrapEvent extends DiagnosticEvent
         HashMap<String, Serializable> ret = new HashMap<>();
         ret.put("tokenMetadata", String.valueOf(tokenMetadata));
         ret.put("allocationKeyspace", allocationKeyspace);
+        ret.put("rf", rf);
         ret.put("numTokens", numTokens);
         ret.put("tokens", String.valueOf(tokens));
         return ret;
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index ba93eb8..bfa281e 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -72,6 +73,20 @@ public class TokenAllocation
         return tokens;
     }
 
+    public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
+                                                   final int replicas,
+                                                   final InetAddressAndPort endpoint,
+                                                   int numTokens)
+    {
+        TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
+        StrategyAdapter strategy = getStrategy(tokenMetadataCopy, replicas, endpoint);
+        Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
+        tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
+        logger.warn("Selected tokens {}", tokens);
+        // SummaryStatistics is not implemented for `allocate_tokens_for_local_replication_factor`
+        return tokens;
+    }
+
     private static Collection<Token> adjustForCrossDatacenterClashes(final TokenMetadata tokenMetadata,
                                                                      StrategyAdapter strategy, Collection<Token> tokens)
     {
@@ -197,7 +212,17 @@ public class TokenAllocation
     {
         final String dc = snitch.getDatacenter(endpoint);
         final int replicas = rs.getReplicationFactor(dc).allReplicas;
+        return getStrategy(tokenMetadata, replicas, snitch, endpoint);
+    }
+
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final int replicas, final InetAddressAndPort endpoint)
+    {
+        return getStrategy(tokenMetadata, replicas, DatabaseDescriptor.getEndpointSnitch(), endpoint);
+    }
 
+    static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final int replicas, final IEndpointSnitch snitch, final InetAddressAndPort endpoint)
+    {
+        final String dc = snitch.getDatacenter(endpoint);
         if (replicas == 0 || replicas == 1)
         {
             // No replication, each node is treated as separate.
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 2f412ad..c0b6d5c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -174,6 +174,17 @@ public class BootStrapperTest
         allocateTokensForNode(vn, ks, tm, addr);
     }
 
+    @Test
+    public void testAllocateTokensLocalRf() throws UnknownHostException
+    {
+        int vn = 16;
+        int allocateTokensForLocalRf = 3;
+        TokenMetadata tm = new TokenMetadata();
+        generateFakeEndpoints(tm, 10, vn);
+        InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+        allocateTokensForNode(vn, allocateTokensForLocalRf, tm, addr);
+    }
+
     public void testAllocateTokensNetworkStrategy(int rackCount, int replicas) throws UnknownHostException
     {
         IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
@@ -243,6 +254,14 @@ public class BootStrapperTest
         verifyImprovement(os, ns);
     }
 
+    private void allocateTokensForNode(int vn, int rf, TokenMetadata tm, InetAddressAndPort addr)
+    {
+        Collection<Token> tokens = BootStrapper.allocateTokens(tm, addr, rf, vn, 0);
+        assertEquals(vn, tokens.size());
+        tm.updateNormalTokens(tokens, addr);
+        // SummaryStatistics is not implemented for `allocate_tokens_for_local_replication_factor` so can't be verified
+    }
+
     private void verifyImprovement(SummaryStatistics os, SummaryStatistics ns)
     {
         if (ns.getStandardDeviation() > os.getStandardDeviation())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org