You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/15 15:08:08 UTC

[storm] branch master updated: [STORM-3624] Fix race condition when loading scheduler configs (#3249)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new af5021a  [STORM-3624] Fix race condition when loading scheduler configs (#3249)
af5021a is described below

commit af5021aa1911a89c8cee65b91f50fa8d00263d08
Author: Meng Li (Ethan) <et...@gmail.com>
AuthorDate: Mon Jun 15 10:07:51 2020 -0500

    [STORM-3624] Fix race condition when loading scheduler configs (#3249)
---
 .../main/java/org/apache/storm/DaemonConfig.java   |  7 ++
 .../apache/storm/scheduler/DefaultScheduler.java   |  3 +-
 .../org/apache/storm/scheduler/EvenScheduler.java  |  2 +-
 .../apache/storm/scheduler/IsolationScheduler.java |  2 +-
 .../multitenant/MultitenantScheduler.java          | 19 +++--
 .../scheduler/resource/ResourceAwareScheduler.java | 14 +++-
 .../scheduler/utils/ArtifactoryConfigLoader.java   |  1 +
 .../scheduler/utils/SchedulerConfigCache.java      | 83 ++++++++++++++++++++++
 8 files changed, 119 insertions(+), 12 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 96b6e21..2444351 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -938,6 +938,13 @@ public class DaemonConfig implements Validated {
     public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines";
 
     /**
+     * How long before a scheduler considers its config cache expired.
+     */
+    @IsInteger
+    @IsPositiveNumber(includeZero = true)
+    public static final String SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS = "scheduler.config.cache.expiration.secs";
+
+    /**
      * For ArtifactoryConfigLoader, this can either be a reference to an individual file in Artifactory or to a directory. If it is a
      * directory, the file with the largest lexographic name will be returned. Users need to add "artifactory+" to the beginning of the real
      * URI to use ArtifactoryConfigLoader. For FileConfigLoader, this is the URI pointing to a file.
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
index c137521..81a0ad8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -109,6 +110,6 @@ public class DefaultScheduler implements IScheduler {
 
     @Override
     public Map<String, Map<String, Double>> config() {
-        return new HashMap<>();
+        return Collections.emptyMap();
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
index e4431a9..ccc2e34 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java
@@ -173,7 +173,7 @@ public class EvenScheduler implements IScheduler {
 
     @Override
     public Map<String, Map<String, Double>> config() {
-        return new HashMap<>();
+        return Collections.emptyMap();
     }
 
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
index 75d8d9b..87d2ae7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IsolationScheduler.java
@@ -53,7 +53,7 @@ public class IsolationScheduler implements IScheduler {
 
     @Override
     public Map<String, Map<String, Double>> config() {
-        return new HashMap<>();
+        return Collections.emptyMap();
     }
 
     // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index aa75ff8..2121d0c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.scheduler.multitenant;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.DaemonConfig;
@@ -22,6 +23,7 @@ import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
 import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
 import org.apache.storm.scheduler.utils.IConfigLoader;
+import org.apache.storm.scheduler.utils.SchedulerConfigCache;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,12 +32,14 @@ public class MultitenantScheduler implements IScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(MultitenantScheduler.class);
     protected IConfigLoader configLoader;
     private Map<String, Object> conf;
+    private SchedulerConfigCache<Map<String, Number>> schedulerConfigCache;
 
     @Override
     public void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) {
         this.conf = conf;
         configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
-
+        schedulerConfigCache = new SchedulerConfigCache<>(conf, this::loadConfig);
+        schedulerConfigCache.prepare();
     }
 
     /**
@@ -43,7 +47,7 @@ public class MultitenantScheduler implements IScheduler {
      * if no config available from multitenant-scheduler.yaml, get configs from conf. Only one will be used.
      * @return User pool configs.
      */
-    private Map<String, Number> getUserConf() {
+    private Map<String, Number> loadConfig() {
         Map<String, Number> ret;
 
         // Try the loader plugin, if configured
@@ -69,23 +73,26 @@ public class MultitenantScheduler implements IScheduler {
         // If that fails, use config
         ret = (Map<String, Number>) conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
         if (ret == null) {
-            return new HashMap<>();
+            return Collections.emptyMap();
         } else {
             return ret;
         }
     }
 
     @Override
-    public Map config() {
-        return getUserConf();
+    public Map<String, Number> config() {
+        return Collections.unmodifiableMap(schedulerConfigCache.get());
     }
 
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
         LOG.debug("Rerunning scheduling...");
+        //refresh the config every time before scheduling
+        schedulerConfigCache.refresh();
+
         Map<String, Node> nodeIdToNode = Node.getAllNodesFrom(cluster);
 
-        Map<String, Number> userConf = getUserConf();
+        Map<String, Number> userConf = config();
 
         Map<String, IsolatedPool> userPools = new HashMap<>();
         for (Map.Entry<String, Number> entry : userConf.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 788767c..f2580b8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -43,6 +43,7 @@ import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriori
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
 import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
 import org.apache.storm.scheduler.utils.IConfigLoader;
+import org.apache.storm.scheduler.utils.SchedulerConfigCache;
 import org.apache.storm.shade.com.google.common.collect.ImmutableList;
 import org.apache.storm.utils.DisallowedStrategyException;
 import org.apache.storm.utils.ObjectReader;
@@ -62,6 +63,7 @@ public class ResourceAwareScheduler implements IScheduler {
     private Map<String, Set<String>> evictedTopologiesMap;   // topoId : toposEvicted
     private Meter schedulingTimeoutMeter;
     private Meter internalErrorMeter;
+    private SchedulerConfigCache<Map<String, Map<String, Double>>> schedulerConfigCache;
 
     private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) {
         markFailedTopology(u, c, td, message, null);
@@ -92,6 +94,9 @@ public class ResourceAwareScheduler implements IScheduler {
                 conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60);
         backgroundScheduling = Executors.newFixedThreadPool(1);
         evictedTopologiesMap = new HashMap<>();
+
+        schedulerConfigCache = new SchedulerConfigCache<>(conf, this::loadConfig);
+        schedulerConfigCache.prepare();
     }
 
     @Override
@@ -102,11 +107,14 @@ public class ResourceAwareScheduler implements IScheduler {
 
     @Override
     public Map<String, Map<String, Double>> config() {
-        return (Map) getUserResourcePools();
+        return Collections.unmodifiableMap(schedulerConfigCache.get());
     }
 
     @Override
     public void schedule(Topologies topologies, Cluster cluster) {
+        //refresh the config every time before scheduling
+        schedulerConfigCache.refresh();
+
         Map<String, User> userMap = getUsers(cluster);
         List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
         if (LOG.isDebugEnabled()) {
@@ -439,7 +447,7 @@ public class ResourceAwareScheduler implements IScheduler {
      */
     private Map<String, User> getUsers(Cluster cluster) {
         Map<String, User> userMap = new HashMap<>();
-        Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
+        Map<String, Map<String, Double>> userResourcePools = config();
         LOG.debug("userResourcePools: {}", userResourcePools);
 
         for (TopologyDetails td : cluster.getTopologies()) {
@@ -479,7 +487,7 @@ public class ResourceAwareScheduler implements IScheduler {
      * @return a map that contains resource guarantees of every user of the following format
      *     {userid->{resourceType->amountGuaranteed}}
      */
-    private Map<String, Map<String, Double>> getUserResourcePools() {
+    private Map<String, Map<String, Double>> loadConfig() {
         Map<String, Map<String, Number>> raw;
 
         // Try the loader plugin, if configured
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
index e8f8a94..9f2d8cd 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
@@ -44,6 +44,7 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
 
 /**
  * A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory (an artifact repository manager).
+ * This is not thread-safe.
  */
 public class ArtifactoryConfigLoader implements IConfigLoader {
     protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java b/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java
new file mode 100644
index 0000000..e85bb9f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/utils/SchedulerConfigCache.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.StormTimer;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A utility class to cache the scheduler config and refresh after the cache expires.
+ */
+public class SchedulerConfigCache<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfigCache.class);
+
+    private final Reloadable<T> reloader;
+    private AtomicReference<T> schedulerConfigAtomicReference;
+    private long lastUpdateTimestamp;
+    long configCacheExpirationMs;
+
+    public SchedulerConfigCache(Map<String, Object> conf, Reloadable<T> reloader) {
+        schedulerConfigAtomicReference = new AtomicReference<>();
+        lastUpdateTimestamp = 0;
+        this.reloader = reloader;
+        configCacheExpirationMs = ObjectReader.getInt(conf.get(DaemonConfig.SCHEDULER_CONFIG_CACHE_EXPIRATION_SECS), 60) * 1000L;
+    }
+
+    public void prepare() {
+        //refresh the cache here to make sure the cache is available at the very beginning
+        refresh();
+    }
+
+    /**
+     * Refresh the config only after the cache expires.
+     * This is not thread-safe and should only be called from a single thread.
+     */
+    public void refresh() {
+        long current = Time.currentTimeMillis();
+        if (lastUpdateTimestamp <= 0 || current > lastUpdateTimestamp + configCacheExpirationMs) {
+            LOG.debug("refreshing scheduler config since cache is expired");
+            T updatedConfig = reloader.reload();
+            schedulerConfigAtomicReference.set(updatedConfig);
+            lastUpdateTimestamp = current;
+        } else {
+            LOG.debug("skip refreshing scheduler config since cache is not yet expired;");
+        }
+    }
+
+    /**
+     * Get the scheduler config from cache.
+     * This method is thread-safe and can be called from multiple threads.
+     * @return the scheduler config
+     */
+    public T get() {
+        return schedulerConfigAtomicReference.get();
+    }
+
+    public interface Reloadable<T> {
+        /**
+         * Reload and return the configs.
+         * @return reloaded configs. It can't be null.
+         */
+        T reload();
+    }
+}