You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2020/06/23 19:19:07 UTC
[cassandra] branch trunk updated: generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator.
This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b85ede6 generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator.
b85ede6 is described below
commit b85ede633c493367901de2ebac85c4cd85c31567
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Fri Jun 12 15:35:04 2020 -0400
generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator.
Patch by Ekaterina Dimitrova; reviewed by Berenguer Blasi, adelapena and brandonwilliams for CASSANDRA-15877
---
CHANGES.txt | 1 +
.../NoReplicationTokenAllocator.java | 10 ++++-
.../ReplicationAwareTokenAllocator.java | 48 +++++++---------------
.../tokenallocator/TokenAllocatorDiagnostics.java | 19 +++++----
...AbstractReplicationAwareTokenAllocatorTest.java | 4 +-
.../NoReplicationTokenAllocatorTest.java | 4 +-
.../RandomReplicationAwareTokenAllocatorTest.java | 7 ----
.../dht/tokenallocator/TokenAllocatorTestBase.java | 25 ++++++++---
8 files changed, 58 insertions(+), 60 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bd99365..daae839 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * generateSplits method replaced the generateRandomTokens for ReplicationAwareTokenAllocator. (CASSANDRA-15877)
* Several mbeans are not unregistered when dropping a keyspace and table (CASSANDRA-14888)
* Update defaults for server and client TLS settings (CASSANDRA-15262)
* Differentiate follower/initator in StreamMessageHeader (CASSANDRA-15665)
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
index 0ac8951..255a2c9 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocator.java
@@ -20,14 +20,12 @@ package org.apache.cassandra.dht.tokenallocator;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.PriorityQueue;
import java.util.Queue;
-import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -233,6 +231,14 @@ public class NoReplicationTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
return newTokens;
}
+ @Override
+ Collection<Token> generateSplits(Unit newUnit, int numTokens)
+ {
+ Collection<Token> tokens = super.generateSplits(newUnit, numTokens);
+ TokenAllocatorDiagnostics.splitsGenerated(this, numTokens, sortedUnits, sortedTokens, newUnit, tokens);
+ return tokens;
+ }
+
/**
* For testing, remove the given unit preserving correct state of the allocator.
*/
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
index eb498ce..539b467 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocator.java
@@ -60,14 +60,14 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
assert !unitToTokens.containsKey(newUnit);
if (unitCount() < replicas)
- // Allocation does not matter; everything replicates everywhere.
- //However, at this point it is
+ // Allocation does not matter for now; everything replicates everywhere. However, at this point it is
// important to start the cluster/datacenter with suitably varied token range sizes so that the algorithm
// can maintain good balance for any number of nodes.
- return generateRandomTokens(newUnit, numTokens);
+ return generateSplits(newUnit, numTokens);
if (numTokens > sortedTokens.size())
- // Some of the heuristics below can't deal with this very unlikely case. Use splits for now, later allocations can fix any problems this may cause.
- return generateRandomTokens(newUnit, numTokens);
+ // Some of the heuristics below can't deal with this very unlikely case. Use splits for now,
+ // later allocations can fix any problems this may cause.
+ return generateSplits(newUnit, numTokens);
// ============= construct our initial token ring state =============
@@ -77,10 +77,10 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
if (groups.size() < replicas)
{
// We need at least replicas groups to do allocation correctly. If there aren't enough,
- // use random allocation.
+ // use splits as above.
// This part of the code should only be reached via the RATATest. StrategyAdapter should disallow
// token allocation in this case as the algorithm is not able to cover the behavior of NetworkTopologyStrategy.
- return generateRandomTokens(newUnit, numTokens);
+ return generateSplits(newUnit, numTokens);
}
// initialise our new unit's state (with an idealised ownership)
@@ -140,27 +140,19 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
return newTokens;
}
- private Collection<Token> generateRandomTokens(Unit newUnit, int numTokens)
- {
- Set<Token> tokens = new HashSet<>(numTokens);
- while (tokens.size() < numTokens)
- {
- Token token = partitioner.getRandomToken();
- if (!sortedTokens.containsKey(token))
- {
- tokens.add(token);
- sortedTokens.put(token, newUnit);
- unitToTokens.put(newUnit, token);
- }
- }
- TokenAllocatorDiagnostics.randomTokensGenerated(this, numTokens, unitToTokens, sortedTokens, newUnit, tokens);
- return tokens;
- }
+ /**
+ * Selects tokens by repeatedly splitting the largest range in the ring at the given ratio.
+ * This is used to choose tokens for the first nodes in the ring where the algorithm cannot be applied (e.g. when
+ * the number of nodes < RF). It generates a reasonably chaotic initial token split, after which the algorithm behaves
+ * well for an unbounded number of nodes.
+ */
+ @Override
Collection<Token> generateSplits(Unit newUnit, int numTokens)
{
Collection<Token> tokens = super.generateSplits(newUnit, numTokens);
unitToTokens.putAll(newUnit, tokens);
+ TokenAllocatorDiagnostics.splitsGenerated(this, numTokens, unitToTokens, sortedTokens, newUnit, tokens);
return tokens;
}
@@ -572,15 +564,5 @@ class ReplicationAwareTokenAllocator<Unit> extends TokenAllocatorBase<Unit>
return split.prev;
}
}
-
- static void dumpTokens(String lead, BaseTokenInfo<?, ?> tokens)
- {
- BaseTokenInfo<?, ?> token = tokens;
- do
- {
- System.out.format("%s%s: rs %s rt %s size %.2e%n", lead, token, token.replicationStart, token.replicationThreshold, token.replicatedOwnership);
- token = token.next;
- } while (token != null && token != tokens);
- }
}
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorDiagnostics.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorDiagnostics.java
index cc27a47..04d7455 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorDiagnostics.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorDiagnostics.java
@@ -18,11 +18,11 @@
package org.apache.cassandra.dht.tokenallocator;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
-import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -153,10 +153,11 @@ final class TokenAllocatorDiagnostics
tokenInfo));
}
- static <Unit> void randomTokensGenerated(TokenAllocatorBase<Unit> allocator,
- int numTokens, Queue<Weighted<UnitInfo>> sortedUnits,
- NavigableMap<Token, Unit> sortedTokens, Unit newUnit,
- Set<Token> tokens)
+ static <Unit> void splitsGenerated(TokenAllocatorBase<Unit> allocator,
+ int numTokens, Queue<Weighted<UnitInfo>> sortedUnits,
+ NavigableMap<Token, Unit> sortedTokens,
+ Unit newUnit,
+ Collection<Token> tokens)
{
if (isEnabled(TokenAllocatorEventType.RANDOM_TOKENS_GENERATED))
service.publish(new TokenAllocatorEvent<>(TokenAllocatorEventType.RANDOM_TOKENS_GENERATED,
@@ -170,10 +171,10 @@ final class TokenAllocatorDiagnostics
null));
}
- static <Unit> void randomTokensGenerated(TokenAllocatorBase<Unit> allocator,
- int numTokens, Multimap<Unit, Token> unitToTokens,
- NavigableMap<Token, Unit> sortedTokens, Unit newUnit,
- Set<Token> tokens)
+ static <Unit> void splitsGenerated(TokenAllocatorBase<Unit> allocator,
+ int numTokens, Multimap<Unit, Token> unitToTokens,
+ NavigableMap<Token, Unit> sortedTokens, Unit newUnit,
+ Collection<Token> tokens)
{
if (isEnabled(TokenAllocatorEventType.RANDOM_TOKENS_GENERATED))
service.publish(new TokenAllocatorEvent<>(TokenAllocatorEventType.RANDOM_TOKENS_GENERATED,
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
index eb79f12..5f9aa31 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/AbstractReplicationAwareTokenAllocatorTest.java
@@ -523,12 +523,12 @@ abstract class AbstractReplicationAwareTokenAllocatorTest extends TokenAllocator
SummaryStatistics unitStat = new SummaryStatistics();
for (Map.Entry<Unit, Double> en : ownership.entrySet())
unitStat.addValue(en.getValue() * inverseAverage / t.unitToTokens.get(en.getKey()).size());
- su.update(unitStat);
+ su.update(unitStat, t.unitCount());
SummaryStatistics tokenStat = new SummaryStatistics();
for (Token tok : t.sortedTokens.keySet())
tokenStat.addValue(replicatedTokenOwnership(tok, t.sortedTokens, t.strategy) * inverseAverage);
- st.update(tokenStat);
+ st.update(tokenStat, t.unitCount());
if (print)
{
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
index ccad0f8..ee38a28 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
@@ -183,7 +183,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
{
unitStat.addValue(wu.weight * size / t.tokensInUnits.get(wu.value.unit).size());
}
- su.update(unitStat);
+ su.update(unitStat, t.sortedUnits.size());
SummaryStatistics tokenStat = new SummaryStatistics();
for (PriorityQueue<TokenAllocatorBase.Weighted<TokenAllocatorBase.TokenInfo>> tokens : t.tokensInUnits.values())
@@ -193,7 +193,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
tokenStat.addValue(token.weight);
}
}
- st.update(tokenStat);
+ st.update(tokenStat, t.sortedUnits.size());
if (print)
{
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
index bd94442..6a2d59e 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/RandomReplicationAwareTokenAllocatorTest.java
@@ -41,13 +41,6 @@ public class RandomReplicationAwareTokenAllocatorTest extends AbstractReplicatio
@Test
public void testNewClusterr()
{
- Util.flakyTest(this::flakyTestNewCluster,
- 3,
- "It tends to fail sometimes due to the random selection of the tokens in the first few nodes.");
- }
-
- private void flakyTestNewCluster()
- {
testNewCluster(new RandomPartitioner(), MAX_VNODE_COUNT);
}
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
index ac8f1a4..8722426 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/TokenAllocatorTestBase.java
@@ -130,19 +130,34 @@ abstract class TokenAllocatorTestBase
class Summary
{
double min = 1;
+ int minAt = -1;
double max = 1;
+ int maxAt = - 1;
double stddev = 0;
+ int stddevAt = -1;
- void update(SummaryStatistics stat)
+ void update(SummaryStatistics stat, int point)
{
- min = Math.min(min, stat.getMin());
- max = Math.max(max, stat.getMax());
- stddev = Math.max(stddev, stat.getStandardDeviation());
+ if (stat.getMin() <= min)
+ {
+ min = Math.min(min, stat.getMin());
+ minAt = point;
+ }
+ if (stat.getMax() >= max)
+ {
+ max = Math.max(max, stat.getMax());
+ maxAt = point;
+ }
+ if (stat.getStandardDeviation() >= stddev)
+ {
+ stddev = Math.max(stddev, stat.getStandardDeviation());
+ stddevAt = point;
+ }
}
public String toString()
{
- return String.format("max %.2f min %.2f stddev %.4f", max, min, stddev);
+ return String.format("max %.4f @%d min %.4f @%d stddev %.4f @%d", max, maxAt, min, minAt, stddev, stddevAt);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org