You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/04/29 20:24:46 UTC
[storm] branch master updated: STORM-3596 use send assignment status in blacklist scheduling
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 669d886 STORM-3596 use send assignment status in blacklist scheduling
new 626ebd7 Merge pull request #3254 from agresch/agresch_agresch_storm_3596_2
669d886 is described below
commit 669d886bef942498ec83d5c6839cfd03f25a0307
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Mon Apr 20 15:50:59 2020 -0500
STORM-3596 use send assignment status in blacklist scheduling
---
.../main/java/org/apache/storm/DaemonConfig.java | 7 ++++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 2 +-
.../nimbus/AssignmentDistributionService.java | 15 ++++++--
.../scheduler/INodeAssignmentSentCallBack.java | 19 ++++++++++
.../org/apache/storm/scheduler/IScheduler.java | 2 +-
.../scheduler/blacklist/BlacklistScheduler.java | 44 +++++++++++++++++++---
.../strategies/DefaultBlacklistStrategy.java | 14 ++++++-
.../blacklist/strategies/IBlacklistStrategy.java | 5 ++-
8 files changed, 96 insertions(+), 12 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 731ca7a..4d186aa 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -122,6 +122,13 @@ public class DaemonConfig implements Validated {
public static final String BLACKLIST_SCHEDULER_RESUME_TIME = "blacklist.scheduler.resume.time.secs";
/**
+ * Enables blacklisting support for supervisors with failed send assignment calls.
+ */
+ @IsBoolean
+ public static final String BLACKLIST_SCHEDULER_ENABLE_SEND_ASSIGNMENT_FAILURES =
+ "blacklist.scheduler.enable.send.assignment.failures";
+
+ /**
* The class that the blacklist scheduler will report the blacklist.
*/
@NotNull
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index aa94735..d01e223 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -588,7 +588,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
this.blobStore.setLeaderElector(this.leaderElector);
this.topoCache = topoCache;
- this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf);
+ this.assignmentsDistributer = AssignmentDistributionService.getInstance(conf, this.scheduler);
this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
this.nodeIdToResources = new AtomicReference<>(new HashMap<>());
this.idToResources = new AtomicReference<>(new HashMap<>());
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4eb1bb4..ee9f136 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -27,6 +27,7 @@ import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.INodeAssignmentSentCallBack;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
@@ -88,15 +89,17 @@ public class AssignmentDistributionService implements Closeable {
private Map conf;
private boolean isLocalMode = false; // boolean cache for local mode decision
+ private INodeAssignmentSentCallBack sendAssignmentCallback;
/**
* Factory method for initialize a instance.
* @param conf config.
+ * @param callback callback for sendAssignment results
* @return an instance of {@link AssignmentDistributionService}
*/
- public static AssignmentDistributionService getInstance(Map conf) {
+ public static AssignmentDistributionService getInstance(Map conf, INodeAssignmentSentCallBack callback) {
AssignmentDistributionService service = new AssignmentDistributionService();
- service.prepare(conf);
+ service.prepare(conf, callback);
return service;
}
@@ -104,9 +107,11 @@ public class AssignmentDistributionService implements Closeable {
* Function for initialization.
*
* @param conf config
+ * @param callBack callback for sendAssignment results
*/
- public void prepare(Map conf) {
+ public void prepare(Map conf, INodeAssignmentSentCallBack callBack) {
this.conf = conf;
+ this.sendAssignmentCallback = callBack;
this.random = new Random(47);
this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10);
@@ -289,8 +294,10 @@ public class AssignmentDistributionService implements Closeable {
Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode());
if (supervisor != null) {
supervisor.sendSupervisorAssignments(assignments.getAssignments());
+ service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), true);
} else {
LOG.error("Can not find node {} for assignments distribution", assignments.getNode());
+ service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), false);
throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance.");
}
} else {
@@ -299,9 +306,11 @@ public class AssignmentDistributionService implements Closeable {
assignments.getHost(), assignments.getServerPort())) {
try {
client.getIface().sendSupervisorAssignments(assignments.getAssignments());
+ service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), true);
} catch (Exception e) {
assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
+ service.sendAssignmentCallback.nodeAssignmentSent(assignments.getNode(), false);
}
} catch (Throwable e) {
//just ignore any error/exception.
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java b/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java
new file mode 100644
index 0000000..9c7cd5f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/INodeAssignmentSentCallBack.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.scheduler;
+
+public interface INodeAssignmentSentCallBack {
+ default void nodeAssignmentSent(String node, boolean successful) {
+ }
+}
+
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
index 3e150ec..e7888c8 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/IScheduler.java
@@ -15,7 +15,7 @@ package org.apache.storm.scheduler;
import java.util.Map;
import org.apache.storm.metric.StormMetricsRegistry;
-public interface IScheduler {
+public interface IScheduler extends INodeAssignmentSentCallBack {
void prepare(Map<String, Object> conf, StormMetricsRegistry metricsRegistry);
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
index b5c18ca..b29981f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java
@@ -51,12 +51,15 @@ public class BlacklistScheduler implements IScheduler {
protected IBlacklistStrategy blacklistStrategy;
protected int nimbusMonitorFreqSecs;
protected Map<String, Set<Integer>> cachedSupervisors;
- //key is supervisor key ,value is supervisor ports
- protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
+ // key is supervisor nodeId, value is supervisor ports
+ protected EvictingQueue<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
+ protected EvictingQueue<Map<String, Integer>> sendAssignmentFailureCount;
+ private final Map<String, Integer> assignmentFailures = new HashMap<>();
protected int windowSize;
protected volatile Set<String> blacklistedSupervisorIds; // supervisor ids
private boolean blacklistOnBadSlots;
private Map<String, Object> conf;
+ private boolean blacklistSendAssignentFailures;
public BlacklistScheduler(IScheduler underlyingScheduler) {
this.underlyingScheduler = underlyingScheduler;
@@ -75,6 +78,8 @@ public class BlacklistScheduler implements IScheduler {
DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
resumeTime = ObjectReader.getInt(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
+ blacklistSendAssignentFailures = ObjectReader.getBoolean(this.conf.get(
+ DaemonConfig.BLACKLIST_SCHEDULER_ENABLE_SEND_ASSIGNMENT_FAILURES), false);
String reporterClassName = ObjectReader.getString(this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
LogReporter.class.getName());
@@ -89,6 +94,7 @@ public class BlacklistScheduler implements IScheduler {
windowSize = toleranceTime / nimbusMonitorFreqSecs;
badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
+ sendAssignmentFailureCount = EvictingQueue.create(windowSize);
cachedSupervisors = new HashMap<>();
blacklistedSupervisorIds = new HashSet<>();
blacklistOnBadSlots = ObjectReader.getBoolean(
@@ -114,7 +120,8 @@ public class BlacklistScheduler implements IScheduler {
Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
blacklistStrategy.resumeFromBlacklist();
- badSupervisors(supervisors);
+ trackMissedHeartbeats(supervisors);
+ trackAssignmentFailures();
// this step also frees up some bad supervisors to greylist due to resource shortage
blacklistedSupervisorIds = refreshBlacklistedSupervisorIds(cluster, topologies);
Set<String> blacklistHosts = getBlacklistHosts(cluster, blacklistedSupervisorIds);
@@ -129,7 +136,7 @@ public class BlacklistScheduler implements IScheduler {
return underlyingScheduler.config();
}
- private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
+ private void trackMissedHeartbeats(Map<String, SupervisorDetails> supervisors) {
Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
Set<String> supervisorsKeySet = supervisors.keySet();
@@ -155,6 +162,18 @@ public class BlacklistScheduler implements IScheduler {
badSupervisorsToleranceSlidingWindow.add(badSupervisors);
}
+ private void trackAssignmentFailures() {
+ if (!blacklistSendAssignentFailures) {
+ return;
+ }
+ Map<String, Integer> assignmentFailureWindow = new HashMap<>();
+ synchronized (assignmentFailures) {
+ assignmentFailureWindow.putAll(this.assignmentFailures);
+ this.assignmentFailures.clear();
+ }
+ this.sendAssignmentFailureCount.add(assignmentFailureWindow);
+ }
+
private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
Set<Integer> supervisorPorts = supervisor.getAllPorts();
@@ -171,7 +190,9 @@ public class BlacklistScheduler implements IScheduler {
}
private Set<String> refreshBlacklistedSupervisorIds(Cluster cluster, Topologies topologies) {
- Set<String> blacklistedSupervisors = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+ Set<String> blacklistedSupervisors = blacklistStrategy.getBlacklist(
+ new ArrayList<>(badSupervisorsToleranceSlidingWindow),
+ new ArrayList<>(sendAssignmentFailureCount),
cluster, topologies);
LOG.info("Supervisors {} are blacklisted.", blacklistedSupervisors);
return blacklistedSupervisors;
@@ -266,4 +287,17 @@ public class BlacklistScheduler implements IScheduler {
public Set<String> getBlacklistSupervisorIds() {
return Collections.unmodifiableSet(blacklistedSupervisorIds);
}
+
+ @Override
+ public void nodeAssignmentSent(String node, boolean successful) {
+ if (!blacklistSendAssignentFailures) {
+ return;
+ }
+ if (!successful) {
+ synchronized (assignmentFailures) {
+ int failCount = assignmentFailures.getOrDefault(node, 0) + 1;
+ assignmentFailures.put(node, failCount);
+ }
+ }
+ }
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
index 7115503..675e636 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java
@@ -61,7 +61,9 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
}
@Override
- public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures, Cluster cluster, Topologies topologies) {
+ public Set<String> getBlacklist(List<Map<String, Set<Integer>>> supervisorsWithFailures,
+ List<Map<String, Integer>> sendAssignmentFailureCount,
+ Cluster cluster, Topologies topologies) {
Map<String, Integer> countMap = new HashMap<>();
for (Map<String, Set<Integer>> item : supervisorsWithFailures) {
@@ -72,6 +74,15 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
}
}
+ // update countMap failures for sendAssignments failing
+ for (Map<String, Integer> item : sendAssignmentFailureCount) {
+ for (Map.Entry<String, Integer> entry : item.entrySet()) {
+ String supervisorNode = entry.getKey();
+ int sendAssignmentFailures = entry.getValue() + countMap.getOrDefault(supervisorNode, 0);
+ countMap.put(supervisorNode, sendAssignmentFailures);
+ }
+ }
+
for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
String supervisor = entry.getKey();
int count = entry.getValue();
@@ -79,6 +90,7 @@ public class DefaultBlacklistStrategy implements IBlacklistStrategy {
if (!blacklist.containsKey(supervisor)) { // if not in blacklist then add it and set the resume time according to config
LOG.debug("Added supervisor {} to blacklist", supervisor);
LOG.debug("supervisorsWithFailures : {}", supervisorsWithFailures);
+ LOG.debug("sendAssignmentFailureCount: {}", sendAssignmentFailureCount);
reporter.reportBlacklist(supervisor, supervisorsWithFailures);
blacklist.put(supervisor, resumeTime / nimbusMonitorFreqSecs);
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
index 295dd82..1201916 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/IBlacklistStrategy.java
@@ -26,6 +26,7 @@ public interface IBlacklistStrategy {
* Get blacklist by blacklist strategy.
*
* @param badSupervisorsToleranceSlidingWindow bad supervisors buffered in sliding window
+ * @param sendAssignmentFailureCount supervisors with failed assignment calls in sliding window
* @param cluster the cluster these topologies are running in. `cluster` contains everything user
* need to develop a new scheduling logic. e.g. supervisors information, available slots, current
* assignments for all the topologies etc. User can set the new assignment for topologies using
@@ -35,7 +36,9 @@ public interface IBlacklistStrategy {
* the `cluster` object.
* @return blacklisted supervisors' id set
*/
- Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow, Cluster cluster, Topologies topologies);
+ Set<String> getBlacklist(List<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow,
+ List<Map<String, Integer>> sendAssignmentFailureCount,
+ Cluster cluster, Topologies topologies);
/**
* resume supervisors form blacklist. Blacklist is just a temporary list for supervisors,