You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/29 20:24:46 UTC

[storm] branch master updated: STORM-3596 use send assignment status in blacklist scheduling

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 669d886  STORM-3596 use send assignment status in blacklist scheduling
     new 626ebd7  Merge pull request #3254 from agresch/agresch_agresch_storm_3596_2
669d886 is described below

commit 669d886bef942498ec83d5c6839cfd03f25a0307
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 20 15:50:59 2020 -0500

    STORM-3596 use send assignment status in blacklist scheduling
---
 .../main/java/org/apache/storm/DaemonConfig.java   |  7 ++++
 .../org/apache/storm/daemon/nimbus/Nimbus.java     |  2 +-
 .../nimbus/AssignmentDistributionService.java      | 15 ++++++--
 .../scheduler/INodeAssignmentSentCallBack.java     | 19 ++++++++++
 .../org/apache/storm/scheduler/IScheduler.java     |  2 +-
 .../scheduler/blacklist/BlacklistScheduler.java    | 44 +++++++++++++++++++---
 .../strategies/DefaultBlacklistStrategy.java       | 14 ++++++-
 .../blacklist/strategies/IBlacklistStrategy.java   |  5 ++-
 8 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 731ca7a..4d186aa 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -122,6 +122,13 @@ public class DaemonConfig implements Validated {
     public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
 
     /**
+     * Enables blacklisting support for supervisors with failed send assignment calls.
+     */
+    @IsBoolean
+    public static final String BLACKLIST_SCHEDULER_ENABLE_SEND_ASSIGNMENT_FAILURES =
+            "blacklist.scheduler.enable.send.assignment.failures";
+
+    /**
      * The class that the blacklist scheduler will report the blacklist.
      */
     @NotNull
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index aa94735..d01e223 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -588,7 +588,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         this.blobStore.setLeaderElector(this.leaderElector);
 
         this.topoCache = topoCache;
-        this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf);
+        this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf, this.scheduler);
         this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
         this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
         this.idToResources = new AtomicReference<>(new HashMap<>());
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4eb1bb4..ee9f136 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -27,6 +27,7 @@ import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorAssignments;
 import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.INodeAssignmentSentCallBack;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
@@ -88,15 +89,17 @@ public class AssignmentDistributionService implements Closeable {
     private Map conf;
 
     private boolean isLocalMode = false; // boolean cache for local mode decision
+    private INodeAssignmentSentCallBack sendAssignmentCallback;
 
     /**
      * Factory method for initialize a instance.
      * @param conf config.
+     * @param callback callback for sendAssignment results
      * @return an instance of {@link AssignmentDistributionService}
      */
-    public static AssignmentDistributionService getInstance(Map conf) {
+    public static AssignmentDistributionService getInstance(Map conf, INodeAssignmentSentCallBack callback) {
         AssignmentDistributionService service = new AssignmentDistributionService();
-        service.prepare(conf);
+        service.prepare(conf, callback);
         return service;
     }
 
@@ -104,9 +107,11 @@ public class AssignmentDistributionService implements Closeable {
      * Function for initialization.
      *
      * @param conf config
+     * @param callBack callback for sendAssignment results
      */
-    public void prepare(Map conf) {
+    public void prepare(Map conf, INodeAssignmentSentCallBack callBack) {
         this.conf = conf;
+        this.sendAssignmentCallback = callBack;
         this.random = new Random(47);
 
         this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
@@ -289,8 +294,10 @@ public class AssignmentDistributionService implements Closeable {
                 Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
                 if (supervisor != null) {
                     supervisor.sendSupervisorAssignments(assignments.getAssignments());
+                    service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), true);
                 } else {
                     LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
+                    service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), false);
                     throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
                 }
             } else {
@@ -299,9 +306,11 @@ public class AssignmentDistributionService implements Closeable {
                                                                                     assignments.getHost(), assignments.getServerPort())) {
                     try {
                         client.getIface().sendSupervisorAssignments(assignments.getAssignments());
+                        service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), true);
                     } catch (Exception e) {
                         assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
                         LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
+                        service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), false);
                     }
                 } catch (Throwable e) {
                     //just ignore any error/exception.
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java b/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java
new file mode 100644
index 0000000..9c7cd5f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+public interface INodeAssignmentSentCallBack {
+    default void nodeAssignmentSent(String node, boolean successful) {
+    }
+}
+
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
index 3e150ec..e7888c8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -15,7 +15,7 @@ package org.apache.storm.scheduler;
 import java.util.Map;
 import org.apache.storm.metric.StormMetricsRegistry;
 
-public interface IScheduler {
+public interface IScheduler extends INodeAssignmentSentCallBack {
 
     void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry);
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index b5c18ca..b29981f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -51,12 +51,15 @@ public class BlacklistScheduler implements IScheduler {
     protected IBlacklistStrategy blacklistStrategy;
     protected int nimbusMonitorFreqSecs;
     protected Map<String, Set<Integer>> cachedSupervisors;
-    //key is supervisor key ,value is supervisor ports
-    protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
+    // key is supervisor nodeId, value is supervisor ports
+    protected EvictingQueue<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
+    protected EvictingQueue<Map<String, Integer>> sendAssignmentFailureCount;
+    private final Map<String, Integer> assignmentFailures = new HashMap<>();
     protected int windowSize;
     protected volatile Set<String> blacklistedSupervisorIds;     // supervisor ids
     private boolean blacklistOnBadSlots;
     private Map<String, Object> conf;
+    private boolean blacklistSendAssignentFailures;
 
     public BlacklistScheduler(IScheduler underlyingScheduler) {
         this.underlyingScheduler = underlyingScheduler;
@@ -75,6 +78,8 @@ public class BlacklistScheduler implements IScheduler {
                                              DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
         resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
                                          DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+        blacklistSendAssignentFailures = ObjectReader.getBoolean(this.conf.get(
+                DaemonConfig.BLACKLIST_SCHEDULER_ENABLE_SEND_ASSIGNMENT_FAILURES), false);
 
         String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
                                                           LogReporter.class.getName());
@@ -89,6 +94,7 @@ public class BlacklistScheduler implements IScheduler {
 
         windowSize = toleranceTime / nimbusMonitorFreqSecs;
         badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
+        sendAssignmentFailureCount = EvictingQueue.create(windowSize);
         cachedSupervisors = new HashMap<>();
         blacklistedSupervisorIds = new HashSet<>();
         blacklistOnBadSlots = ObjectReader.getBoolean(
@@ -114,7 +120,8 @@ public class BlacklistScheduler implements IScheduler {
 
         Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
         blacklistStrategy.resumeFromBlacklist();
-        badSupervisors(supervisors);
+        trackMissedHeartbeats(supervisors);
+        trackAssignmentFailures();
         // this step also frees up some bad supervisors to greylist due to resource shortage
         blacklistedSupervisorIds = refreshBlacklistedSupervisorIds(cluster, topologies);
         Set<String> blacklistHosts = getBlacklistHosts(cluster, blacklistedSupervisorIds);
@@ -129,7 +136,7 @@ public class BlacklistScheduler implements IScheduler {
         return underlyingScheduler.config();
     }
 
-    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
+    private void trackMissedHeartbeats(Map<String, SupervisorDetails> supervisors) {
         Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
         Set<String> supervisorsKeySet = supervisors.keySet();
 
@@ -155,6 +162,18 @@ public class BlacklistScheduler implements IScheduler {
         badSupervisorsToleranceSlidingWindow.add(badSupervisors);
     }
 
+    private void trackAssignmentFailures() {
+        if (!blacklistSendAssignentFailures) {
+            return;
+        }
+        Map<String, Integer> assignmentFailureWindow = new HashMap<>();
+        synchronized (assignmentFailures) {
+            assignmentFailureWindow.putAll(this.assignmentFailures);
+            this.assignmentFailures.clear();
+        }
+        this.sendAssignmentFailureCount.add(assignmentFailureWindow);
+    }
+
     private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
         Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
         Set<Integer> supervisorPorts = supervisor.getAllPorts();
@@ -171,7 +190,9 @@ public class BlacklistScheduler implements IScheduler {
     }
 
     private Set<String> refreshBlacklistedSupervisorIds(Cluster cluster, Topologies topologies) {
-        Set<String> blacklistedSupervisors = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+        Set<String> blacklistedSupervisors = blacklistStrategy.getBlacklist(
+                new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+                new ArrayList<>(sendAssignmentFailureCount),
                 cluster, topologies);
         LOG.info("Supervisors {} are blacklisted.", blacklistedSupervisors);
         return blacklistedSupervisors;
@@ -266,4 +287,17 @@ public class BlacklistScheduler implements IScheduler {
     public Set<String> getBlacklistSupervisorIds() {
         return Collections.unmodifiableSet(blacklistedSupervisorIds);
     }
+
+    @Override
+    public void nodeAssignmentSent(String node, boolean successful) {
+        if (!blacklistSendAssignentFailures) {
+            return;
+        }
+        if (!successful) {
+            synchronized (assignmentFailures) {
+                int failCount = assignmentFailures.getOrDefault(node, 0) + 1;
+                assignmentFailures.put(node, failCount);
+            }
+        }
+    }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 7115503..675e636 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -61,7 +61,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
     }
 
     @Override
-    public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+    public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures,
+                                    List<Map<String, Integer>> sendAssignmentFailureCount,
+                                    Cluster cluster, Topologies topologies) {
         Map<String, Integer> countMap = new HashMap<>();
 
         for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
@@ -72,6 +74,15 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
             }
         }
 
+        // update countMap failures for sendAssignments failing
+        for (Map<String, Integer> item : sendAssignmentFailureCount) {
+            for (Map.Entry<String, Integer> entry : item.entrySet()) {
+                String supervisorNode = entry.getKey();
+                int sendAssignmentFailures = entry.getValue() + countMap.getOrDefault(supervisorNode, 0);
+                countMap.put(supervisorNode, sendAssignmentFailures);
+            }
+        }
+
         for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
             String supervisor = entry.getKey();
             int count = entry.getValue();
@@ -79,6 +90,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
                 if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
                     LOG.debug("Added supervisor {} to blacklist", supervisor);
                     LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
+                    LOG.debug("sendAssignmentFailureCount: {}", sendAssignmentFailureCount);
                     reporter.reportBlacklist(supervisor, supervisorsWithFailures);
                     blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
                 }
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index 295dd82..1201916 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -26,6 +26,7 @@ public interface IBlacklistStrategy {
      * Get blacklist by blacklist strategy.
      *
      * @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window
+     * @param sendAssignmentFailureCount per-supervisor counts of failed send assignment calls buffered in sliding window
      * @param cluster the cluster these topologies are running in. `cluster` contains everything user
      *       need to develop a new scheduling logic. e.g. supervisors information, available slots, current
      *       assignments for all the topologies etc. User can set the new assignment for topologies using
@@ -35,7 +36,9 @@ public interface IBlacklistStrategy {
      *       the `cluster` object.
      * @return blacklisted supervisors' id set
      */
-    Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
+    Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow,
+                             List<Map<String, Integer>> sendAssignmentFailureCount,
+                             Cluster cluster, Topologies topologies);
 
     /**
      * resume supervisors form blacklist. Blacklist is just a temporary list for supervisors,