You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/02/18 11:43:34 UTC
[cassandra] branch trunk updated: Fix node unable to join when RF > N in multi-DC with added warning
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87e4f31 Fix node unable to join when RF > N in multi-DC with added warning
87e4f31 is described below
commit 87e4f31e30f6a89f1c17b5a9eb6406208e384d51
Author: Bereng <be...@gmail.com>
AuthorDate: Thu Feb 18 11:42:24 2021 +0000
Fix node unable to join when RF > N in multi-DC with added warning
patch by Berenguer Blasi; reviewed by Andrés de la Peña, Ekaterina Dimitrova and Tomasz Lasica for CASSANDRA-16296
---
CHANGES.txt | 1 +
.../statements/schema/AlterKeyspaceStatement.java | 25 +++++---
.../statements/schema/CreateKeyspaceStatement.java | 24 ++++++--
.../org/apache/cassandra/dht/BootStrapper.java | 1 -
.../org/apache/cassandra/dht/RangeStreamer.java | 61 +++++++++++++------
.../locator/AbstractReplicationStrategy.java | 16 ++---
.../apache/cassandra/locator/LocalStrategy.java | 5 ++
.../cassandra/locator/NetworkTopologyStrategy.java | 42 ++++++++++++-
.../apache/cassandra/locator/SimpleStrategy.java | 34 +++++++++--
.../apache/cassandra/locator/TokenMetadata.java | 8 +++
.../org/apache/cassandra/service/ClientWarn.java | 5 ++
.../cql3/validation/operations/AlterTest.java | 69 ++++++++++++++++++++--
.../cql3/validation/operations/CreateTest.java | 19 +++---
.../schema/CreateTableValidationTest.java | 24 ++++----
14 files changed, 269 insertions(+), 65 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f5cb1b..6d311eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296)
* Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344)
* Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422)
* Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083)
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 2ef890e..c1b9bc2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -17,13 +17,12 @@
*/
package org.apache.cassandra.cql3.statements.schema;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import com.google.common.collect.ImmutableSet;
-
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
import org.apache.cassandra.auth.Permission;
@@ -43,6 +42,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
import org.apache.cassandra.utils.FBUtilities;
@@ -51,6 +51,7 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
{
private static final boolean allow_alter_rf_during_range_movement = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement");
private static final boolean allow_unsafe_transient_changes = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_unsafe_transient_changes");
+ private final HashSet<String> clientWarnings = new HashSet<>();
private final KeyspaceAttributes attrs;
@@ -62,6 +63,9 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
public Keyspaces apply(Keyspaces schema)
{
+ if (ClientWarn.instance.get() == null)
+ ClientWarn.instance.captureWarnings();
+ int previousNumWarnings = ClientWarn.instance.numWarnings();
attrs.validate();
KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
@@ -78,7 +82,13 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
validateNoRangeMovements();
validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy());
- return schema.withAddedOrUpdated(newKeyspace);
+ Keyspaces res = schema.withAddedOrUpdated(newKeyspace);
+
+ int newNumWarnings = ClientWarn.instance.numWarnings();
+ if (newNumWarnings > previousNumWarnings)
+ clientWarnings.addAll(ClientWarn.instance.getWarnings().subList(previousNumWarnings, newNumWarnings));
+
+ return res;
}
SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -95,16 +105,17 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
Set<String> clientWarnings(KeyspacesDiff diff)
{
if (diff.isEmpty())
- return ImmutableSet.of();
+ return clientWarnings;
KeyspaceDiff keyspaceDiff = diff.altered.get(0);
AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy();
AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy();
- return before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas
- ? ImmutableSet.of("When increasing replication factor you need to run a full (-full) repair to distribute the data.")
- : ImmutableSet.of();
+ if (before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas)
+ clientWarnings.add("When increasing replication factor you need to run a full (-full) repair to distribute the data.");
+
+ return clientWarnings;
}
private void validateNoRangeMovements()
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
index 094806c..806b50a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3.statements.schema;
+import java.util.HashSet;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
@@ -26,7 +27,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.exceptions.AlreadyExistsException;
@@ -37,6 +41,7 @@ import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.Event.SchemaChange;
import org.apache.cassandra.transport.Event.SchemaChange.Change;
@@ -46,6 +51,7 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
private final KeyspaceAttributes attrs;
private final boolean ifNotExists;
+ private final HashSet<String> clientWarnings = new HashSet<>();
public CreateKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs, boolean ifNotExists)
{
@@ -56,6 +62,10 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
public Keyspaces apply(Keyspaces schema)
{
+ if (ClientWarn.instance.get() == null)
+ ClientWarn.instance.captureWarnings();
+ int previousNumWarnings = ClientWarn.instance.numWarnings();
+
attrs.validate();
if (!attrs.hasOption(Option.REPLICATION))
@@ -75,8 +85,13 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
keyspace.params.validate(keyspaceName);
+ Keyspaces keyspaces = schema.withAddedOrUpdated(keyspace);
+
+ int newNumWarnings = ClientWarn.instance.numWarnings();
+ if (newNumWarnings > previousNumWarnings)
+ clientWarnings.addAll(ClientWarn.instance.getWarnings().subList(previousNumWarnings, newNumWarnings));
- return schema.withAddedOrUpdated(keyspace);
+ return keyspaces;
}
SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -115,9 +130,10 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
String msg = String.format("Cluster already contains %d keyspaces. Having a large number of keyspaces will significantly slow down schema dependent cluster operations.",
keyspaceCount);
logger.warn(msg);
- return ImmutableSet.of(msg);
+ clientWarnings.add(msg);
}
- return ImmutableSet.of();
+
+ return clientWarnings;
}
public static final class Raw extends CQLStatement.Raw
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index bc64325..c508639 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.*;
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 4928259..ebf0f03 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -17,7 +17,13 @@
*/
package org.apache.cassandra.dht;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -26,39 +32,40 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.EndpointsByReplica;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.LocalStrategy;
-
-import org.apache.cassandra.locator.EndpointsByRange;
-import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamResultFuture;
-import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
@@ -353,9 +360,27 @@ public class RangeStreamer
*/
private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat)
{
- return useStrictConsistency
- && tokens != null
- && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas;
+ boolean res = useStrictConsistency && tokens != null;
+
+ if (res)
+ {
+ int nodes = 0;
+
+ if (strat instanceof NetworkTopologyStrategy)
+ {
+ ImmutableMultimap<String, InetAddressAndPort> dc2Nodes = metadata.getDC2AllEndpoints(snitch);
+
+ NetworkTopologyStrategy ntps = (NetworkTopologyStrategy) strat;
+ for (String dc : dc2Nodes.keySet())
+ nodes += ntps.getReplicationFactor(dc).allReplicas > 0 ? dc2Nodes.get(dc).size() : 0;
+ }
+ else
+ nodes = metadata.getSizeOfAllEndpoints();
+
+ res = nodes > strat.getReplicationFactor().allReplicas;
+ }
+
+ return res;
}
/**
@@ -442,7 +467,7 @@ public class RangeStreamer
if (useStrictConsistency)
{
EndpointsForRange strictEndpoints;
- //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+ //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
//So we need to be careful to only be strict when endpoints == RF
if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
{
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d5aacd0..dfcfdc0 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,28 +20,28 @@ package org.apache.cassandra.locator;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
import org.apache.cassandra.service.DatacenterWriteResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
-
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
@@ -51,9 +51,8 @@ public abstract class AbstractReplicationStrategy
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
- @VisibleForTesting
- final String keyspaceName;
public final Map<String, String> configOptions;
+ protected final String keyspaceName;
private final TokenMetadata tokenMetadata;
// track when the token range changes, signaling we need to invalidate our endpoint cache
@@ -299,6 +298,8 @@ public abstract class AbstractReplicationStrategy
public abstract void validateOptions() throws ConfigurationException;
+ public abstract void maybeWarnOnOptions();
+
/*
* The options recognized by the strategy.
* The empty collection means that no options are accepted, but null means
@@ -400,6 +401,7 @@ public abstract class AbstractReplicationStrategy
AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
strategy.validateExpectedOptions();
strategy.validateOptions();
+ strategy.maybeWarnOnOptions();
if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled())
{
throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use.");
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index 41cc9b0..e7da769 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -69,6 +69,11 @@ public class LocalStrategy extends AbstractReplicationStrategy
{
}
+ @Override
+ public void maybeWarnOnOptions()
+ {
+ }
+
public Collection<String> recognizedOptions()
{
// LocalStrategy doesn't expect any options.
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index be63ea1..9e6ad6d 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -29,6 +29,9 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -45,7 +48,7 @@ import com.google.common.collect.Multimap;
* <p>
* So for example, if the keyspace replication factor is 6, the
* datacenter replication factors could be 3, 2, and 1 - so 3 replicas in
- * one datacenter, 2 in another, and 1 in another - totalling 6.
+ * one datacenter, 2 in another, and 1 in another - totaling 6.
* </p>
* This class also caches the Endpoints and invalidates the cache if there is a
* change in the number of tokens.
@@ -104,7 +107,11 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
int acceptableRackRepeats;
int transients;
- DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, EndpointsForRange.Builder replicas, Set<Pair<String, String>> racks)
+ DatacenterEndpoints(ReplicationFactor rf,
+ int rackCount,
+ int nodeCount,
+ EndpointsForRange.Builder replicas,
+ Set<Pair<String, String>> racks)
{
this.replicas = replicas;
this.racks = racks;
@@ -291,6 +298,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
super.validateExpectedOptions();
}
+ @Override
public void validateOptions() throws ConfigurationException
{
for (Entry<String, String> e : this.configOptions.entrySet())
@@ -303,6 +311,36 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
}
@Override
+ public void maybeWarnOnOptions()
+ {
+ if (!SchemaConstants.isSystemKeyspace(keyspaceName))
+ {
+ ImmutableMultimap<String, InetAddressAndPort> dcsNodes = StorageService.instance.getTokenMetadata()
+ .getDC2AllEndpoints(snitch);
+ for (Entry<String, String> e : this.configOptions.entrySet())
+ {
+
+ String dc = e.getKey();
+ ReplicationFactor rf = getReplicationFactor(dc);
+ int nodeCount = dcsNodes.get(dc).size();
+ // nodeCount==0 on many tests
+ if (rf.fullReplicas > nodeCount && nodeCount != 0)
+ {
+ String msg = "Your replication factor " + rf.fullReplicas
+ + " for keyspace "
+ + keyspaceName
+ + " is higher than the number of nodes "
+ + nodeCount
+ + " for datacenter "
+ + dc;
+ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ }
+ }
+ }
+ }
+
+ @Override
public boolean hasSameSettings(AbstractReplicationStrategy other)
{
return super.hasSameSettings(other) && ((NetworkTopologyStrategy) other).datacenters.equals(datacenters);
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 610ffe1..672c9ff 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -18,16 +18,20 @@
package org.apache.cassandra.locator;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
/**
@@ -39,6 +43,7 @@ import org.apache.cassandra.dht.Token;
public class SimpleStrategy extends AbstractReplicationStrategy
{
private static final String REPLICATION_FACTOR = "replication_factor";
+ private static final Logger logger = LoggerFactory.getLogger(SimpleStrategy.class);
private final ReplicationFactor rf;
public SimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -84,12 +89,33 @@ public class SimpleStrategy extends AbstractReplicationStrategy
throw new ConfigurationException("SimpleStrategy requires a replication_factor strategy option.");
}
+ @Override
public void validateOptions() throws ConfigurationException
{
validateOptionsInternal(configOptions);
validateReplicationFactor(configOptions.get(REPLICATION_FACTOR));
}
+ @Override
+ public void maybeWarnOnOptions()
+ {
+ if (!SchemaConstants.isSystemKeyspace(keyspaceName))
+ {
+ int nodeCount = StorageService.instance.getHostIdToEndpoint().size();
+ // nodeCount==0 on many tests
+ if (rf.fullReplicas > nodeCount && nodeCount != 0)
+ {
+ String msg = "Your replication factor " + rf.fullReplicas
+ + " for keyspace "
+ + keyspaceName
+ + " is higher than the number of nodes "
+ + nodeCount;
+ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ }
+ }
+ }
+
public Collection<String> recognizedOptions()
{
return Collections.singleton(REPLICATION_FACTOR);
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 1d2607f..589a259 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1335,6 +1335,14 @@ public class TokenMetadata
}
/**
+ * @return a (stable copy, won't be modified) datacenter to Endpoint map for all the nodes in the cluster.
+ */
+ public ImmutableMultimap<String, InetAddressAndPort> getDC2AllEndpoints(IEndpointSnitch snitch)
+ {
+ return Multimaps.index(getAllEndpoints(), snitch::getDatacenter);
+ }
+
+ /**
* @return the Topology map of nodes to DCs + Racks
*
* This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
index 5a6a878..ec79854 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -64,6 +64,11 @@ public class ClientWarn implements ExecutorLocal<ClientWarn.State>
return state.warnings;
}
+ public int numWarnings()
+ {
+ return getWarnings() == null ? 0 : getWarnings().size();
+ }
+
public void resetWarnings()
{
warnLocal.remove();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 069669d..23d0ae7 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -17,29 +17,36 @@
*/
package org.apache.cassandra.cql3.validation.operations;
+import java.util.List;
import java.util.UUID;
+
import org.junit.Test;
+import org.junit.runner.RunWith;
import com.datastax.driver.core.PreparedStatement;
-
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.api.Assertions;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+@RunWith(OrderedJUnit4ClassRunner.class)
public class AlterTest extends CQLTester
{
@Test
@@ -270,6 +277,58 @@ public class AlterTest extends CQLTester
}
@Test
+ public void testCreateAlterKeyspacesRFWarnings() throws Throwable
+ {
+ requireNetwork();
+
+ // NTS
+ ClientWarn.instance.captureWarnings();
+ String ks = createKeyspace("CREATE KEYSPACE %s WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 3 }");
+ List<String> warnings = ClientWarn.instance.getWarnings();
+ assertEquals(1, warnings.size());
+ Assertions.assertThat(warnings.get(0)).contains("Your replication factor 3 for keyspace " + ks + " is higher than the number of nodes 1 for datacenter " + DATA_CENTER);
+
+ ClientWarn.instance.captureWarnings();
+ execute("CREATE TABLE " + ks + ".t (k int PRIMARY KEY, v int)");
+ warnings = ClientWarn.instance.getWarnings();
+ assertNull(warnings);
+
+ ClientWarn.instance.captureWarnings();
+ execute("ALTER KEYSPACE " + ks + " WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 2 }");
+ warnings = ClientWarn.instance.getWarnings();
+ assertEquals(1, warnings.size());
+ Assertions.assertThat(warnings.get(0)).contains("Your replication factor 2 for keyspace " + ks + " is higher than the number of nodes 1 for datacenter " + DATA_CENTER);
+
+ ClientWarn.instance.captureWarnings();
+ execute("ALTER KEYSPACE " + ks + " WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 1 }");
+ warnings = ClientWarn.instance.getWarnings();
+ assertNull(warnings);
+
+ // SimpleStrategy
+ ClientWarn.instance.captureWarnings();
+ ks = createKeyspace("CREATE KEYSPACE %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+ warnings = ClientWarn.instance.getWarnings();
+ assertEquals(1, warnings.size());
+ Assertions.assertThat(warnings.get(0)).contains("Your replication factor 3 for keyspace " + ks + " is higher than the number of nodes 1");
+
+ ClientWarn.instance.captureWarnings();
+ execute("CREATE TABLE " + ks + ".t (k int PRIMARY KEY, v int)");
+ warnings = ClientWarn.instance.getWarnings();
+ assertNull(warnings);
+
+ ClientWarn.instance.captureWarnings();
+ execute("ALTER KEYSPACE " + ks + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 }");
+ warnings = ClientWarn.instance.getWarnings();
+ assertEquals(1, warnings.size());
+ Assertions.assertThat(warnings.get(0)).contains("Your replication factor 2 for keyspace " + ks + " is higher than the number of nodes 1");
+
+ ClientWarn.instance.captureWarnings();
+ execute("ALTER KEYSPACE " + ks + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+ warnings = ClientWarn.instance.getWarnings();
+ assertNull(warnings);
+ }
+
+ @Test
public void testCreateAlterNetworkTopologyWithDefaults() throws Throwable
{
TokenMetadata metadata = StorageService.instance.getTokenMetadata();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index b35b82f..3d5c680 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -35,21 +35,26 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.AbstractEndpointSnitch;
-import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.schema.*;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.cassandra.utils.ByteBufferUtil;
import static java.lang.String.format;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static org.apache.cassandra.cql3.Duration.*;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_HOUR;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MICRO;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MILLI;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MINUTE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class CreateTest extends CQLTester
{
diff --git a/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java b/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
index c9a1356..f2abc7c 100644
--- a/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
+++ b/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.schema;
import java.io.IOException;
+import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
@@ -26,20 +27,16 @@ import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.transport.messages.QueryMessage;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class CreateTableValidationTest extends CQLTester
{
- private static final String KEYSPACE1 = "CreateTableValidationTest";
-
@Test
public void testInvalidBloomFilterFPRatio() throws Throwable
{
@@ -75,18 +72,25 @@ public class CreateTableValidationTest extends CQLTester
String createKeyspace = "CREATE KEYSPACE createkswarning%d WITH REPLICATION={'class':'org.apache.cassandra.locator.NetworkTopologyStrategy','datacenter1':'2'}";
QueryMessage query = new QueryMessage(String.format(createKeyspace, 1), QueryOptions.DEFAULT);
Message.Response resp = client.execute(query);
- assertTrue(resp.getWarnings().size() > 0);
- assertTrue(resp.getWarnings().get(0).contains("Having a large number of keyspaces will significantly"));
+ List<String> warns = resp.getWarnings();
+ warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+ assertTrue(warns.size() > 0);
+ assertTrue(warns.get(0).contains("Having a large number of keyspaces will significantly"));
- DatabaseDescriptor.setKeyspaceCountWarnThreshold(Schema.instance.getKeyspaces().size() + 1);
+ DatabaseDescriptor.setKeyspaceCountWarnThreshold(Schema.instance.getKeyspaces().size() + 2);
query = new QueryMessage(String.format(createKeyspace, 2), QueryOptions.DEFAULT);
resp = client.execute(query);
- assertTrue(resp.getWarnings() == null || resp.getWarnings().isEmpty());
+ warns = resp.getWarnings();
+ if (warns != null)
+ warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+ assertTrue(warns == null || warns.isEmpty());
query = new QueryMessage(String.format("CREATE TABLE %s.%s (id int primary key, x int)", KEYSPACE, "test1"), QueryOptions.DEFAULT);
resp = client.execute(query);
- assertTrue(resp.getWarnings().size() > 0);
- assertTrue(resp.getWarnings().get(0).contains("Having a large number of tables"));
+ warns = resp.getWarnings();
+ warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+ assertTrue(warns.size() > 0);
+ assertTrue(warns.get(0).contains("Having a large number of tables"));
DatabaseDescriptor.setTableCountWarnThreshold(Schema.instance.getNumberOfTables() + 1);
query = new QueryMessage(String.format("CREATE TABLE %s.%s (id int primary key, x int)", KEYSPACE, "test2"), QueryOptions.DEFAULT);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org