You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/02/18 11:43:34 UTC

[cassandra] branch trunk updated: Fix node unable to join when RF > N in multi-DC with added warning

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87e4f31  Fix node unable to join when RF > N in multi-DC with added warning
87e4f31 is described below

commit 87e4f31e30f6a89f1c17b5a9eb6406208e384d51
Author: Bereng <be...@gmail.com>
AuthorDate: Thu Feb 18 11:42:24 2021 +0000

    Fix node unable to join when RF > N in multi-DC with added warning
    
    patch by Berenguer Blasi; reviewed by Andrés de la Peña, Ekaterina Dimitrova and Tomasz Lasica for CASSANDRA-16296
---
 CHANGES.txt                                        |  1 +
 .../statements/schema/AlterKeyspaceStatement.java  | 25 +++++---
 .../statements/schema/CreateKeyspaceStatement.java | 24 ++++++--
 .../org/apache/cassandra/dht/BootStrapper.java     |  1 -
 .../org/apache/cassandra/dht/RangeStreamer.java    | 61 +++++++++++++------
 .../locator/AbstractReplicationStrategy.java       | 16 ++---
 .../apache/cassandra/locator/LocalStrategy.java    |  5 ++
 .../cassandra/locator/NetworkTopologyStrategy.java | 42 ++++++++++++-
 .../apache/cassandra/locator/SimpleStrategy.java   | 34 +++++++++--
 .../apache/cassandra/locator/TokenMetadata.java    |  8 +++
 .../org/apache/cassandra/service/ClientWarn.java   |  5 ++
 .../cql3/validation/operations/AlterTest.java      | 69 ++++++++++++++++++++--
 .../cql3/validation/operations/CreateTest.java     | 19 +++---
 .../schema/CreateTableValidationTest.java          | 24 ++++----
 14 files changed, 269 insertions(+), 65 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1f5cb1b..6d311eb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Fix node unable to join when RF > N in multi-DC with added warning (CASSANDRA-16296)
  * Add an option to nodetool tablestats to check sstable location correctness (CASSANDRA-16344) 
  * Unable to ALTER KEYSPACE while decommissioned/assassinated nodes are in gossip (CASSANDRA-16422)
  * Metrics backward compatibility restored after CASSANDRA-15066 (CASSANDRA-16083)
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 2ef890e..c1b9bc2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -17,13 +17,12 @@
  */
 package org.apache.cassandra.cql3.statements.schema;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.google.common.collect.ImmutableSet;
-
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
@@ -43,6 +42,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.utils.FBUtilities;
@@ -51,6 +51,7 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
 {
     private static final boolean allow_alter_rf_during_range_movement = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_alter_rf_during_range_movement");
     private static final boolean allow_unsafe_transient_changes = Boolean.getBoolean(Config.PROPERTY_PREFIX + "allow_unsafe_transient_changes");
+    private final HashSet<String> clientWarnings = new HashSet<>();
 
     private final KeyspaceAttributes attrs;
 
@@ -62,6 +63,9 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
 
     public Keyspaces apply(Keyspaces schema)
     {
+        if (ClientWarn.instance.get() == null)
+            ClientWarn.instance.captureWarnings();
+        int previousNumWarnings = ClientWarn.instance.numWarnings();
         attrs.validate();
 
         KeyspaceMetadata keyspace = schema.getNullable(keyspaceName);
@@ -78,7 +82,13 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
         validateNoRangeMovements();
         validateTransientReplication(keyspace.createReplicationStrategy(), newKeyspace.createReplicationStrategy());
 
-        return schema.withAddedOrUpdated(newKeyspace);
+        Keyspaces res = schema.withAddedOrUpdated(newKeyspace);
+
+        int newNumWarnings = ClientWarn.instance.numWarnings();
+        if (newNumWarnings > previousNumWarnings)
+            clientWarnings.addAll(ClientWarn.instance.getWarnings().subList(previousNumWarnings, newNumWarnings));
+
+        return res;
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -95,16 +105,17 @@ public final class AlterKeyspaceStatement extends AlterSchemaStatement
     Set<String> clientWarnings(KeyspacesDiff diff)
     {
         if (diff.isEmpty())
-            return ImmutableSet.of();
+            return clientWarnings;
 
         KeyspaceDiff keyspaceDiff = diff.altered.get(0);
 
         AbstractReplicationStrategy before = keyspaceDiff.before.createReplicationStrategy();
         AbstractReplicationStrategy after = keyspaceDiff.after.createReplicationStrategy();
 
-        return before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas
-             ? ImmutableSet.of("When increasing replication factor you need to run a full (-full) repair to distribute the data.")
-             : ImmutableSet.of();
+        if (before.getReplicationFactor().fullReplicas < after.getReplicationFactor().fullReplicas)
+            clientWarnings.add("When increasing replication factor you need to run a full (-full) repair to distribute the data.");
+
+        return clientWarnings;
     }
 
     private void validateNoRangeMovements()
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
index 094806c..806b50a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.statements.schema;
 
+import java.util.HashSet;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
@@ -26,7 +27,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.audit.AuditLogEntryType;
-import org.apache.cassandra.auth.*;
+import org.apache.cassandra.auth.DataResource;
+import org.apache.cassandra.auth.FunctionResource;
+import org.apache.cassandra.auth.IResource;
+import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.exceptions.AlreadyExistsException;
@@ -37,6 +41,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 
@@ -46,6 +51,7 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
 
     private final KeyspaceAttributes attrs;
     private final boolean ifNotExists;
+    private final HashSet<String> clientWarnings = new HashSet<>();
 
     public CreateKeyspaceStatement(String keyspaceName, KeyspaceAttributes attrs, boolean ifNotExists)
     {
@@ -56,6 +62,10 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
 
     public Keyspaces apply(Keyspaces schema)
     {
+        if (ClientWarn.instance.get() == null)
+            ClientWarn.instance.captureWarnings();
+        int previousNumWarnings = ClientWarn.instance.numWarnings();
+
         attrs.validate();
 
         if (!attrs.hasOption(Option.REPLICATION))
@@ -75,8 +85,13 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
             throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use.");
 
         keyspace.params.validate(keyspaceName);
+        Keyspaces keyspaces = schema.withAddedOrUpdated(keyspace);
+
+        int newNumWarnings = ClientWarn.instance.numWarnings();
+        if (newNumWarnings > previousNumWarnings)
+            clientWarnings.addAll(ClientWarn.instance.getWarnings().subList(previousNumWarnings, newNumWarnings));
 
-        return schema.withAddedOrUpdated(keyspace);
+        return keyspaces;
     }
 
     SchemaChange schemaChangeEvent(KeyspacesDiff diff)
@@ -115,9 +130,10 @@ public final class CreateKeyspaceStatement extends AlterSchemaStatement
             String msg = String.format("Cluster already contains %d keyspaces. Having a large number of keyspaces will significantly slow down schema dependent cluster operations.",
                                        keyspaceCount);
             logger.warn(msg);
-            return ImmutableSet.of(msg);
+            clientWarnings.add(msg);
         }
-        return ImmutableSet.of();
+
+        return clientWarnings;
     }
 
     public static final class Raw extends CQLStatement.Raw
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index bc64325..c508639 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.*;
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 4928259..ebf0f03 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -17,7 +17,13 @@
  */
 package org.apache.cassandra.dht;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -26,39 +32,40 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.EndpointsByReplica;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.LocalStrategy;
-
-import org.apache.cassandra.locator.EndpointsByRange;
-import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.streaming.StreamResultFuture;
-import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 import static com.google.common.base.Predicates.and;
 import static com.google.common.base.Predicates.not;
@@ -353,9 +360,27 @@ public class RangeStreamer
      */
     private boolean useStrictSourcesForRanges(AbstractReplicationStrategy strat)
     {
-        return useStrictConsistency
-                && tokens != null
-                && metadata.getSizeOfAllEndpoints() != strat.getReplicationFactor().allReplicas;
+        boolean res = useStrictConsistency && tokens != null;
+        
+        if (res)
+        {
+            int nodes = 0;
+
+            if (strat instanceof NetworkTopologyStrategy)
+            {
+                ImmutableMultimap<String, InetAddressAndPort> dc2Nodes = metadata.getDC2AllEndpoints(snitch);
+
+                NetworkTopologyStrategy ntps = (NetworkTopologyStrategy) strat;
+                for (String dc : dc2Nodes.keySet())
+                    nodes += ntps.getReplicationFactor(dc).allReplicas > 0 ? dc2Nodes.get(dc).size() : 0;
+            }
+            else
+                nodes = metadata.getSizeOfAllEndpoints();
+    
+            res = nodes > strat.getReplicationFactor().allReplicas;
+        }
+        
+        return res;
     }
 
     /**
@@ -442,7 +467,7 @@ public class RangeStreamer
                  if (useStrictConsistency)
                  {
                      EndpointsForRange strictEndpoints;
-                     //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints.
+                     //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
                      //So we need to be careful to only be strict when endpoints == RF
                      if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
                      {
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d5aacd0..dfcfdc0 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -20,28 +20,28 @@ package org.apache.cassandra.locator;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.*;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterWriteResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
-
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -51,9 +51,8 @@ public abstract class AbstractReplicationStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    @VisibleForTesting
-    final String keyspaceName;
     public final Map<String, String> configOptions;
+    protected final String keyspaceName;
     private final TokenMetadata tokenMetadata;
 
     // track when the token range changes, signaling we need to invalidate our endpoint cache
@@ -299,6 +298,8 @@ public abstract class AbstractReplicationStrategy
 
     public abstract void validateOptions() throws ConfigurationException;
 
+    public abstract void maybeWarnOnOptions();
+
     /*
      * The options recognized by the strategy.
      * The empty collection means that no options are accepted, but null means
@@ -400,6 +401,7 @@ public abstract class AbstractReplicationStrategy
         AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions);
         strategy.validateExpectedOptions();
         strategy.validateOptions();
+        strategy.maybeWarnOnOptions();
         if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled())
         {
             throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use.");
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index 41cc9b0..e7da769 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -69,6 +69,11 @@ public class LocalStrategy extends AbstractReplicationStrategy
     {
     }
 
+    @Override
+    public void maybeWarnOnOptions()
+    {
+    }
+
     public Collection<String> recognizedOptions()
     {
         // LocalStrategy doesn't expect any options.
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index be63ea1..9e6ad6d 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -29,6 +29,9 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
@@ -45,7 +48,7 @@ import com.google.common.collect.Multimap;
  * <p>
  * So for example, if the keyspace replication factor is 6, the
  * datacenter replication factors could be 3, 2, and 1 - so 3 replicas in
- * one datacenter, 2 in another, and 1 in another - totalling 6.
+ * one datacenter, 2 in another, and 1 in another - totaling 6.
  * </p>
  * This class also caches the Endpoints and invalidates the cache if there is a
  * change in the number of tokens.
@@ -104,7 +107,11 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         int acceptableRackRepeats;
         int transients;
 
-        DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, EndpointsForRange.Builder replicas, Set<Pair<String, String>> racks)
+        DatacenterEndpoints(ReplicationFactor rf,
+                            int rackCount,
+                            int nodeCount,
+                            EndpointsForRange.Builder replicas,
+                            Set<Pair<String, String>> racks)
         {
             this.replicas = replicas;
             this.racks = racks;
@@ -291,6 +298,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         super.validateExpectedOptions();
     }
 
+    @Override
     public void validateOptions() throws ConfigurationException
     {
         for (Entry<String, String> e : this.configOptions.entrySet())
@@ -303,6 +311,36 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     }
 
     @Override
+    public void maybeWarnOnOptions()
+    {
+        if (!SchemaConstants.isSystemKeyspace(keyspaceName))
+        {
+            ImmutableMultimap<String, InetAddressAndPort> dcsNodes = StorageService.instance.getTokenMetadata()
+                                                                                            .getDC2AllEndpoints(snitch);
+            for (Entry<String, String> e : this.configOptions.entrySet())
+            {
+
+                String dc = e.getKey();
+                ReplicationFactor rf = getReplicationFactor(dc);
+                int nodeCount = dcsNodes.get(dc).size();
+                // nodeCount == 0 in many tests; the check below skips the warning in that case
+                if (rf.fullReplicas > nodeCount && nodeCount != 0)
+                {
+                    String msg = "Your replication factor " + rf.fullReplicas
+                                 + " for keyspace "
+                                 + keyspaceName
+                                 + " is higher than the number of nodes "
+                                 + nodeCount
+                                 + " for datacenter "
+                                 + dc;
+                    ClientWarn.instance.warn(msg);
+                    logger.warn(msg);
+                }
+            }
+        }
+    }
+
+    @Override
     public boolean hasSameSettings(AbstractReplicationStrategy other)
     {
         return super.hasSameSettings(other) && ((NetworkTopologyStrategy) other).datacenters.equals(datacenters);
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 610ffe1..672c9ff 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -18,16 +18,20 @@
 package org.apache.cassandra.locator;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Collection;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -39,6 +43,7 @@ import org.apache.cassandra.dht.Token;
 public class SimpleStrategy extends AbstractReplicationStrategy
 {
     private static final String REPLICATION_FACTOR = "replication_factor";
+    private static final Logger logger = LoggerFactory.getLogger(SimpleStrategy.class);
     private final ReplicationFactor rf;
 
     public SimpleStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@ -84,12 +89,33 @@ public class SimpleStrategy extends AbstractReplicationStrategy
             throw new ConfigurationException("SimpleStrategy requires a replication_factor strategy option.");
     }
 
+    @Override
     public void validateOptions() throws ConfigurationException
     {
         validateOptionsInternal(configOptions);
         validateReplicationFactor(configOptions.get(REPLICATION_FACTOR));
     }
 
+    @Override
+    public void maybeWarnOnOptions()
+    {
+        if (!SchemaConstants.isSystemKeyspace(keyspaceName))
+        {
+            int nodeCount = StorageService.instance.getHostIdToEndpoint().size();
+            // nodeCount == 0 in many tests; the check below skips the warning in that case
+            if (rf.fullReplicas > nodeCount && nodeCount != 0)
+            {
+                String msg = "Your replication factor " + rf.fullReplicas
+                             + " for keyspace "
+                             + keyspaceName
+                             + " is higher than the number of nodes "
+                             + nodeCount;
+                ClientWarn.instance.warn(msg);
+                logger.warn(msg);
+            }
+        }
+    }
+
     public Collection<String> recognizedOptions()
     {
         return Collections.singleton(REPLICATION_FACTOR);
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 1d2607f..589a259 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1335,6 +1335,14 @@ public class TokenMetadata
     }
 
     /**
+     * @return an immutable datacenter-to-endpoint multimap covering all the nodes in the cluster; it is a stable copy and won't be modified.
+     */
+    public ImmutableMultimap<String, InetAddressAndPort> getDC2AllEndpoints(IEndpointSnitch snitch)
+    {
+        return Multimaps.index(getAllEndpoints(), snitch::getDatacenter);
+    }
+
+    /**
      * @return the Topology map of nodes to DCs + Racks
      *
      * This is only allowed when a copy has been made of TokenMetadata, to avoid concurrent modifications
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java
index 5a6a878..ec79854 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -64,6 +64,11 @@ public class ClientWarn implements ExecutorLocal<ClientWarn.State>
         return state.warnings;
     }
 
+    public int numWarnings()
+    {
+        return getWarnings() == null ? 0 : getWarnings().size();
+    }
+
     public void resetWarnings()
     {
         warnLocal.remove();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
index 069669d..23d0ae7 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java
@@ -17,29 +17,36 @@
  */
 package org.apache.cassandra.cql3.validation.operations;
 
+import java.util.List;
 import java.util.UUID;
+
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import com.datastax.driver.core.PreparedStatement;
-
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.api.Assertions;
 
 import static java.lang.String.format;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(OrderedJUnit4ClassRunner.class)
 public class AlterTest extends CQLTester
 {
     @Test
@@ -270,6 +277,58 @@ public class AlterTest extends CQLTester
     }
 
     @Test
+    public void testCreateAlterKeyspacesRFWarnings() throws Throwable
+    {
+        requireNetwork();
+
+        // NTS
+        ClientWarn.instance.captureWarnings();
+        String ks = createKeyspace("CREATE KEYSPACE %s WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 3 }");
+        List<String> warnings = ClientWarn.instance.getWarnings();
+        assertEquals(1, warnings.size());
+        Assertions.assertThat(warnings.get(0)).contains("Your replication factor 3 for keyspace " + ks + " is higher than the number of nodes 1 for datacenter " + DATA_CENTER);
+
+        ClientWarn.instance.captureWarnings();
+        execute("CREATE TABLE " + ks + ".t (k int PRIMARY KEY, v int)");
+        warnings = ClientWarn.instance.getWarnings();
+        assertNull(warnings);
+
+        ClientWarn.instance.captureWarnings();
+        execute("ALTER KEYSPACE " + ks + " WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 2 }");
+        warnings = ClientWarn.instance.getWarnings();
+        assertEquals(1, warnings.size());
+        Assertions.assertThat(warnings.get(0)).contains("Your replication factor 2 for keyspace " + ks + " is higher than the number of nodes 1 for datacenter " + DATA_CENTER);
+
+        ClientWarn.instance.captureWarnings();
+        execute("ALTER KEYSPACE " + ks + " WITH replication = {'class' : 'NetworkTopologyStrategy', '" + DATA_CENTER + "' : 1 }");
+        warnings = ClientWarn.instance.getWarnings();
+        assertNull(warnings);
+
+        // SimpleStrategy
+        ClientWarn.instance.captureWarnings();
+        ks = createKeyspace("CREATE KEYSPACE %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        warnings = ClientWarn.instance.getWarnings();
+        assertEquals(1, warnings.size());
+        Assertions.assertThat(warnings.get(0)).contains("Your replication factor 3 for keyspace " + ks + " is higher than the number of nodes 1");
+
+        ClientWarn.instance.captureWarnings();
+        execute("CREATE TABLE " + ks + ".t (k int PRIMARY KEY, v int)");
+        warnings = ClientWarn.instance.getWarnings();
+        assertNull(warnings);
+
+        ClientWarn.instance.captureWarnings();
+        execute("ALTER KEYSPACE " + ks + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 }");
+        warnings = ClientWarn.instance.getWarnings();
+        assertEquals(1, warnings.size());
+        Assertions.assertThat(warnings.get(0)).contains("Your replication factor 2 for keyspace " + ks + " is higher than the number of nodes 1");
+
+        ClientWarn.instance.captureWarnings();
+        execute("ALTER KEYSPACE " + ks + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }");
+        warnings = ClientWarn.instance.getWarnings();
+        assertNull(warnings);
+    }
+
+    @Test
     public void testCreateAlterNetworkTopologyWithDefaults() throws Throwable
     {
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
index b35b82f..3d5c680 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/CreateTest.java
@@ -35,21 +35,26 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.AbstractEndpointSnitch;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.schema.*;
 import org.apache.cassandra.triggers.ITrigger;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static java.lang.String.format;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
-import static org.apache.cassandra.cql3.Duration.*;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_HOUR;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MICRO;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MILLI;
+import static org.apache.cassandra.cql3.Duration.NANOS_PER_MINUTE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CreateTest extends CQLTester
 {
diff --git a/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java b/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
index c9a1356..f2abc7c 100644
--- a/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
+++ b/test/unit/org/apache/cassandra/schema/CreateTableValidationTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.schema;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
@@ -26,20 +27,16 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.transport.Message;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.transport.messages.QueryMessage;
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class CreateTableValidationTest extends CQLTester
 {
-    private static final String KEYSPACE1 = "CreateTableValidationTest";
-
     @Test
     public void testInvalidBloomFilterFPRatio() throws Throwable
     {
@@ -75,18 +72,25 @@ public class CreateTableValidationTest extends CQLTester
             String createKeyspace = "CREATE KEYSPACE createkswarning%d WITH REPLICATION={'class':'org.apache.cassandra.locator.NetworkTopologyStrategy','datacenter1':'2'}";
             QueryMessage query = new QueryMessage(String.format(createKeyspace, 1), QueryOptions.DEFAULT);
             Message.Response resp = client.execute(query);
-            assertTrue(resp.getWarnings().size() > 0);
-            assertTrue(resp.getWarnings().get(0).contains("Having a large number of keyspaces will significantly"));
+            List<String> warns = resp.getWarnings();
+            warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+            assertTrue(warns.size() > 0);
+            assertTrue(warns.get(0).contains("Having a large number of keyspaces will significantly"));
 
-            DatabaseDescriptor.setKeyspaceCountWarnThreshold(Schema.instance.getKeyspaces().size() + 1);
+            DatabaseDescriptor.setKeyspaceCountWarnThreshold(Schema.instance.getKeyspaces().size() + 2);
             query = new QueryMessage(String.format(createKeyspace, 2), QueryOptions.DEFAULT);
             resp = client.execute(query);
-            assertTrue(resp.getWarnings() == null || resp.getWarnings().isEmpty());
+            warns = resp.getWarnings();
+            if (warns != null)
+                warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+            assertTrue(warns == null || warns.isEmpty());
 
             query = new QueryMessage(String.format("CREATE TABLE %s.%s (id int primary key, x int)", KEYSPACE, "test1"), QueryOptions.DEFAULT);
             resp = client.execute(query);
-            assertTrue(resp.getWarnings().size() > 0);
-            assertTrue(resp.getWarnings().get(0).contains("Having a large number of tables"));
+            warns = resp.getWarnings();
+            warns.removeIf(w -> w.contains("is higher than the number of nodes"));
+            assertTrue(warns.size() > 0);
+            assertTrue(warns.get(0).contains("Having a large number of tables"));
 
             DatabaseDescriptor.setTableCountWarnThreshold(Schema.instance.getNumberOfTables() + 1);
             query = new QueryMessage(String.format("CREATE TABLE %s.%s (id int primary key, x int)", KEYSPACE, "test2"), QueryOptions.DEFAULT);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org