Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/11/12 17:45:35 UTC

cassandra git commit: Startup checker should wait for count rather than percentage

Repository: cassandra
Updated Branches:
  refs/heads/trunk 918b1d8c6 -> 801cb70ee


Startup checker should wait for count rather than percentage

This improves on the wait-for-healthy work from CASSANDRA-13993 to
solve CASSANDRA-14297. The connectivity checker now waits for all but
a single node in either the local datacenter or every datacenter (it
defaults to just the local datacenter, but users can configure it to
wait for every datacenter). This lets users rely on the feature to
keep their application available while Cassandra nodes restart. The
default behavior waits for all but a single local datacenter node.
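To illustrate why a count works better than a percentage, here is a small sketch (hypothetical class and method names, not Cassandra code) that computes both targets for a cluster of three datacenters with six nodes each. The restarting node lives in dcA, so it sees 5 local peers and 12 remote ones. The removed rule, (int) (0.70 * 17) = 11 peers, can be satisfied entirely by remote nodes. The new rule instead requires 4 of the 5 local peers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not Cassandra code) contrasting the removed and the new startup targets.
final class StartupTargets
{
    // Removed rule: a fixed share of all peers, wherever they live.
    // Same arithmetic as the old (int) ((targetPercent / 100.0) * peers.size()).
    static int percentageTarget(int targetPercent, int totalPeers)
    {
        return (int) ((targetPercent / 100.0) * totalPeers);
    }

    // New rule: in every datacenter that is waited on, all peers but one must connect.
    // Same arithmetic as Math.max(size - 1, 0) in the new checker.
    static Map<String, Integer> perDatacenterTargets(Map<String, Integer> peersPerDc)
    {
        Map<String, Integer> targets = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : peersPerDc.entrySet())
            targets.put(entry.getKey(), Math.max(entry.getValue() - 1, 0));
        return targets;
    }
}
```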

Patch by Joseph Lynch; Reviewed by Ariel Weisberg for CASSANDRA-14297


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/801cb70e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/801cb70e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/801cb70e

Branch: refs/heads/trunk
Commit: 801cb70ee811c956e987718a00695638d5bec1b6
Parents: 918b1d8
Author: Joseph Lynch <jo...@gmail.com>
Authored: Thu Aug 23 15:19:20 2018 -0700
Committer: Ariel Weisberg <aw...@apple.com>
Committed: Mon Nov 12 12:41:17 2018 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   7 +
 .../org/apache/cassandra/config/Config.java     |  23 ++-
 .../cassandra/config/DatabaseDescriptor.java    |   4 +-
 .../net/StartupClusterConnectivityChecker.java  | 137 ++++++++++----
 .../cassandra/service/CassandraDaemon.java      |   6 +-
 .../StartupClusterConnectivityCheckerTest.java  | 179 +++++++++++++++++--
 7 files changed, 299 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a7a75c0..aaea773 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Startup checker should wait for count rather than percentage (CASSANDRA-14297)
  * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
  * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358)
  * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0d211a3..63c4a47 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Nodes will now eagerly open all intra-cluster connections at startup by default and wait
+     10 seconds for all but one node in the local data center to be connected and marked
+     UP in gossip. This prevents nodes from coordinating requests and failing because they
+     aren't able to connect to the cluster fast enough. block_for_peers_timeout_in_secs in
+     cassandra.yaml can be used to configure how long to wait (or whether to wait at all)
+     and block_for_peers_in_remote_dcs can be used to also block on all but one node in
+     each remote DC. See CASSANDRA-14297 and CASSANDRA-13993 for more information.
    - *Experimental* support for Transient Replication and Cheap Quorums introduced by CASSANDRA-14404
      The intended audience for this functionality is expert users of Cassandra who are prepared
      to validate every aspect of the database for their application and deployment practices. Future
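The two scopes in the NEWS entry (local datacenter only, or every datacenter when block_for_peers_in_remote_dcs is enabled) come down to grouping peers by datacenter and discarding the remote groups unless the flag is set. Here is a minimal sketch that assumes peers are plain strings and that the datacenter lookup is any Function. In this commit the checker receives the snitch's getDatacenter for that role. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch (not Cassandra code) of which datacenters the startup check waits on.
final class DatacenterScope
{
    static Map<String, Set<String>> peersToWaitFor(Set<String> peers,
                                                   String localAddress,
                                                   Function<String, String> datacenterOf,
                                                   boolean blockForRemoteDcs)
    {
        String localDc = datacenterOf.apply(localAddress);

        // Group every other peer by datacenter; the node never waits on itself.
        Map<String, Set<String>> byDc = new HashMap<>();
        for (String peer : peers)
        {
            if (peer.equals(localAddress))
                continue;
            byDc.computeIfAbsent(datacenterOf.apply(peer), dc -> new HashSet<>()).add(peer);
        }

        // Default (flag off): keep only the local datacenter's group.
        if (!blockForRemoteDcs)
            byDc.keySet().retainAll(Collections.singleton(localDc));
        return byDc;
    }
}
```

With the flag off, a remote datacenter that is completely unreachable cannot delay startup. With it on, each remote datacenter must reach its own all-but-one threshold.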

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 1e80108..7371df7 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -388,9 +388,28 @@ public class Config
     public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
     public int repair_command_pool_size = concurrent_validations;
 
-    // parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
-    public int block_for_peers_percentage = 70;
+    /**
+     * When a node first starts up it initially considers all other peers as DOWN and is disconnected from all of them.
+     * To be useful as a coordinator (and not introduce latency penalties on restart) this node must have successfully
+     * opened all three internode TCP connections (gossip, small, and large messages) before advertising to clients.
+     * Due to this, by default, Cassandra will prime these internode TCP connections and wait until at most a single
+     * node in the local datacenter is DOWN/disconnected before offering itself as a coordinator, subject to a
+     * timeout. See CASSANDRA-13993 and CASSANDRA-14297 for more details.
+     *
+     * We provide two tunables to control this behavior as some users may want to block until all datacenters are
+     * available (global QUORUM/EACH_QUORUM), some users may not want to block at all (clients that already work
+     * around the problem), and some users may want to prime the connections but not delay startup.
+     *
+     * block_for_peers_timeout_in_secs: controls how long this node will wait to connect to peers. To completely disable
+     * any startup connectivity checks, set this to -1. To trigger the internode connections but immediately continue
+     * startup, set this to 0. The default is 10 seconds.
+     *
+     * block_for_peers_in_remote_dcs: controls whether this node also waits on remote datacenters. The default
+     * is to _not_ wait on remote datacenters.
+     */
     public int block_for_peers_timeout_in_secs = 10;
+    public boolean block_for_peers_in_remote_dcs = false;
+
     public volatile boolean automatic_sstable_upgrade = false;
     public volatile int max_concurrent_automatic_sstable_upgrades = 1;
     public boolean stream_entire_sstables = true;
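The comment above gives block_for_peers_timeout_in_secs three behaviors. The following hypothetical enum (not part of Cassandra) names them and mirrors how the checker in this commit treats the value. Any negative timeout skips the check. A zero timeout still sends the warm-up pings but passes only a 1 ns budget to each wait, so startup continues almost immediately.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical enum (not in Cassandra) naming the three behaviours of
// block_for_peers_timeout_in_secs described in the Config.java comment.
enum StartupBlockMode
{
    DISABLED,   // negative, e.g. -1: the connectivity check returns true immediately
    PRIME_ONLY, // 0: warm-up pings are sent, but startup effectively does not wait
    WAIT;       // positive: wait up to the timeout for peers to connect

    static StartupBlockMode fromTimeoutSecs(long timeoutSecs)
    {
        if (timeoutSecs < 0)
            return DISABLED;
        return timeoutSecs == 0 ? PRIME_ONLY : WAIT;
    }

    // Budget handed to a wait once elapsedNanos have passed; as in the checker it never
    // drops below 1 ns. Only meaningful when timeoutSecs >= 0.
    static long remainingWaitNanos(long timeoutSecs, long elapsedNanos)
    {
        return Math.max(1, TimeUnit.SECONDS.toNanos(timeoutSecs) - elapsedNanos);
    }
}
```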

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1b11a91..bc1e5a2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2630,9 +2630,9 @@ public class DatabaseDescriptor
         return  conf.full_query_logging_options;
     }
 
-    public static int getBlockForPeersPercentage()
+    public static boolean getBlockForPeersInRemoteDatacenters()
     {
-        return conf.block_for_peers_percentage;
+        return conf.block_for_peers_in_remote_dcs;
     }
 
     public static int getBlockForPeersTimeoutInSeconds()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index bab3283..8e37470 100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.net;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -24,9 +26,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,73 +53,126 @@ public class StartupClusterConnectivityChecker
 {
     private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 
-    private final int targetPercent;
+    private final boolean blockForRemoteDcs;
     private final long timeoutNanos;
 
-    public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs)
+    public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
     {
-        timeoutSecs = Math.max(1, timeoutSecs);
         if (timeoutSecs > 100)
             logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
         long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
 
-        return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos);
+        return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
     }
 
     @VisibleForTesting
-    StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
+    StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
     {
-        this.targetPercent = Math.min(100, Math.max(0, targetPercent));
+        this.blockForRemoteDcs = blockForRemoteDcs;
         this.timeoutNanos = timeoutNanos;
     }
 
     /**
      * @param peers The currently known peers in the cluster; argument is not modified.
+     * @param getDatacenterSource A function for mapping peers to their datacenter.
      * @return true if, in every datacenter being waited on, all but at most one peer is marked ALIVE in gossip
      * and has its connections opened; else false.
      */
-    public boolean execute(Set<InetAddressAndPort> peers)
+    public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
     {
-        if (targetPercent == 0 || peers == null)
+        if (peers == null || this.timeoutNanos < 0)
             return true;
 
         // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
         peers = new HashSet<>(peers);
-        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+        String localDc = getDatacenterSource.apply(localAddress);
 
+        peers.remove(localAddress);
         if (peers.isEmpty())
             return true;
 
-        logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds",
-                    targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        // make a copy of the datacenter mapping (in case gossip updates it while this method is running)
+        Map<InetAddressAndPort, String> peerToDatacenter = new HashMap<>();
+        SetMultimap<String, InetAddressAndPort> datacenterToPeers = HashMultimap.create();
 
-        long startNanos = System.nanoTime();
+        for (InetAddressAndPort peer : peers)
+        {
+            String datacenter = getDatacenterSource.apply(peer);
+            peerToDatacenter.put(peer, datacenter);
+            datacenterToPeers.put(datacenter, peer);
+        }
+
+        // In the case where we do not want to block startup on remote datacenters (e.g. because clients only use
+        // LOCAL_X consistency levels), we remove all other datacenter hosts from the mapping and we only wait
+        // on the remaining local datacenter.
+        if (!blockForRemoteDcs)
+        {
+            datacenterToPeers.keySet().retainAll(Collections.singleton(localDc));
+            logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
+                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        }
+        else
+        {
+            logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
+                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        }
 
         AckMap acks = new AckMap(3);
-        int target = (int) ((targetPercent / 100.0) * peers.size());
-        CountDownLatch latch = new CountDownLatch(target);
+        Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
+        for (String datacenter : datacenterToPeers.keySet())
+        {
+            dcToRemainingPeers.put(datacenter,
+                                   new CountDownLatch(Math.max(datacenterToPeers.get(datacenter).size() - 1, 0)));
+        }
+
+        long startNanos = System.nanoTime();
 
         // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
-        Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new ConcurrentHashMap<>());
-        AliveListener listener = new AliveListener(alivePeers, latch, acks);
+        Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+        AliveListener listener = new AliveListener(alivePeers, dcToRemainingPeers, acks, peerToDatacenter::get);
         Gossiper.instance.register(listener);
 
-        // send out a ping message to open up the non-gossip connections
-        sendPingMessages(peers, latch, acks);
+        // send out a ping message to open up the non-gossip connections to all peers. Note that this sends the
+        // ping messages to _all_ peers, not just the ones we block for in dcToRemainingPeers.
+        sendPingMessages(peers, dcToRemainingPeers, acks, peerToDatacenter::get);
 
         for (InetAddressAndPort peer : peers)
+        {
             if (Gossiper.instance.isAlive(peer) && alivePeers.add(peer) && acks.incrementAndCheck(peer))
-                latch.countDown();
+            {
+                String datacenter = peerToDatacenter.get(peer);
+                // We have to check because we might only have the local DC in the map
+                if (dcToRemainingPeers.containsKey(datacenter))
+                    dcToRemainingPeers.get(datacenter).countDown();
+            }
+        }
+
+        boolean succeeded = true;
+        for (String datacenter: dcToRemainingPeers.keySet())
+        {
+            long remainingNanos = Math.max(1, timeoutNanos - (System.nanoTime() - startNanos));
+            succeeded &= Uninterruptibles.awaitUninterruptibly(dcToRemainingPeers.get(datacenter),
+                                                               remainingNanos, TimeUnit.NANOSECONDS);
+        }
 
-        boolean succeeded = Uninterruptibles.awaitUninterruptibly(latch, timeoutNanos, TimeUnit.NANOSECONDS);
         Gossiper.instance.unregister(listener);
 
-        int connected = peers.size() - (int) latch.getCount();
-        logger.info("After waiting/processing for {} milliseconds, {} out of {} peers ({}%) have been marked alive and had connections established",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos),
-                    connected,
-                    peers.size(),
-                    String.format("%.2f", (connected / (float)peers.size()) * 100));
+        Map<String, Long> numDown = dcToRemainingPeers.entrySet().stream()
+                                                      .collect(Collectors.toMap(Map.Entry::getKey,
+                                                                                e -> e.getValue().getCount()));
+
+        if (succeeded)
+        {
+            logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
+                        numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+        }
+        else
+        {
+            logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos), numDown);
+        }
+
         return succeeded;
     }
 
@@ -122,7 +180,8 @@ public class StartupClusterConnectivityChecker
      * Sends a "connection warmup" message to each peer in the collection, on every {@link ConnectionType}
      * used for internode messaging (that is not gossip).
      */
-    private void sendPingMessages(Set<InetAddressAndPort> peers, CountDownLatch latch, AckMap acks)
+    private void sendPingMessages(Set<InetAddressAndPort> peers, Map<String, CountDownLatch> dcToRemainingPeers,
+                                  AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
     {
         IAsyncCallback responseHandler = new IAsyncCallback()
         {
@@ -134,7 +193,12 @@ public class StartupClusterConnectivityChecker
             public void response(MessageIn msg)
             {
                 if (acks.incrementAndCheck(msg.from))
-                    latch.countDown();
+                {
+                    String datacenter = getDatacenter.apply(msg.from);
+                    // We have to check because we might only have the local DC in the map
+                    if (dcToRemainingPeers.containsKey(datacenter))
+                        dcToRemainingPeers.get(datacenter).countDown();
+                }
             }
         };
 
@@ -155,15 +219,18 @@ public class StartupClusterConnectivityChecker
      */
     private static final class AliveListener implements IEndpointStateChangeSubscriber
     {
-        private final CountDownLatch latch;
+        private final Map<String, CountDownLatch> dcToRemainingPeers;
         private final Set<InetAddressAndPort> livePeers;
+        private final Function<InetAddressAndPort, String> getDatacenter;
         private final AckMap acks;
 
-        AliveListener(Set<InetAddressAndPort> livePeers, CountDownLatch latch, AckMap acks)
+        AliveListener(Set<InetAddressAndPort> livePeers, Map<String, CountDownLatch> dcToRemainingPeers,
+                      AckMap acks, Function<InetAddressAndPort, String> getDatacenter)
         {
-            this.latch = latch;
             this.livePeers = livePeers;
+            this.dcToRemainingPeers = dcToRemainingPeers;
             this.acks = acks;
+            this.getDatacenter = getDatacenter;
         }
 
         public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
@@ -181,7 +248,11 @@ public class StartupClusterConnectivityChecker
         public void onAlive(InetAddressAndPort endpoint, EndpointState state)
         {
             if (livePeers.add(endpoint) && acks.incrementAndCheck(endpoint))
-                latch.countDown();
+            {
+                String datacenter = getDatacenter.apply(endpoint);
+                if (dcToRemainingPeers.containsKey(datacenter))
+                    dcToRemainingPeers.get(datacenter).countDown();
+            }
         }
 
         public void onDead(InetAddressAndPort endpoint, EndpointState state)
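Putting the checker's pieces together gives three rules. Each peer needs three signals before it counts: it must be marked alive in gossip and must answer a ping on both the small and the large message connections. Each waited-on datacenter has a latch initialized to all but one of its peers. The latches are awaited one after another against a single overall deadline. The simplified model below is a sketch under those assumptions, which the AckMap(3) construction and the ping-sending comment suggest. Peers are strings, and the per-peer counter stands in for AckMap, whose implementation is not part of this diff. Unlike Guava's awaitUninterruptibly used by the checker, this model gives up when interrupted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Simplified, hypothetical model of StartupClusterConnectivityChecker's bookkeeping.
final class PerDcReadiness
{
    // Assumed: gossip UP + small-message ping ack + large-message ping ack.
    private static final int ACKS_PER_PEER = 3;

    private final Map<String, AtomicInteger> acks = new ConcurrentHashMap<>();
    private final Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>();
    private final Function<String, String> datacenterOf;

    PerDcReadiness(Map<String, Integer> peersPerWaitedDc, Function<String, String> datacenterOf)
    {
        this.datacenterOf = datacenterOf;
        // Each waited-on datacenter needs all but one of its peers to become ready.
        peersPerWaitedDc.forEach((dc, n) -> dcToRemainingPeers.put(dc, new CountDownLatch(Math.max(n - 1, 0))));
    }

    // Records one signal for a peer. The peer's datacenter latch is counted down exactly
    // once, when the peer reaches ACKS_PER_PEER; peers in datacenters that are not
    // waited on have no latch and are ignored.
    void ack(String peer)
    {
        int count = acks.computeIfAbsent(peer, p -> new AtomicInteger()).incrementAndGet();
        if (count != ACKS_PER_PEER)
            return;
        CountDownLatch latch = dcToRemainingPeers.get(datacenterOf.apply(peer));
        if (latch != null)
            latch.countDown();
    }

    // Awaits every datacenter's latch against one shared deadline. Returns false on
    // timeout or interruption (re-asserting the interrupt flag in the latter case).
    boolean await(long timeoutNanos)
    {
        long start = System.nanoTime();
        boolean succeeded = true;
        for (CountDownLatch latch : dcToRemainingPeers.values())
        {
            // Mirrors the checker: at least 1 ns, so an expired deadline still checks each latch.
            long remaining = Math.max(1, timeoutNanos - (System.nanoTime() - start));
            try
            {
                succeeded &= latch.await(remaining, TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return succeeded;
    }
}
```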

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 815e673..f0b2dc1 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -505,9 +505,9 @@ public class CassandraDaemon
      */
     public void start()
     {
-        StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersPercentage(),
-                                                                                                         DatabaseDescriptor.getBlockForPeersTimeoutInSeconds());
-        connectivityChecker.execute(Gossiper.instance.getEndpoints());
+        StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
+                                                                                                         DatabaseDescriptor.getBlockForPeersInRemoteDatacenters());
+        connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter);
 
         String nativeFlag = System.getProperty("cassandra.start_native_transport");
         if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/801cb70e/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
index 4eeb314..1645d77 100644
--- a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
+++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java
@@ -36,13 +36,35 @@ import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
 
 public class StartupClusterConnectivityCheckerTest
 {
-    private StartupClusterConnectivityChecker connectivityChecker;
+    private StartupClusterConnectivityChecker localQuorumConnectivityChecker;
+    private StartupClusterConnectivityChecker globalQuorumConnectivityChecker;
+    private StartupClusterConnectivityChecker noopChecker;
+    private StartupClusterConnectivityChecker zeroWaitChecker;
+
+    private static final long TIMEOUT_NANOS = 100;
+    private static final int NUM_PER_DC = 6;
     private Set<InetAddressAndPort> peers;
+    private Set<InetAddressAndPort> peersA;
+    private Set<InetAddressAndPort> peersAMinusLocal;
+    private Set<InetAddressAndPort> peersB;
+    private Set<InetAddressAndPort> peersC;
+
+    private String getDatacenter(InetAddressAndPort endpoint)
+    {
+        if (peersA.contains(endpoint))
+            return "datacenterA";
+        if (peersB.contains(endpoint))
+            return "datacenterB";
+        else if (peersC.contains(endpoint))
+            return "datacenterC";
+        return null;
+    }
 
     @BeforeClass
     public static void before()
@@ -53,11 +75,34 @@ public class StartupClusterConnectivityCheckerTest
     @Before
     public void setUp() throws UnknownHostException
     {
-        connectivityChecker = new StartupClusterConnectivityChecker(70, 10);
+        localQuorumConnectivityChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, false);
+        globalQuorumConnectivityChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, true);
+        noopChecker = new StartupClusterConnectivityChecker(-1, false);
+        zeroWaitChecker = new StartupClusterConnectivityChecker(0, false);
+
+        peersA = new HashSet<>();
+        peersAMinusLocal = new HashSet<>();
+        peersA.add(FBUtilities.getBroadcastAddressAndPort());
+
+        for (int i = 0; i < NUM_PER_DC - 1; i ++)
+        {
+            peersA.add(InetAddressAndPort.getByName("127.0.1." + i));
+            peersAMinusLocal.add(InetAddressAndPort.getByName("127.0.1." + i));
+        }
+
+        peersB = new HashSet<>();
+        for (int i = 0; i < NUM_PER_DC; i ++)
+            peersB.add(InetAddressAndPort.getByName("127.0.2." + i));
+
+
+        peersC = new HashSet<>();
+        for (int i = 0; i < NUM_PER_DC; i ++)
+            peersC.add(InetAddressAndPort.getByName("127.0.3." + i));
+
         peers = new HashSet<>();
-        peers.add(InetAddressAndPort.getByName("127.0.1.0"));
-        peers.add(InetAddressAndPort.getByName("127.0.1.1"));
-        peers.add(InetAddressAndPort.getByName("127.0.1.2"));
+        peers.addAll(peersA);
+        peers.addAll(peersB);
+        peers.addAll(peersC);
     }
 
     @After
@@ -69,50 +114,145 @@ public class StartupClusterConnectivityCheckerTest
     @Test
     public void execute_HappyPath()
     {
-        Sink sink = new Sink(true, true);
+        Sink sink = new Sink(true, true, peers);
         MessagingService.instance().addMessageSink(sink);
-        Assert.assertTrue(connectivityChecker.execute(peers));
-        checkAllConnectionTypesSeen(sink);
+        Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
+        Assert.assertTrue(checkAllConnectionTypesSeen(sink));
     }
 
     @Test
     public void execute_NotAlive()
     {
-        Sink sink = new Sink(false, true);
+        Sink sink = new Sink(false, true, peers);
         MessagingService.instance().addMessageSink(sink);
-        Assert.assertFalse(connectivityChecker.execute(peers));
-        checkAllConnectionTypesSeen(sink);
+        Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
+        Assert.assertTrue(checkAllConnectionTypesSeen(sink));
     }
 
     @Test
     public void execute_NoConnectionsAcks()
     {
-        Sink sink = new Sink(true, false);
+        Sink sink = new Sink(true, false, peers);
+        MessagingService.instance().addMessageSink(sink);
+        Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
+    }
+
+    @Test
+    public void execute_LocalQuorum()
+    {
+        // local peer plus 3 peers from same dc shouldn't pass (4/6)
+        Set<InetAddressAndPort> available = new HashSet<>();
+        copyCount(peersAMinusLocal, available, NUM_PER_DC - 3);
+        checkAvailable(localQuorumConnectivityChecker, available, false, true);
+
+        // local peer plus 4 peers from same dc should pass (5/6)
+        available.clear();
+        copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+        checkAvailable(localQuorumConnectivityChecker, available, true, true);
+    }
+
+    @Test
+    public void execute_GlobalQuorum()
+    {
+        // local dc passing shouldn't pass globally with two hosts down in datacenterB
+        Set<InetAddressAndPort> available = new HashSet<>();
+        copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+        copyCount(peersB, available, NUM_PER_DC - 2);
+        copyCount(peersC, available, NUM_PER_DC - 1);
+        checkAvailable(globalQuorumConnectivityChecker, available, false, true);
+
+        // All three datacenters should be able to have a single node down
+        available.clear();
+        copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+        copyCount(peersB, available, NUM_PER_DC - 1);
+        copyCount(peersC, available, NUM_PER_DC - 1);
+        checkAvailable(globalQuorumConnectivityChecker, available, true, true);
+
+        // Everything being up should work of course
+        available.clear();
+        copyCount(peersAMinusLocal, available, NUM_PER_DC - 1);
+        copyCount(peersB, available, NUM_PER_DC);
+        copyCount(peersC, available, NUM_PER_DC);
+        checkAvailable(globalQuorumConnectivityChecker, available, true, true);
+    }
+
+    @Test
+    public void execute_Noop()
+    {
+        checkAvailable(noopChecker, new HashSet<>(), true, false);
+    }
+
+    @Test
+    public void execute_ZeroWaitHasConnections() throws InterruptedException
+    {
+        Sink sink = new Sink(true, true, new HashSet<>());
+        MessagingService.instance().addMessageSink(sink);
+        Assert.assertFalse(zeroWaitChecker.execute(peers, this::getDatacenter));
+        boolean hasConnections = false;
+        for (int i = 0; i < TIMEOUT_NANOS; i+= 10)
+        {
+            hasConnections = checkAllConnectionTypesSeen(sink);
+            if (hasConnections)
+                break;
+            Thread.sleep(0, 10);
+        }
+        MessagingService.instance().clearMessageSinks();
+        Assert.assertTrue(hasConnections);
+    }
+
+    private void checkAvailable(StartupClusterConnectivityChecker checker, Set<InetAddressAndPort> available,
+                                boolean shouldPass, boolean checkConnections)
+    {
+        Sink sink = new Sink(true, true, available);
         MessagingService.instance().addMessageSink(sink);
-        Assert.assertFalse(connectivityChecker.execute(peers));
+        Assert.assertEquals(shouldPass, checker.execute(peers, this::getDatacenter));
+        if (checkConnections)
+            Assert.assertTrue(checkAllConnectionTypesSeen(sink));
+        MessagingService.instance().clearMessageSinks();
+    }
+
+    private void copyCount(Set<InetAddressAndPort> source, Set<InetAddressAndPort> dest, int count)
+    {
+        for (InetAddressAndPort peer : source)
+        {
+            if (count <= 0)
+                break;
+
+            dest.add(peer);
+            count -= 1;
+        }
     }
 
-    private void checkAllConnectionTypesSeen(Sink sink)
+    private boolean checkAllConnectionTypesSeen(Sink sink)
     {
+        boolean result = true;
         for (InetAddressAndPort peer : peers)
         {
+            if (peer.equals(FBUtilities.getBroadcastAddressAndPort()))
+                continue;
             ConnectionTypeRecorder recorder = sink.seenConnectionRequests.get(peer);
-            Assert.assertNotNull(recorder);
-            Assert.assertTrue(recorder.seenSmallMessageRequest);
-            Assert.assertTrue(recorder.seenLargeMessageRequest);
+            // stop at the first peer missing a recorder or either connection type
+            result = recorder != null
+                     && recorder.seenSmallMessageRequest
+                     && recorder.seenLargeMessageRequest;
+            if (!result)
+                break;
         }
+        return result;
     }
 
     private static class Sink implements IMessageSink
     {
         private final boolean markAliveInGossip;
         private final boolean processConnectAck;
+        private final Set<InetAddressAndPort> aliveHosts;
         private final Map<InetAddressAndPort, ConnectionTypeRecorder> seenConnectionRequests;
 
-        Sink(boolean markAliveInGossip, boolean processConnectAck)
+        Sink(boolean markAliveInGossip, boolean processConnectAck, Set<InetAddressAndPort> aliveHosts)
         {
             this.markAliveInGossip = markAliveInGossip;
             this.processConnectAck = processConnectAck;
+            this.aliveHosts = aliveHosts;
             seenConnectionRequests = new HashMap<>();
         }
 
@@ -131,6 +271,9 @@ public class StartupClusterConnectivityCheckerTest
                 recorder.seenLargeMessageRequest = true;
             }
 
+            if (!aliveHosts.contains(to))
+                return false;
+
             if (processConnectAck)
             {
                 MessageIn msgIn = MessageIn.create(to, message.payload, Collections.emptyMap(), MessagingService.Verb.REQUEST_RESPONSE, 1);

