You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/11/12 17:59:41 UTC

[cassandra] branch trunk updated: Log missing peers in StartupClusterConnectivityChecker

This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 97bfde0  Log missing peers in StartupClusterConnectivityChecker
97bfde0 is described below

commit 97bfde01f805b4c3abd52870e0338440093f5893
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Thu Nov 11 16:40:19 2021 -0800

    Log missing peers in StartupClusterConnectivityChecker
    
    patch by Yifan Cai; reviewed by Berenguer Blasi, Stefan Podkowinski for CASSANDRA-17130
---
 CHANGES.txt                                        |  1 +
 .../net/StartupClusterConnectivityChecker.java     | 52 +++++++++++++++++-----
 2 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f7855f8..fb97fec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
  * Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
  * Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
  * Actively update auth cache in the background (CASSANDRA-16957)
diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index 93305db..a226503 100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@ -17,9 +17,11 @@
  */
 package org.apache.cassandra.net;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +29,6 @@ import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
@@ -35,14 +36,15 @@ import com.google.common.collect.SetMultimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
 import static org.apache.cassandra.net.Verb.PING_REQ;
 import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
 import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
@@ -118,7 +120,9 @@ public class StartupClusterConnectivityChecker
                         TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
         }
 
-        AckMap acks = new AckMap(3);
+        // The threshold is 3 because we want 3 acks from each peer:
+        // one for the small message connection, one for the large message connection and one for the alive event from gossip.
+        AckMap acks = new AckMap(3, peers);
         Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
         for (String datacenter: datacenterToPeers.keys())
         {
@@ -158,19 +162,21 @@ public class StartupClusterConnectivityChecker
 
         Gossiper.instance.unregister(listener);
 
-        Map<String, Integer> numDown = dcToRemainingPeers.entrySet().stream()
-                                                      .collect(Collectors.toMap(Map.Entry::getKey,
-                                                                                e -> e.getValue().count()));
-
         if (succeeded)
         {
             logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
-                        numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
+                        dcToRemainingPeers.keySet(), TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
         }
         else
         {
+            // dc -> missing peer host addresses
+            Map<String, List<String>> peersDown = acks.getMissingPeers().stream()
+                                                      .collect(groupingBy(peerToDatacenter::get,
+                                                                          mapping(InetAddressAndPort::getHostAddressAndPort,
+                                                                                  toList())));
             logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
-                        TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos), numDown);
+                        TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos),
+                        peersDown);
         }
 
         return succeeded;
@@ -238,15 +244,37 @@ public class StartupClusterConnectivityChecker
         private final int threshold;
         private final Map<InetAddressAndPort, AtomicInteger> acks;
 
-        AckMap(int threshold)
+        AckMap(int threshold, Iterable<InetAddressAndPort> initialPeers)
         {
             this.threshold = threshold;
             acks = new ConcurrentHashMap<>();
+            for (InetAddressAndPort peer : initialPeers)
+                initOrGetCounter(peer);
         }
 
         boolean incrementAndCheck(InetAddressAndPort address)
         {
-            return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
+            return initOrGetCounter(address).incrementAndGet() == threshold;
+        }
+
+        /**
+         * Get a list of peers that have not fully ack'd, i.e. have not reached the threshold number of acks
+         */
+        List<InetAddressAndPort> getMissingPeers()
+        {
+            List<InetAddressAndPort> missingPeers = new ArrayList<>();
+            for (Map.Entry<InetAddressAndPort, AtomicInteger> entry : acks.entrySet())
+            {
+                if (entry.getValue().get() < threshold)
+                    missingPeers.add(entry.getKey());
+            }
+            return missingPeers;
+        }
+
+        // get the counter for the peer, creating it with a count of 0 if the peer is not tracked yet
+        private AtomicInteger initOrGetCounter(InetAddressAndPort address)
+        {
+            return acks.computeIfAbsent(address, addr -> new AtomicInteger(0));
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org