You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/11/12 17:59:41 UTC
[cassandra] branch trunk updated: Log missing peers in StartupClusterConnectivityChecker
This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 97bfde0 Log missing peers in StartupClusterConnectivityChecker
97bfde0 is described below
commit 97bfde01f805b4c3abd52870e0338440093f5893
Author: Yifan Cai <yc...@apache.org>
AuthorDate: Thu Nov 11 16:40:19 2021 -0800
Log missing peers in StartupClusterConnectivityChecker
patch by Yifan Cai; reviewed by Berenguer Blasi, Stefan Podkowinski for CASSANDRA-17130
---
CHANGES.txt | 1 +
.../net/StartupClusterConnectivityChecker.java | 52 +++++++++++++++++-----
2 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index f7855f8..fb97fec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Log missing peers in StartupClusterConnectivityChecker (CASSANDRA-17130)
* Introduce separate rate limiting settings for entire SSTable streaming (CASSANDRA-17065)
* Implement Virtual Tables for Auth Caches (CASSANDRA-16914)
* Actively update auth cache in the background (CASSANDRA-16957)
diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
index 93305db..a226503 100644
--- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
+++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java
@@ -17,9 +17,11 @@
*/
package org.apache.cassandra.net;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +29,6 @@ import org.apache.cassandra.utils.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
@@ -35,14 +36,15 @@ import com.google.common.collect.SetMultimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
import static org.apache.cassandra.net.Verb.PING_REQ;
import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
import static org.apache.cassandra.net.ConnectionType.SMALL_MESSAGES;
@@ -118,7 +120,9 @@ public class StartupClusterConnectivityChecker
TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
}
- AckMap acks = new AckMap(3);
+ // The threshold is 3 because for each peer we want to have 3 acks,
+ // one for small message connection, one for large message connnection and one for alive event from gossip.
+ AckMap acks = new AckMap(3, peers);
Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(datacenterToPeers.size());
for (String datacenter: datacenterToPeers.keys())
{
@@ -158,19 +162,21 @@ public class StartupClusterConnectivityChecker
Gossiper.instance.unregister(listener);
- Map<String, Integer> numDown = dcToRemainingPeers.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> e.getValue().count()));
-
if (succeeded)
{
logger.info("Ensured sufficient healthy connections with {} after {} milliseconds",
- numDown.keySet(), TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
+ dcToRemainingPeers.keySet(), TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos));
}
else
{
+ // dc -> missing peer host addresses
+ Map<String, List<String>> peersDown = acks.getMissingPeers().stream()
+ .collect(groupingBy(peerToDatacenter::get,
+ mapping(InetAddressAndPort::getHostAddressAndPort,
+ toList())));
logger.warn("Timed out after {} milliseconds, was waiting for remaining peers to connect: {}",
- TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos), numDown);
+ TimeUnit.NANOSECONDS.toMillis(nanoTime() - startNanos),
+ peersDown);
}
return succeeded;
@@ -238,15 +244,37 @@ public class StartupClusterConnectivityChecker
private final int threshold;
private final Map<InetAddressAndPort, AtomicInteger> acks;
- AckMap(int threshold)
+ AckMap(int threshold, Iterable<InetAddressAndPort> initialPeers)
{
this.threshold = threshold;
acks = new ConcurrentHashMap<>();
+ for (InetAddressAndPort peer : initialPeers)
+ initOrGetCounter(peer);
}
boolean incrementAndCheck(InetAddressAndPort address)
{
- return acks.computeIfAbsent(address, addr -> new AtomicInteger(0)).incrementAndGet() == threshold;
+ return initOrGetCounter(address).incrementAndGet() == threshold;
+ }
+
+ /**
+ * Get a list of peers that has not fully ack'd, i.e. not reaching threshold acks
+ */
+ List<InetAddressAndPort> getMissingPeers()
+ {
+ List<InetAddressAndPort> missingPeers = new ArrayList<>();
+ for (Map.Entry<InetAddressAndPort, AtomicInteger> entry : acks.entrySet())
+ {
+ if (entry.getValue().get() < threshold)
+ missingPeers.add(entry.getKey());
+ }
+ return missingPeers;
+ }
+
+ // init the counter for the peer just in case
+ private AtomicInteger initOrGetCounter(InetAddressAndPort address)
+ {
+ return acks.computeIfAbsent(address, addr -> new AtomicInteger(0));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org