You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/15 15:08:08 UTC
[storm] branch master updated: [STORM-3624] Fix race condition when
loading scheduler configs (#3249)
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new af5021a [STORM-3624] Fix race condition when loading scheduler configs (#3249)
af5021a is described below
commit af5021aa1911a89c8cee65b91f50fa8d00263d08
Author: Meng Li (Ethan) <et...@gmail.com>
AuthorDate: Mon Jun 15 10:07:51 2020 -0500
[STORM-3624] Fix race condition when loading scheduler configs (#3249)
---
.../main/java/org/apache/storm/DaemonConfig.java | 7 ++
.../apache/storm/scheduler/DefaultScheduler.java | 3 +-
.../org/apache/storm/scheduler/EvenScheduler.java | 2 +-
.../apache/storm/scheduler/IsolationScheduler.java | 2 +-
.../multitenant/MultitenantScheduler.java | 19 +++--
.../scheduler/resource/ResourceAwareScheduler.java | 14 +++-
.../scheduler/utils/ArtifactoryConfigLoader.java | 1 +
.../scheduler/utils/SchedulerConfigCache.java | 83 ++++++++++++++++++++++
8 files changed, 119 insertions(+), 12 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 96b6e21..2444351 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -938,6 +938,13 @@ public class DaemonConfig implements Validated {
public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
/**
+ * How long, in seconds, before a scheduler considers its config cache expired.
+ */
+ @IsInteger
+ @IsPositiveNumber(includeZero = true)
+ public static final String SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS = "scheduler.config.cache.expiration.secs";
+
+ /**
* For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. If it is a
* directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of the real
* URI to use ArtifactoryConfigLoader. For FileConfigLoader, this is the URI pointing to a file.
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index c137521..81a0ad8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -18,6 +18,7 @@
package org.apache.storm.scheduler;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -109,6 +110,6 @@ public class DefaultScheduler implements IScheduler {
@Override
public Map<String, Map<String, Double>> config() {
- return new HashMap<>();
+ return Collections.emptyMap();
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index e4431a9..ccc2e34 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -173,7 +173,7 @@ public class EvenScheduler implements IScheduler {
@Override
public Map<String, Map<String, Double>> config() {
- return new HashMap<>();
+ return Collections.emptyMap();
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index 75d8d9b..87d2ae7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -53,7 +53,7 @@ public class IsolationScheduler implements IScheduler {
@Override
public Map<String, Map<String, Double>> config() {
- return new HashMap<>();
+ return Collections.emptyMap();
}
// get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index aa75ff8..2121d0c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -12,6 +12,7 @@
package org.apache.storm.scheduler.multitenant;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.DaemonConfig;
@@ -22,6 +23,7 @@ import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
import org.apache.storm.scheduler.utils.IConfigLoader;
+import org.apache.storm.scheduler.utils.SchedulerConfigCache;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,12 +32,14 @@ public class MultitenantScheduler implements IScheduler {
private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
protected IConfigLoader configLoader;
private Map<String, Object> conf;
+ private SchedulerConfigCache<Map<String, Number>> schedulerConfigCache;
@Override
public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
this.conf = conf;
configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
-
+ schedulerConfigCache = new SchedulerConfigCache<>(conf, this::loadConfig);
+ schedulerConfigCache.prepare();
}
/**
@@ -43,7 +47,7 @@ public class MultitenantScheduler implements IScheduler {
* if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used.
* @return User pool configs.
*/
- private Map<String, Number> getUserConf() {
+ private Map<String, Number> loadConfig() {
Map<String, Number> ret;
// Try the loader plugin, if configured
@@ -69,23 +73,26 @@ public class MultitenantScheduler implements IScheduler {
// If that fails, use config
ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
if (ret == null) {
- return new HashMap<>();
+ return Collections.emptyMap();
} else {
return ret;
}
}
@Override
- public Map config() {
- return getUserConf();
+ public Map<String, Number> config() {
+ return Collections.unmodifiableMap(schedulerConfigCache.get());
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
LOG.debug("Rerunning scheduling...");
+ //refresh the config every time before scheduling
+ schedulerConfigCache.refresh();
+
Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
- Map<String, Number> userConf = getUserConf();
+ Map<String, Number> userConf = config();
Map<String, IsolatedPool> userPools = new HashMap<>();
for (Map.Entry<String, Number> entry : userConf.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 788767c..f2580b8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriori
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
import org.apache.storm.scheduler.utils.IConfigLoader;
+import org.apache.storm.scheduler.utils.SchedulerConfigCache;
import org.apache.storm.shade.com.google.common.collect.ImmutableList;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ObjectReader;
@@ -62,6 +63,7 @@ public class ResourceAwareScheduler implements IScheduler {
private Map<String, Set<String>> evictedTopologiesMap; // topoId : toposEvicted
private Meter schedulingTimeoutMeter;
private Meter internalErrorMeter;
+ private SchedulerConfigCache<Map<String, Map<String, Double>>> schedulerConfigCache;
private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
markFailedTopology(u, c, td, message, null);
@@ -92,6 +94,9 @@ public class ResourceAwareScheduler implements IScheduler {
conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
backgroundScheduling = Executors.newFixedThreadPool(1);
evictedTopologiesMap = new HashMap<>();
+
+ schedulerConfigCache = new SchedulerConfigCache<>(conf, this::loadConfig);
+ schedulerConfigCache.prepare();
}
@Override
@@ -102,11 +107,14 @@ public class ResourceAwareScheduler implements IScheduler {
@Override
public Map<String, Map<String, Double>> config() {
- return (Map) getUserResourcePools();
+ return Collections.unmodifiableMap(schedulerConfigCache.get());
}
@Override
public void schedule(Topologies topologies, Cluster cluster) {
+ //refresh the config every time before scheduling
+ schedulerConfigCache.refresh();
+
Map<String, User> userMap = getUsers(cluster);
List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
if (LOG.isDebugEnabled()) {
@@ -439,7 +447,7 @@ public class ResourceAwareScheduler implements IScheduler {
*/
private Map<String, User> getUsers(Cluster cluster) {
Map<String, User> userMap = new HashMap<>();
- Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
+ Map<String, Map<String, Double>> userResourcePools = config();
LOG.debug("userResourcePools: {}", userResourcePools);
for (TopologyDetails td : cluster.getTopologies()) {
@@ -479,7 +487,7 @@ public class ResourceAwareScheduler implements IScheduler {
* @return a map that contains resource guarantees of every user of the following format
* {userid->{resourceType->amountGuaranteed}}
*/
- private Map<String, Map<String, Double>> getUserResourcePools() {
+ private Map<String, Map<String, Double>> loadConfig() {
Map<String, Map<String, Number>> raw;
// Try the loader plugin, if configured
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
index e8f8a94..9f2d8cd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
@@ -44,6 +44,7 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
/**
* A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory (an artifact repository manager).
+ * This is not thread-safe.
*/
public class ArtifactoryConfigLoader implements IConfigLoader {
protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java
new file mode 100644
index 0000000..e85bb9f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.StormTimer;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility class to cache the scheduler config and refresh it after the cache expires.
+ *
+ * <p>Threading: {@link #prepare()} and {@link #refresh()} must be called from a single thread;
+ * {@link #get()} may be called from any thread because the cached value is published through an
+ * {@link AtomicReference}.
+ *
+ * @param <T> the type of the cached config
+ */
+public class SchedulerConfigCache<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfigCache.class);
+
+    private final Reloadable<T> reloader;
+    // Most recently loaded config; stays null until the first successful reload.
+    private final AtomicReference<T> schedulerConfigAtomicReference;
+    // Time of the last successful reload; only written by refresh(), which runs on a single thread.
+    private long lastUpdateTimestamp;
+    // Cache lifetime in milliseconds, from SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS (default 60 seconds).
+    long configCacheExpirationMs;
+
+    /**
+     * Create a cache. Nothing is loaded until {@link #prepare()} or {@link #refresh()} is called.
+     *
+     * @param conf the daemon config, read for {@link DaemonConfig#SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS}
+     * @param reloader the callback used to (re)load the config; it must not return null
+     */
+    public SchedulerConfigCache(Map<String, Object> conf, Reloadable<T> reloader) {
+        this.schedulerConfigAtomicReference = new AtomicReference<>();
+        this.lastUpdateTimestamp = 0;
+        this.reloader = reloader;
+        this.configCacheExpirationMs =
+            ObjectReader.getInt(conf.get(DaemonConfig.SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS), 60) * 1000L;
+    }
+
+    /**
+     * Load the config once up front so that {@link #get()} has a value right after preparation.
+     */
+    public void prepare() {
+        refresh();
+    }
+
+    /**
+     * Reload the config if it has never been loaded or if the cache has expired; otherwise do nothing.
+     * This is not thread-safe and should only be called from a single thread.
+     *
+     * @throws IllegalStateException if the reloader returns null; the cached config and timestamp are left
+     *     unchanged, so the reload is attempted again on the next call
+     */
+    public void refresh() {
+        long current = Time.currentTimeMillis();
+        // Use the cached value as the "never loaded" marker: a timestamp of 0 is a legitimate load time
+        // under simulated time and must not force a reload on every call.
+        boolean neverLoaded = schedulerConfigAtomicReference.get() == null;
+        // Same as current > lastUpdateTimestamp + configCacheExpirationMs, but cannot overflow.
+        boolean expired = current - lastUpdateTimestamp > configCacheExpirationMs;
+        if (neverLoaded || expired) {
+            LOG.debug("refreshing scheduler config since cache is expired");
+            T updatedConfig = reloader.reload();
+            if (updatedConfig == null) {
+                // Enforce the Reloadable contract here instead of caching null and failing later in callers.
+                throw new IllegalStateException("Scheduler config reloader returned null, which is not allowed");
+            }
+            schedulerConfigAtomicReference.set(updatedConfig);
+            lastUpdateTimestamp = current;
+        } else {
+            LOG.debug("skip refreshing scheduler config since cache is not yet expired;");
+        }
+    }
+
+    /**
+     * Get the scheduler config from cache.
+     * This method is thread-safe and can be called from multiple threads.
+     *
+     * @return the most recently loaded config, or null if no reload has succeeded yet
+     */
+    public T get() {
+        return schedulerConfigAtomicReference.get();
+    }
+
+    /**
+     * Callback that produces a freshly loaded scheduler config.
+     */
+    @FunctionalInterface
+    public interface Reloadable<T> {
+        /**
+         * Reload and return the configs.
+         *
+         * @return reloaded configs; must not be null
+         */
+        T reload();
+    }
+}