You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by jolynch <gi...@git.apache.org> on 2018/11/10 01:35:07 UTC
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r232435653
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -23,128 +23,300 @@
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this class.
*/
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
{
- private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
- private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
- private static final int WINDOW_SIZE = 100;
-
- private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
- private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
- private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+ // Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking
+ protected boolean registered = false;
+ protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+ protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+ protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
+
+ // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking
+ public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+ public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+ // The probe rate is set later when configuration is read
+ protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
+ protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY);
+
+ // User configuration of the snitch tunables
+ protected volatile int dynamicUpdateInterval = -1;
+ protected volatile int dynamicLatencyProbeInterval = -1;
+ protected volatile double dynamicBadnessThreshold = 0;
// the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
// warrant not merging two ranges into a single range
private static final double RANGE_MERGING_PREFERENCE = 1.5;
private String mbeanName;
- private boolean registered = false;
-
- private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
- private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
+ private boolean mbeanRegistered = false;
public final IEndpointSnitch subsnitch;
- private volatile ScheduledFuture<?> updateSchedular;
- private volatile ScheduledFuture<?> resetSchedular;
-
- private final Runnable update;
- private final Runnable reset;
+ private volatile ScheduledFuture<?> updateScoresScheduler;
+ private volatile ScheduledFuture<?> updateSamplesScheduler;
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
this(snitch, null);
}
- public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+ protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
{
mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
if (instance != null)
mbeanName += ",instance=" + instance;
subsnitch = snitch;
- update = new Runnable()
+
+ if (DatabaseDescriptor.isDaemonInitialized())
{
- public void run()
- {
- updateScores();
- }
- };
- reset = new Runnable()
+ applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
+ DatabaseDescriptor.getDynamicSampleUpdateInterval(),
--- End diff --
It's set within `applyConfigChanges`, I'll reorganize that method to make it more readable.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org