You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/08/04 18:38:28 UTC
cassandra git commit: Allow updating DynamicEndpointSnitch properties
via JMX
Repository: cassandra
Updated Branches:
refs/heads/trunk 221f27a6d -> 3472cf06c
Allow updating DynamicEndpointSnitch properties via JMX
patch by sankalp kohli; reviewed by Robert Stupp for CASSANDRA-12179
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3472cf06
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3472cf06
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3472cf06
Branch: refs/heads/trunk
Commit: 3472cf06ce9a811381a1548934b7a0b343616197
Parents: 221f27a
Author: sankalp kohli <sa...@apple.com>
Authored: Thu Aug 4 20:37:36 2016 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Thu Aug 4 20:37:36 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/Config.java | 6 +-
.../cassandra/config/DatabaseDescriptor.java | 6 +-
.../locator/DynamicEndpointSnitch.java | 68 ++++++++++----
.../cassandra/service/StorageService.java | 93 +++++++++++++++-----
.../cassandra/service/StorageServiceMBean.java | 28 ++++--
6 files changed, 153 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98f9c24..c7063d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
* Collect metrics on queries by consistency level (CASSANDRA-7384)
* Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
* Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 64f06b9..5345a57 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -210,9 +210,9 @@ public class Config
public String endpoint_snitch;
public Boolean dynamic_snitch = true;
- public Integer dynamic_snitch_update_interval_in_ms = 100;
- public Integer dynamic_snitch_reset_interval_in_ms = 600000;
- public Double dynamic_snitch_badness_threshold = 0.1;
+ public volatile Integer dynamic_snitch_update_interval_in_ms = 100;
+ public volatile Integer dynamic_snitch_reset_interval_in_ms = 600000;
+ public volatile Double dynamic_snitch_badness_threshold = 0.1;
public String request_scheduler;
public RequestSchedulerId request_scheduler_id;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15aee59..3f367b4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -477,7 +477,7 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("Missing endpoint_snitch directive", false);
}
- snitch = createEndpointSnitch(conf.endpoint_snitch);
+ snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch);
EndpointSnitchInfo.create();
localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
@@ -858,12 +858,12 @@ public class DatabaseDescriptor
}
}
- private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
+ public static IEndpointSnitch createEndpointSnitch(boolean dynamic, String snitchClassName) throws ConfigurationException
{
if (!snitchClassName.contains("."))
snitchClassName = "org.apache.cassandra.locator." + snitchClassName;
IEndpointSnitch snitch = FBUtilities.construct(snitchClassName, "snitch");
- return conf.dynamic_snitch ? new DynamicEndpointSnitch(snitch) : snitch;
+ return dynamic ? new DynamicEndpointSnitch(snitch) : snitch;
}
public static IAuthenticator getAuthenticator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 975b10e..de0cdde 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,6 +23,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
@@ -50,13 +51,13 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
private static final int WINDOW_SIZE = 100;
- private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval();
- private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval();
- private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
+ private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
+ private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
// the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
// warrant not merging two ranges into a single range
- private double RANGE_MERGING_PREFERENCE = 1.5;
+ private static final double RANGE_MERGING_PREFERENCE = 1.5;
private String mbeanName;
private boolean registered = false;
@@ -66,24 +67,31 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public final IEndpointSnitch subsnitch;
+ private volatile ScheduledFuture<?> updateSchedular;
+ private volatile ScheduledFuture<?> resetSchedular;
+
+ private final Runnable update;
+ private final Runnable reset;
+
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
this(snitch, null);
}
+
public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
{
mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
if (instance != null)
mbeanName += ",instance=" + instance;
subsnitch = snitch;
- Runnable update = new Runnable()
+ update = new Runnable()
{
public void run()
{
updateScores();
}
};
- Runnable reset = new Runnable()
+ reset = new Runnable()
{
public void run()
{
@@ -92,10 +100,33 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
reset();
}
};
- ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
- ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+ updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
+ resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
registerMBean();
- }
+ }
+
+ /**
+ * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks
+ * if the configured rates for these tasks have changed.
+ */
+ public void applyConfigChanges()
+ {
+ if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval())
+ {
+ dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
+ updateSchedular.cancel(false);
+ updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
+ }
+
+ if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval())
+ {
+ dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
+ resetSchedular.cancel(false);
+ resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
+ }
+
+ dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+ }
private void registerMBean()
{
@@ -110,8 +141,11 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
}
- public void unregisterMBean()
+ public void close()
{
+ updateSchedular.cancel(false);
+ resetSchedular.cancel(false);
+
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
@@ -150,7 +184,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public void sortByProximity(final InetAddress address, List<InetAddress> addresses)
{
assert address.equals(FBUtilities.getBroadcastAddress()); // we only know about ourself
- if (BADNESS_THRESHOLD == 0)
+ if (dynamicBadnessThreshold == 0)
{
sortByProximityWithScore(address, addresses);
}
@@ -194,7 +228,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
}
// Sort the scores and then compare them (positionally) to the scores in the subsnitch order.
- // If any of the subsnitch-ordered scores exceed the optimal/sorted score by BADNESS_THRESHOLD, use
+ // If any of the subsnitch-ordered scores exceed the optimal/sorted score by dynamicBadnessThreshold, use
// the score-sorted ordering instead of the subsnitch ordering.
ArrayList<Double> sortedScores = new ArrayList<>(subsnitchOrderedScores);
Collections.sort(sortedScores);
@@ -202,7 +236,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
Iterator<Double> sortedScoreIterator = sortedScores.iterator();
for (Double subsnitchScore : subsnitchOrderedScores)
{
- if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + BADNESS_THRESHOLD)))
+ if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold)))
{
sortByProximityWithScore(address, addresses);
return;
@@ -306,15 +340,15 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
public int getUpdateInterval()
{
- return UPDATE_INTERVAL_IN_MS;
+ return dynamicUpdateInterval;
}
public int getResetInterval()
{
- return RESET_INTERVAL_IN_MS;
+ return dynamicResetInterval;
}
public double getBadnessThreshold()
{
- return BADNESS_THRESHOLD;
+ return dynamicBadnessThreshold;
}
public String getSubsnitchClassName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 6373df2..eade850 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4434,37 +4434,88 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Collections.unmodifiableMap(result);
}
- public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException
+ public void setDynamicUpdateInterval(int dynamicUpdateInterval)
{
- IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
-
- // new snitch registers mbean during construction
- IEndpointSnitch newSnitch;
- try
+ if (DatabaseDescriptor.getEndpointSnitch() instanceof DynamicEndpointSnitch)
{
- newSnitch = FBUtilities.construct(epSnitchClassName, "snitch");
- }
- catch (ConfigurationException e)
- {
- throw new ClassNotFoundException(e.getMessage());
+
+ try
+ {
+ updateSnitch(null, true, dynamicUpdateInterval, null, null);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- if (dynamic)
- {
+ }
+
+ public int getDynamicUpdateInterval()
+ {
+ return DatabaseDescriptor.getDynamicUpdateInterval();
+ }
+
+ public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException
+ {
+ // apply dynamic snitch configuration
+ if (dynamicUpdateInterval != null)
DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval);
+ if (dynamicResetInterval != null)
DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval);
+ if (dynamicBadnessThreshold != null)
DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold);
- newSnitch = new DynamicEndpointSnitch(newSnitch);
- }
- // point snitch references to the new instance
- DatabaseDescriptor.setEndpointSnitch(newSnitch);
- for (String ks : Schema.instance.getKeyspaces())
+ IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
+
+ // new snitch registers mbean during construction
+ if(epSnitchClassName != null)
{
- Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch;
+
+ // need to unregister the mbean _before_ the new dynamic snitch is instantiated (and implicitly initialized
+ // and its mbean registered)
+ if (oldSnitch instanceof DynamicEndpointSnitch)
+ ((DynamicEndpointSnitch)oldSnitch).close();
+
+ IEndpointSnitch newSnitch;
+ try
+ {
+ newSnitch = DatabaseDescriptor.createEndpointSnitch(dynamic != null && dynamic, epSnitchClassName);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new ClassNotFoundException(e.getMessage());
+ }
+
+ if (newSnitch instanceof DynamicEndpointSnitch)
+ {
+ logger.info("Created new dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}",
+ ((DynamicEndpointSnitch)newSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(),
+ DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold());
+ }
+ else
+ {
+ logger.info("Created new non-dynamic snitch {}", newSnitch.getClass().getName());
+ }
+
+ // point snitch references to the new instance
+ DatabaseDescriptor.setEndpointSnitch(newSnitch);
+ for (String ks : Schema.instance.getKeyspaces())
+ {
+ Keyspace.open(ks).getReplicationStrategy().snitch = newSnitch;
+ }
}
+ else
+ {
+ if (oldSnitch instanceof DynamicEndpointSnitch)
+ {
+ logger.info("Applying config change to dynamic snitch {} with update-interval={}, reset-interval={}, badness-threshold={}",
+ ((DynamicEndpointSnitch)oldSnitch).subsnitch.getClass().getName(), DatabaseDescriptor.getDynamicUpdateInterval(),
+ DatabaseDescriptor.getDynamicResetInterval(), DatabaseDescriptor.getDynamicBadnessThreshold());
- if (oldSnitch instanceof DynamicEndpointSnitch)
- ((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
+ DynamicEndpointSnitch snitch = (DynamicEndpointSnitch)oldSnitch;
+ snitch.applyConfigChanges();
+ }
+ }
updateTopology();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3472cf06/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index abb10c1..2e5651a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -463,15 +463,33 @@ public interface StorageServiceMBean extends NotificationEmitter
public Map<String, String> getViewBuildStatuses(String keyspace, String view);
/**
- * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime
+ * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime.
+ *
+ * This method is used to change the snitch implementation and/or dynamic snitch parameters.
+ * If {@code epSnitchClassName} is specified, it will configure a new snitch instance and make it a
+ * 'dynamic snitch' if {@code dynamic} is specified and {@code true}.
+ *
+ * The parameters {@code dynamicUpdateInterval}, {@code dynamicResetInterval} and {@code dynamicBadnessThreshold}
+ * can be specified individually to update the parameters of the dynamic snitch during runtime.
+ *
* @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch
- * @param dynamic boolean that decides whether dynamicsnitch is used or not
- * @param dynamicUpdateInterval integer, in ms (default 100)
- * @param dynamicResetInterval integer, in ms (default 600,000)
- * @param dynamicBadnessThreshold double, (default 0.0)
+ * @param dynamic boolean that decides whether dynamicsnitch is used or not - only valid, if {@code epSnitchClassName} is specified
+ * @param dynamicUpdateInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 100)
+ * @param dynamicResetInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 600,000)
+ * @param dynamicBadnessThreshold double, (defaults to the value configured in cassandra.yaml, which defaults to 0.0)
*/
public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException;
+ /*
+ Update dynamic_snitch_update_interval_in_ms
+ */
+ public void setDynamicUpdateInterval(int dynamicUpdateInterval);
+
+ /*
+ Get dynamic_snitch_update_interval_in_ms
+ */
+ public int getDynamicUpdateInterval();
+
// allows a user to forcibly 'kill' a sick node
public void stopGossiping();