You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2019/09/08 18:31:07 UTC
[cassandra] branch trunk updated: Add
`allocate_tokens_for_local_rf` yaml option for token allocation that
doesn't require keyspace knowledge/existence
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 068d2d3 Add `allocate_tokens_for_local_rf` yaml option for token allocation that doesn't require keyspace knowledge/existence
068d2d3 is described below
commit 068d2d37c6fbdb60546821c4d408a84161fd1cb6
Author: Mick Semb Wever <mc...@apache.org>
AuthorDate: Mon Aug 5 00:06:30 2019 +0200
Add `allocate_tokens_for_local_rf` yaml option for token allocation that doesn't require keyspace knowledge/existence
patch by Mick Semb Wever; reviewed by Branimir Lambov for CASSANDRA-15260
---
CHANGES.txt | 3 +++
conf/cassandra.yaml | 10 +++++++--
src/java/org/apache/cassandra/config/Config.java | 2 ++
.../cassandra/config/DatabaseDescriptor.java | 5 +++++
.../org/apache/cassandra/dht/BootStrapper.java | 22 +++++++++++++++++++
.../apache/cassandra/dht/BootstrapDiagnostics.java | 16 ++++++++++++++
.../org/apache/cassandra/dht/BootstrapEvent.java | 6 +++++-
.../dht/tokenallocator/TokenAllocation.java | 25 ++++++++++++++++++++++
.../org/apache/cassandra/dht/BootStrapperTest.java | 19 ++++++++++++++++
9 files changed, 105 insertions(+), 3 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6425b1f..709e436 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+4.0-alpha2
+ * Add `allocate_tokens_for_local_replication_factor` option for token allocation (CASSANDRA-15260)
+
4.0-alpha1
* Inaccurate exception message with nodetool snapshot (CASSANDRA-15287)
* Fix InternodeOutboundMetrics overloaded bytes/count mixup (CASSANDRA-15186)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e776a5a..f3e5c75 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -26,15 +26,21 @@ num_tokens: 256
# Triggers automatic allocation of num_tokens tokens for this node. The allocation
# algorithm attempts to choose tokens in a way that optimizes replicated load over
-# the nodes in the datacenter for the replication strategy used by the specified
-# keyspace.
+# the nodes in the datacenter for the replica factor.
#
# The load assigned to each node will be close to proportional to its number of
# vnodes.
#
# Only supported with the Murmur3Partitioner.
+
+# Replica factor is determined via the replication strategy used by the specified
+# keyspace.
# allocate_tokens_for_keyspace: KEYSPACE
+# Replica factor is explicitly set, regardless of keyspace or datacenter.
+# This is the replica factor within the datacenter, like NTS.
+# allocate_tokens_for_local_replication_factor: 3
+
# initial_token allows you to specify tokens manually. While you can use it with
# vnodes (num_tokens > 1, above) -- in which case you should provide a
# comma-separated list -- it's primarily used when adding nodes to legacy clusters
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a1fdfdc..b86b7c5 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -86,6 +86,8 @@ public class Config
public int num_tokens = 1;
/** Triggers automatic allocation of tokens if set, using the replication strategy of the referenced keyspace */
public String allocate_tokens_for_keyspace = null;
+ /** Triggers automatic allocation of tokens if set, based on the provided replica count for a datacenter */
+ public Integer allocate_tokens_for_local_replication_factor = null;
public long native_transport_idle_timeout_in_ms = 0L;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 43203e5..e4ea611 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1422,6 +1422,11 @@ public class DatabaseDescriptor
return System.getProperty(Config.PROPERTY_PREFIX + "allocate_tokens_for_keyspace", conf.allocate_tokens_for_keyspace);
}
+ public static Integer getAllocateTokensForLocalRf()
+ {
+ return conf.allocate_tokens_for_local_replication_factor;
+ }
+
public static Collection<String> tokensFromString(String tokenString)
{
List<String> tokens = new ArrayList<String>();
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index f5c455c..94bf283 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.db.Keyspace;
@@ -32,6 +33,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.*;
@@ -153,6 +155,7 @@ public class BootStrapper extends ProgressEventNotifierSupport
public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, InetAddressAndPort address, int schemaWaitDelay) throws ConfigurationException
{
String allocationKeyspace = DatabaseDescriptor.getAllocateTokensForKeyspace();
+ Integer allocationLocalRf = DatabaseDescriptor.getAllocateTokensForLocalRf();
Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
if (initialTokens.size() > 0 && allocationKeyspace != null)
logger.warn("manually specified tokens override automatic allocation");
@@ -172,6 +175,9 @@ public class BootStrapper extends ProgressEventNotifierSupport
if (allocationKeyspace != null)
return allocateTokens(metadata, address, allocationKeyspace, numTokens, schemaWaitDelay);
+ if (allocationLocalRf != null)
+ return allocateTokens(metadata, address, allocationLocalRf, numTokens, schemaWaitDelay);
+
if (numTokens == 1)
logger.warn("Picking random token for a single vnode. You should probably add more vnodes and/or use the automatic token allocation mechanism.");
@@ -215,6 +221,22 @@ public class BootStrapper extends ProgressEventNotifierSupport
return tokens;
}
+
+ static Collection<Token> allocateTokens(final TokenMetadata metadata,
+ InetAddressAndPort address,
+ int rf,
+ int numTokens,
+ int schemaWaitDelay)
+ {
+ StorageService.instance.waitForSchema(schemaWaitDelay);
+ if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
+ Gossiper.waitToSettle();
+
+ Collection<Token> tokens = TokenAllocation.allocateTokens(metadata, rf, address, numTokens);
+ BootstrapDiagnostics.tokensAllocated(address, metadata, rf, numTokens, tokens);
+ return tokens;
+ }
+
public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
{
Set<Token> tokens = new HashSet<>(numTokens);
diff --git a/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java b/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
index 5695532..5c2b46a 100644
--- a/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
+++ b/src/java/org/apache/cassandra/dht/BootstrapDiagnostics.java
@@ -45,6 +45,7 @@ final class BootstrapDiagnostics
address,
null,
allocationKeyspace,
+ null,
numTokens,
ImmutableList.copyOf(initialTokens)));
}
@@ -56,6 +57,7 @@ final class BootstrapDiagnostics
address,
metadata.cloneOnlyTokenMap(),
null,
+ null,
numTokens,
ImmutableList.copyOf(tokens)));
}
@@ -68,6 +70,20 @@ final class BootstrapDiagnostics
address,
metadata.cloneOnlyTokenMap(),
allocationKeyspace,
+ null,
+ numTokens,
+ ImmutableList.copyOf(tokens)));
+ }
+
+ static void tokensAllocated(InetAddressAndPort address, TokenMetadata metadata,
+ int rf, int numTokens, Collection<Token> tokens)
+ {
+ if (isEnabled(BootstrapEventType.TOKENS_ALLOCATED))
+ service.publish(new BootstrapEvent(BootstrapEventType.TOKENS_ALLOCATED,
+ address,
+ metadata.cloneOnlyTokenMap(),
+ null,
+ rf,
numTokens,
ImmutableList.copyOf(tokens)));
}
diff --git a/src/java/org/apache/cassandra/dht/BootstrapEvent.java b/src/java/org/apache/cassandra/dht/BootstrapEvent.java
index 5bad09a..4936c29 100644
--- a/src/java/org/apache/cassandra/dht/BootstrapEvent.java
+++ b/src/java/org/apache/cassandra/dht/BootstrapEvent.java
@@ -42,16 +42,19 @@ final class BootstrapEvent extends DiagnosticEvent
private final InetAddressAndPort address;
@Nullable
private final String allocationKeyspace;
+ @Nullable
+ private final Integer rf;
private final Integer numTokens;
private final Collection<Token> tokens;
BootstrapEvent(BootstrapEventType type, InetAddressAndPort address, @Nullable TokenMetadata tokenMetadata,
- @Nullable String allocationKeyspace, int numTokens, ImmutableCollection<Token> tokens)
+ @Nullable String allocationKeyspace, @Nullable Integer rf, int numTokens, ImmutableCollection<Token> tokens)
{
this.type = type;
this.address = address;
this.tokenMetadata = tokenMetadata;
this.allocationKeyspace = allocationKeyspace;
+ this.rf = rf;
this.numTokens = numTokens;
this.tokens = tokens;
}
@@ -75,6 +78,7 @@ final class BootstrapEvent extends DiagnosticEvent
HashMap<String, Serializable> ret = new HashMap<>();
ret.put("tokenMetadata", String.valueOf(tokenMetadata));
ret.put("allocationKeyspace", allocationKeyspace);
+ ret.put("rf", rf);
ret.put("numTokens", numTokens);
ret.put("tokens", String.valueOf(tokens));
return ret;
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index ba93eb8..bfa281e 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -32,6 +32,7 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -72,6 +73,20 @@ public class TokenAllocation
return tokens;
}
+ public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata,
+ final int replicas,
+ final InetAddressAndPort endpoint,
+ int numTokens)
+ {
+ TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
+ StrategyAdapter strategy = getStrategy(tokenMetadataCopy, replicas, endpoint);
+ Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
+ tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens);
+ logger.warn("Selected tokens {}", tokens);
+ // SummaryStatistics is not implemented for `allocate_tokens_for_local_replication_factor`
+ return tokens;
+ }
+
private static Collection<Token> adjustForCrossDatacenterClashes(final TokenMetadata tokenMetadata,
StrategyAdapter strategy, Collection<Token> tokens)
{
@@ -197,7 +212,17 @@ public class TokenAllocation
{
final String dc = snitch.getDatacenter(endpoint);
final int replicas = rs.getReplicationFactor(dc).allReplicas;
+ return getStrategy(tokenMetadata, replicas, snitch, endpoint);
+ }
+
+ static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final int replicas, final InetAddressAndPort endpoint)
+ {
+ return getStrategy(tokenMetadata, replicas, DatabaseDescriptor.getEndpointSnitch(), endpoint);
+ }
+ static StrategyAdapter getStrategy(final TokenMetadata tokenMetadata, final int replicas, final IEndpointSnitch snitch, final InetAddressAndPort endpoint)
+ {
+ final String dc = snitch.getDatacenter(endpoint);
if (replicas == 0 || replicas == 1)
{
// No replication, each node is treated as separate.
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 2f412ad..c0b6d5c 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -174,6 +174,17 @@ public class BootStrapperTest
allocateTokensForNode(vn, ks, tm, addr);
}
+ @Test
+ public void testAllocateTokensLocalRf() throws UnknownHostException
+ {
+ int vn = 16;
+ int allocateTokensForLocalRf = 3;
+ TokenMetadata tm = new TokenMetadata();
+ generateFakeEndpoints(tm, 10, vn);
+ InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
+ allocateTokensForNode(vn, allocateTokensForLocalRf, tm, addr);
+ }
+
public void testAllocateTokensNetworkStrategy(int rackCount, int replicas) throws UnknownHostException
{
IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
@@ -243,6 +254,14 @@ public class BootStrapperTest
verifyImprovement(os, ns);
}
+ private void allocateTokensForNode(int vn, int rf, TokenMetadata tm, InetAddressAndPort addr)
+ {
+ Collection<Token> tokens = BootStrapper.allocateTokens(tm, addr, rf, vn, 0);
+ assertEquals(vn, tokens.size());
+ tm.updateNormalTokens(tokens, addr);
+ // SummaryStatistics is not implemented for `allocate_tokens_for_local_replication_factor` so can't be verified
+ }
+
private void verifyImprovement(SummaryStatistics os, SummaryStatistics ns)
{
if (ns.getStandardDeviation() > os.getStandardDeviation())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org