You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/08/04 18:38:28 UTC

cassandra git commit: Allow updating DynamicEndpointSnitch properties via JMX

Repository: cassandra
Updated Branches:
  refs/heads/trunk 221f27a6d -> 3472cf06c


Allow updating DynamicEndpointSnitch properties via JMX

patch by sankalp kohli; reviewed by Robert Stupp for CASSANDRA-12179


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3472cf06
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3472cf06
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3472cf06

Branch: refs/heads/trunk
Commit: 3472cf06ce9a811381a1548934b7a0b343616197
Parents: 221f27a
Author: sankalp kohli <sa...@apple.com>
Authored: Thu Aug 4 20:37:36 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Aug 4 20:37:36 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/config/Config.java     |  6 +-
 .../cassandra/config/DatabaseDescriptor.java    |  6 +-
 .../locator/DynamicEndpointSnitch.java          | 68 ++++++++++----
 .../cassandra/service/StorageService.java       | 93 +++++++++++++++-----
 .../cassandra/service/StorageServiceMBean.java  | 28 ++++--
 6 files changed, 153 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98f9c24..c7063d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
  * Collect metrics on queries by consistency level (CASSANDRA-7384)
  * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
  * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64f06b9..5345a57 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -210,9 +210,9 @@ public class Config
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;
-    public Integer dynamic_snitch_update_interval_in_ms = 100;
-    public Integer dynamic_snitch_reset_interval_in_ms = 600000;
-    public Double dynamic_snitch_badness_threshold = 0.1;
+    public volatile Integer dynamic_snitch_update_interval_in_ms = 100;
+    public volatile Integer dynamic_snitch_reset_interval_in_ms = 600000;
+    public volatile Double dynamic_snitch_badness_threshold = 0.1;
 
     public String request_scheduler;
     public RequestSchedulerId request_scheduler_id;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15aee59..3f367b4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -477,7 +477,7 @@ public class DatabaseDescriptor
         {
             throw new ConfigurationException("Missing endpoint_snitch directive", false);
         }
-        snitch = createEndpointSnitch(conf.endpoint_snitch);
+        snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
         EndpointSnitchInfo.create();
 
         localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
@@ -858,12 +858,12 @@ public class DatabaseDescriptor
         }
     }
 
-    private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
+    public static IEndpointSnitch createEndpointSnitch(boolean dynamic, String snitchClassName) throws ConfigurationException
     {
         if (!snitchClassName.contains("."))
             snitchClassName = "org.apache.cassandra.locator." + snitchClassName;
         IEndpointSnitch snitch = FBUtilities.construct(snitchClassName, "snitch");
-        return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
+        return dynamic ? new DynamicEndpointSnitch(snitch) : snitch;
     }
 
     public static IAuthenticator getAuthenticator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 975b10e..de0cdde 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.ExponentiallyDecayingReservoir;
@@ -50,13 +51,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
     private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
     private static final int WINDOW_SIZE = 100;
 
-    private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval();
-    private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval();
-    private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold();
+    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
+    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
+    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
 
     // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
     // warrant not merging two ranges into a single range
-    private double RANGE_MERGING_PREFERENCE = 1.5;
+    private static final double RANGE_MERGING_PREFERENCE = 1.5;
 
     private String mbeanName;
     private boolean registered = false;
@@ -66,24 +67,31 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     public final IEndpointSnitch subsnitch;
 
+    private volatile ScheduledFuture<?> updateSchedular;
+    private volatile ScheduledFuture<?> resetSchedular;
+
+    private final Runnable update;
+    private final Runnable reset;
+
     public DynamicEndpointSnitch(IEndpointSnitch snitch)
     {
         this(snitch, null);
     }
+
     public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
     {
         mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
         if (instance != null)
             mbeanName += ",instance=" + instance;
         subsnitch = snitch;
-        Runnable update = new Runnable()
+        update = new Runnable()
         {
             public void run()
             {
                 updateScores();
             }
         };
-        Runnable reset = new Runnable()
+        reset = new Runnable()
         {
             public void run()
             {
@@ -92,10 +100,33 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
                 reset();
             }
         };
-        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
-        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
+        resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
         registerMBean();
-   }
+    }
+
+    /**
+     * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks
+     * if the configured rates for these tasks have changed.
+     */
+    public void applyConfigChanges()
+    {
+        if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval())
+        {
+            dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
+            updateSchedular.cancel(false);
+            updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
+        }
+
+        if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval())
+        {
+            dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
+            resetSchedular.cancel(false);
+            resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
+        }
+
+        dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+    }
 
     private void registerMBean()
     {
@@ -110,8 +141,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         }
     }
 
-    public void unregisterMBean()
+    public void close()
     {
+        updateSchedular.cancel(false);
+        resetSchedular.cancel(false);
+
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
@@ -150,7 +184,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
     public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
     {
         assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself
-        if (BADNESS_THRESHOLD == 0)
+        if (dynamicBadnessThreshold == 0)
         {
             sortByProximityWithScore(address, addresses);
         }
@@ -194,7 +228,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         }
 
         // Sort the scores and then compare them (positionally) to the scores in the subsnitch order.
-        // If any of the subsnitch-ordered scores exceed the optimal/sorted score by BADNESS_THRESHOLD, use
+        // If any of the subsnitch-ordered scores exceed the optimal/sorted score by dynamicBadnessThreshold, use
         // the score-sorted ordering instead of the subsnitch ordering.
         ArrayList<Double> sortedScores = new ArrayList<>(subsnitchOrderedScores);
         Collections.sort(sortedScores);
@@ -202,7 +236,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
         Iterator<Double> sortedScoreIterator = sortedScores.iterator();
         for (Double subsnitchScore : subsnitchOrderedScores)
         {
-            if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + BADNESS_THRESHOLD)))
+            if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold)))
             {
                 sortByProximityWithScore(address, addresses);
                 return;
@@ -306,15 +340,15 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
 
     public int getUpdateInterval()
     {
-        return UPDATE_INTERVAL_IN_MS;
+        return dynamicUpdateInterval;
     }
     public int getResetInterval()
     {
-        return RESET_INTERVAL_IN_MS;
+        return dynamicResetInterval;
     }
     public double getBadnessThreshold()
     {
-        return BADNESS_THRESHOLD;
+        return dynamicBadnessThreshold;
     }
 
     public String getSubsnitchClassName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 6373df2..eade850 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4434,37 +4434,88 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Collections.unmodifiableMap(result);
     }
 
-    public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException
+    public void setDynamicUpdateInterval(int dynamicUpdateInterval)
     {
-        IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
-
-        // new snitch registers mbean during construction
-        IEndpointSnitch newSnitch;
-        try
+        if (DatabaseDescriptor.getEndpointSnitch() instanceof DynamicEndpointSnitch)
         {
-            newSnitch = FBUtilities.construct(epSnitchClassName, "snitch");
-        }
-        catch (ConfigurationException e)
-        {
-            throw new ClassNotFoundException(e.getMessage());
+
+            try
+            {
+                updateSnitch(null, true, dynamicUpdateInterval, null, null);
+            }
+            catch (ClassNotFoundException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        if (dynamic)
-        {
+    }
+
+    public int getDynamicUpdateInterval()
+    {
+        return DatabaseDescriptor.getDynamicUpdateInterval();
+    }
+
+    public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException
+    {
+        // apply dynamic snitch configuration
+        if (dynamicUpdateInterval != null)
             DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval);
+        if (dynamicResetInterval != null)
             DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval);
+        if (dynamicBadnessThreshold != null)
             DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold);
-            newSnitch = new DynamicEndpointSnitch(newSnitch);
-        }
 
-        // point snitch references to the new instance
-        DatabaseDescriptor.setEndpointSnitch(newSnitch);
-        for (String ks : Schema.instance.getKeyspaces())
+        IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
+
+        // new snitch registers mbean during construction
+        if(epSnitchClassName != null)
         {
-            Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch;
+
+            // need to unregister the mbean _before_ the new dynamic snitch is instantiated (and implicitly initialized
+            // and its mbean registered)
+            if (oldSnitch instanceof DynamicEndpointSnitch)
+                ((DynamicEndpointSnitch)oldSnitch).close();
+
+            IEndpointSnitch newSnitch;
+            try
+            {
+                newSnitch = DatabaseDescriptor.createEndpointSnitch(dynamic != null && dynamic, epSnitchClassName);
+            }
+            catch (ConfigurationException e)
+            {
+                throw new ClassNotFoundException(e.getMessage());
+            }
+
+            if (newSnitch instanceof DynamicEndpointSnitch)
+            {
+                logger.info("Created new dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}",
+                            ((DynamicEndpointSnitch)newSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(),
+                            DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold());
+            }
+            else
+            {
+                logger.info("Created new non-dynamic snitch {}", newSnitch.getClass().getName());
+            }
+
+            // point snitch references to the new instance
+            DatabaseDescriptor.setEndpointSnitch(newSnitch);
+            for (String ks : Schema.instance.getKeyspaces())
+            {
+                Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch;
+            }
         }
+        else
+        {
+            if (oldSnitch instanceof DynamicEndpointSnitch)
+            {
+                logger.info("Applying config change to dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}",
+                            ((DynamicEndpointSnitch)oldSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(),
+                            DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold());
 
-        if (oldSnitch instanceof DynamicEndpointSnitch)
-            ((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
+                DynamicEndpointSnitch snitch = (DynamicEndpointSnitch)oldSnitch;
+                snitch.applyConfigChanges();
+            }
+        }
 
         updateTopology();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index abb10c1..2e5651a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -463,15 +463,33 @@ public interface StorageServiceMBean extends NotificationEmitter
     public Map<String, String> getViewBuildStatuses(String keyspace, String view);
 
     /**
-     * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime
+     * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime.
+     *
+     * This method is used to change the snitch implementation and/or dynamic snitch parameters.
+     * If {@code epSnitchClassName} is specified, it will configure a new snitch instance and make it a
+     * 'dynamic snitch' if {@code dynamic} is specified and {@code true}.
+     *
+     * The parameters {@code dynamicUpdateInterval}, {@code dynamicResetInterval} and {@code dynamicBadnessThreshold}
+     * can be specified individually to update the parameters of the dynamic snitch during runtime.
+     *
      * @param epSnitchClassName        the canonical path name for a class implementing IEndpointSnitch
-     * @param dynamic                  boolean that decides whether dynamicsnitch is used or not
-     * @param dynamicUpdateInterval    integer, in ms (default 100)
-     * @param dynamicResetInterval     integer, in ms (default 600,000)
-     * @param dynamicBadnessThreshold  double, (default 0.0)
+     * @param dynamic                  boolean that decides whether dynamicsnitch is used or not - only valid, if {@code epSnitchClassName} is specified
+     * @param dynamicUpdateInterval    integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 100)
+     * @param dynamicResetInterval     integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 600,000)
+     * @param dynamicBadnessThreshold  double, (defaults to the value configured in cassandra.yaml, which defaults to 0.0)
      */
     public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException;
 
+    /*
+      Update dynamic_snitch_update_interval_in_ms
+     */
+    public void setDynamicUpdateInterval(int dynamicUpdateInterval);
+
+    /*
+      Get dynamic_snitch_update_interval_in_ms
+     */
+    public int getDynamicUpdateInterval();
+
     // allows a user to forcibly 'kill' a sick node
     public void stopGossiping();