You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2019/02/08 20:29:32 UTC

[GitHub] aweisberg commented on a change in pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should never prefer latent replicas

aweisberg commented on a change in pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should never prefer latent replicas
URL: https://github.com/apache/cassandra/pull/283#discussion_r255225695
 
 

 ##########
 File path: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
 ##########
 @@ -21,133 +21,505 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
 
-import com.codahale.metrics.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-    private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+    private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+    // Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking
+    protected boolean registered = false;
+    protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+    protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
 
-    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
-    private static final int WINDOW_SIZE = 100;
+    protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
 
-    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
-    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
-    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+    // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking
+    public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+    public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+    // The probe rate is set later when configuration is read in applyConfigChanges
+    protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
+    protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor("LatencyProbes");
+    private long lastUpdateSamplesNanos = System.nanoTime();
+
+    // User configuration of the snitch tunables
+    protected volatile int dynamicUpdateInterval = -1;
+    protected volatile int dynamicSampleUpdateInterval = -1;
+    protected volatile double dynamicBadnessThreshold = 0;
 
     // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
     // warrant not merging two ranges into a single range
     private static final double RANGE_MERGING_PREFERENCE = 1.5;
 
     private String mbeanName;
-    private boolean registered = false;
-
-    private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
-    private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
+    private boolean mbeanRegistered = false;
 
     public final IEndpointSnitch subsnitch;
 
-    private volatile ScheduledFuture<?> updateSchedular;
-    private volatile ScheduledFuture<?> resetSchedular;
-
-    private final Runnable update;
-    private final Runnable reset;
+    private volatile ScheduledFuture<?> updateScoresScheduler = null;
+    private volatile ScheduledFuture<?> updateSamplesScheduler = null;
 
     public DynamicEndpointSnitch(IEndpointSnitch snitch)
     {
         this(snitch, null);
     }
 
-    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+    protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
     {
         mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
         if (instance != null)
             mbeanName += ",instance=" + instance;
         subsnitch = snitch;
-        update = new Runnable()
+
+        if (DatabaseDescriptor.isDaemonInitialized())
         {
-            public void run()
+            open();
+        }
+    }
+
+    /**
+     * Update configuration of the background tasks and restart the various scheduler tasks
+     * if the configured rates for these tasks have changed.
+     */
+    public void applyConfigChanges(int newDynamicUpdateInternal, int newDynamicSampleUpdateInterval, double newDynamicBadnessThreshold)
+    {
+        if (DatabaseDescriptor.isDaemonInitialized())
+        {
+            if (dynamicUpdateInterval != newDynamicUpdateInternal || updateScoresScheduler == null)
             {
-                updateScores();
+                cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
+                updateScoresScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateScores, newDynamicUpdateInternal, newDynamicUpdateInternal, TimeUnit.MILLISECONDS);
             }
-        };
-        reset = new Runnable()
+
+            if (dynamicSampleUpdateInterval != newDynamicSampleUpdateInterval || updateSamplesScheduler == null)
+            {
+                cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval), TimeUnit.MILLISECONDS);
+                if (newDynamicSampleUpdateInterval > 0)
+                    updateSamplesScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateSamples, newDynamicSampleUpdateInterval, newDynamicSampleUpdateInterval, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        dynamicUpdateInterval = newDynamicUpdateInternal;
+        dynamicSampleUpdateInterval = newDynamicSampleUpdateInterval;
+        dynamicBadnessThreshold = newDynamicBadnessThreshold;
+
+        if (dynamicSampleUpdateInterval > 0)
+            probeRateLimiter.setRate(dynamicSampleUpdateInterval);
+    }
+
+    public synchronized void open()
+    {
+        applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
+                           DatabaseDescriptor.getDynamicSampleUpdateInterval(),
+                           DatabaseDescriptor.getDynamicBadnessThreshold());
+
+        MBeanWrapper.instance.registerMBean(this, mbeanName);
+        mbeanRegistered = true;
+    }
+
+    public synchronized void close()
+    {
+        cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
+        cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval), TimeUnit.MILLISECONDS);
+        updateScoresScheduler = null;
+        updateSamplesScheduler = null;
+
+        for (AnnotatedMeasurement measurement : samples.values())
         {
-            public void run()
+            cancelAndWait(measurement.probeFuture, PING.getTimeout(), TimeUnit.MILLISECONDS);
+
+            measurement.millisSinceLastMeasure.set(0);
+            measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+            measurement.nextProbeDelayMillis = 0;
+        }
+
+        if (mbeanRegistered)
+            MBeanWrapper.instance.unregisterMBean(mbeanName);
+
+        mbeanRegistered = false;
+    }
+
+    private static void cancelAndWait(ScheduledFuture future, long timeout, TimeUnit unit)
+    {
+        if (future != null)
+        {
+            future.cancel(false);
+            try
             {
-                // we do this so that a host considered bad has a chance to recover, otherwise would we never try
-                // to read from it, which would cause its score to never change
-                reset();
+                future.get(timeout, unit);
             }
-        };
+            catch (CancellationException | InterruptedException | ExecutionException | TimeoutException ignored)
+            {
+                // Exception is expected to happen eventually due to the cancel -> get
 
 Review comment:
  So I just realized this exposes the lack of exception handling in these scheduled tasks: each of them needs top-level exception handling. Without it, a task that throws will silently stop being rescheduled, and the error will be swallowed by its ScheduledFuture until something calls get(). They should probably have a top-level try/catch that logs the error and doesn't let it escape, so the task keeps running on its schedule.
   
  That also means that, apart from CancellationException, all of these other exception types once again indicate an unexpected error. I think it's fine to continue if you get one, but we should log it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org