You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/29 18:36:08 UTC
[1/2] storm git commit: Merge branch 'ras_refactor' of
https://github.com/jerrypeng/storm into STORM-1634
Repository: storm
Updated Branches:
refs/heads/1.x-branch 1cfaf1245 -> 20ba5189f
Merge branch 'ras_refactor' of https://github.com/jerrypeng/storm into STORM-1634
STORM-1634: Refactoring of Resource Aware Scheduler
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bf99ae51
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bf99ae51
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bf99ae51
Branch: refs/heads/1.x-branch
Commit: bf99ae51019c53135cbc23c7465440fc63b1299a
Parents: 1cfaf12
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 29 11:10:55 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 29 11:12:14 2016 -0500
----------------------------------------------------------------------
.../jvm/org/apache/storm/scheduler/Cluster.java | 23 ++--
.../org/apache/storm/scheduler/Topologies.java | 11 +-
.../scheduler/resource/ClusterStateData.java | 101 --------------
.../resource/ResourceAwareScheduler.java | 138 ++++++++-----------
.../scheduler/resource/SchedulingState.java | 56 ++++++++
.../apache/storm/scheduler/resource/User.java | 24 ++--
.../eviction/DefaultEvictionStrategy.java | 10 +-
.../strategies/eviction/IEvictionStrategy.java | 9 +-
.../DefaultSchedulingPriorityStrategy.java | 9 +-
.../priority/ISchedulingPriorityStrategy.java | 9 +-
.../DefaultResourceAwareStrategy.java | 86 ++++++------
.../strategies/scheduling/IStrategy.java | 6 +-
.../resource/TestResourceAwareScheduler.java | 45 ++++++
13 files changed, 247 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
index 4ac4eaf..a6622ce 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Cluster.java
@@ -92,18 +92,17 @@ public class Cluster {
}
/**
- * Get a copy of this cluster object
- */
- public static Cluster getCopy(Cluster cluster) {
- HashMap<String, SchedulerAssignmentImpl> newAssignments = new HashMap<String, SchedulerAssignmentImpl>();
- for (Map.Entry<String, SchedulerAssignmentImpl> entry : cluster.assignments.entrySet()) {
- newAssignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
- }
- Map newConf = new HashMap<String, Object>();
- newConf.putAll(cluster.conf);
- Cluster copy = new Cluster(cluster.inimbus, cluster.supervisors, newAssignments, newConf);
- copy.status = new HashMap<>(cluster.status);
- return copy;
+ * Copy constructor
+ */
+ public Cluster(Cluster src) {
+ this(src.inimbus, src.supervisors, new HashMap<String, SchedulerAssignmentImpl>(), new HashMap<String, Object>(src.conf));
+ this.supervisorsResources.putAll(src.supervisorsResources);
+ for (Map.Entry<String, SchedulerAssignmentImpl> entry : src.assignments.entrySet()) {
+ this.assignments.put(entry.getKey(), new SchedulerAssignmentImpl(entry.getValue().getTopologyId(), entry.getValue().getExecutorToSlot()));
+ }
+ this.status.putAll(src.status);
+ this.topologyResources.putAll(src.topologyResources);
+ this.blackListedHosts.addAll(src.blackListedHosts);
}
public void setBlacklistedHosts(Set<String> hosts) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java b/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
index f9478ab..82cb790 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/Topologies.java
@@ -39,6 +39,13 @@ public class Topologies {
this.nameToId.put(topology.getName(), entry.getKey());
}
}
+
+ /**
+ * copy constructor
+ */
+ public Topologies(Topologies src) {
+ this(src.topologies);
+ }
public TopologyDetails getById(String topologyId) {
return this.topologies.get(topologyId);
@@ -68,10 +75,6 @@ public class Topologies {
return _allComponents;
}
- public static Topologies getCopy(Topologies topologies) {
- return new Topologies(topologies.topologies);
- }
-
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
deleted file mode 100644
index ece2800..0000000
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ClusterStateData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.scheduler.resource;
-
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.ExecutorDetails;
-import org.apache.storm.scheduler.Topologies;
-import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.WorkerSlot;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A class to specify which data and API to expose to a scheduling strategy
- */
-public class ClusterStateData {
-
- private final Cluster cluster;
-
- public final Topologies topologies;
-
- // Information regarding all nodes in the cluster
- public Map<String, NodeDetails> nodes = new HashMap<String, NodeDetails>();
-
- public static final class NodeDetails {
-
- private final RAS_Node node;
-
- public NodeDetails(RAS_Node node) {
- this.node = node;
- }
-
- public String getId() {
- return this.node.getId();
- }
-
- public String getHostname() {
- return this.node.getHostname();
- }
-
- public Collection<WorkerSlot> getFreeSlots() {
- return this.node.getFreeSlots();
- }
-
- public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
- this.node.consumeResourcesforTask(exec, topo);
- }
-
- public Double getAvailableMemoryResources() {
- return this.node.getAvailableMemoryResources();
- }
-
- public Double getAvailableCpuResources() {
- return this.node.getAvailableCpuResources();
- }
-
- public Double getTotalMemoryResources() {
- return this.node.getTotalMemoryResources();
- }
-
- public Double getTotalCpuResources() {
- return this.node.getTotalCpuResources();
- }
- }
-
- public ClusterStateData(Cluster cluster, Topologies topologies) {
- this.cluster = cluster;
- this.topologies = topologies;
- Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies);
- for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
- this.nodes.put(entry.getKey(), new NodeDetails(entry.getValue()));
- }
- }
-
- public Collection<ExecutorDetails> getUnassignedExecutors(String topoId) {
- return this.cluster.getUnassignedExecutors(this.topologies.getById(topoId));
- }
-
- public Map<String, List<String>> getNetworkTopography() {
- return this.cluster.getNetworkTopography();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 2b35d6b..087fe6b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -41,30 +41,8 @@ import java.util.Set;
public class ResourceAwareScheduler implements IScheduler {
- private Map<String, User> userMap;
- private Cluster cluster;
- private Topologies topologies;
- private RAS_Nodes nodes;
-
- private class SchedulingState {
- private Map<String, User> userMap = new HashMap<String, User>();
- private Cluster cluster;
- private Topologies topologies;
- private RAS_Nodes nodes;
- private Map conf = new Config();
-
- public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, RAS_Nodes nodes, Map conf) {
- for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
- String userId = userMapEntry.getKey();
- User user = userMapEntry.getValue();
- this.userMap.put(userId, user.getCopy());
- }
- this.cluster = Cluster.getCopy(cluster);
- this.topologies = topologies.getCopy(topologies);
- this.nodes = new RAS_Nodes(this.cluster, this.topologies);
- this.conf.putAll(conf);
- }
- }
+ // Object that holds the current scheduling state
+ private SchedulingState schedulingState;
@SuppressWarnings("rawtypes")
private Map conf;
@@ -86,7 +64,7 @@ public class ResourceAwareScheduler implements IScheduler {
//logs everything that is currently scheduled and the location at which they are scheduled
LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
//logs the resources available/used for every node
- LOG.info("Nodes:\n{}", this.nodes);
+ LOG.info("Nodes:\n{}", this.schedulingState.nodes);
//logs the detailed info about each user
for (User user : getUserMap().values()) {
LOG.info(user.getDetailedInfo());
@@ -104,10 +82,10 @@ public class ResourceAwareScheduler implements IScheduler {
break;
}
}
- TopologyDetails td = null;
+ TopologyDetails td;
try {
//need to re prepare since scheduling state might have been restored
- schedulingPrioritystrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ schedulingPrioritystrategy.prepare(this.schedulingState);
//Call scheduling priority strategy
td = schedulingPrioritystrategy.getNextTopologyToSchedule();
} catch (Exception ex) {
@@ -120,15 +98,27 @@ public class ResourceAwareScheduler implements IScheduler {
}
scheduleTopology(td);
- LOG.debug("Nodes after scheduling:\n{}", this.nodes);
+ LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
}
+
+ //update changes to cluster
+ updateChanges(cluster, topologies);
+ }
+
+ private void updateChanges(Cluster cluster, Topologies topologies) {
+ //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
+ cluster.setAssignments(schedulingState.cluster.getAssignments());
+ cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts());
+ cluster.setStatusMap(schedulingState.cluster.getStatusMap());
+ cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
+ cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
//updating resources used by supervisor
- updateSupervisorsResources(this.cluster, this.topologies);
+ updateSupervisorsResources(cluster, topologies);
}
public void scheduleTopology(TopologyDetails td) {
- User topologySubmitter = this.userMap.get(td.getTopologySubmitter());
- if (cluster.getUnassignedExecutors(td).size() > 0) {
+ User topologySubmitter = this.schedulingState.userMap.get(td.getTopologySubmitter());
+ if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 0) {
LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
SchedulingState schedulingState = checkpointSchedulingState();
@@ -140,7 +130,7 @@ public class ResourceAwareScheduler implements IScheduler {
td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
- this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy "
+ td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
return;
}
@@ -148,15 +138,17 @@ public class ResourceAwareScheduler implements IScheduler {
while (true) {
SchedulingResult result = null;
try {
- //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
- rasStrategy.prepare(new ClusterStateData(this.cluster, this.topologies));
+ // Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
+ // Pass in a copy of scheduling state since the scheduling strategy should not be able to make modifications to
+ // the state of cluster directly
+ rasStrategy.prepare(new SchedulingState(this.schedulingState));
result = rasStrategy.schedule(td);
} catch (Exception ex) {
LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!"
, rasStrategy.getClass().getName(), td.getName()), ex);
topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToInvalid(td);
- this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+ this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
+ rasStrategy.getClass().getName() + ". Please check logs for details");
}
LOG.debug("scheduling result: {}", result);
@@ -165,17 +157,17 @@ public class ResourceAwareScheduler implements IScheduler {
try {
if (mkAssignment(td, result.getSchedulingResultMap())) {
topologySubmitter.moveTopoFromPendingToRunning(td);
- this.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
+ this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage());
} else {
topologySubmitter = this.cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
+ this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
}
} catch (IllegalStateException ex) {
LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex);
topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
+ this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
}
break;
} else {
@@ -193,7 +185,7 @@ public class ResourceAwareScheduler implements IScheduler {
boolean madeSpace = false;
try {
//need to re prepare since scheduling state might have been restored
- evictionStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ evictionStrategy.prepare(this.schedulingState);
madeSpace = evictionStrategy.makeSpaceForTopo(td);
} catch (Exception ex) {
LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s"
@@ -206,32 +198,32 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.debug("Could not make space for topo {} will move to attempted", td);
topologySubmitter = cleanup(schedulingState, td);
topologySubmitter.moveTopoFromPendingToAttempted(td);
- this.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
+ this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage());
break;
}
continue;
} else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
break;
} else {
topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToAttempted(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster);
break;
}
}
} else {
LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName());
topologySubmitter = cleanup(schedulingState, td);
- topologySubmitter.moveTopoFromPendingToInvalid(td, this.cluster);
+ topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster);
break;
}
}
} else {
LOG.warn("Topology {} is already fully scheduled!", td.getName());
topologySubmitter.moveTopoFromPendingToRunning(td);
- if (this.cluster.getStatusMap().get(td.getId()) == null || this.cluster.getStatusMap().get(td.getId()).equals("")) {
- this.cluster.setStatus(td.getId(), "Fully Scheduled");
+ if (this.schedulingState.cluster.getStatusMap().get(td.getId()) == null || this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) {
+ this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled");
}
}
}
@@ -239,7 +231,7 @@ public class ResourceAwareScheduler implements IScheduler {
private User cleanup(SchedulingState schedulingState, TopologyDetails td) {
restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
- return this.userMap.get(td.getTopologySubmitter());
+ return this.schedulingState.userMap.get(td.getTopologySubmitter());
}
private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) {
@@ -255,7 +247,7 @@ public class ResourceAwareScheduler implements IScheduler {
for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
WorkerSlot targetSlot = workerToTasksEntry.getKey();
Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue();
- RAS_Node targetNode = this.nodes.getNodeById(targetSlot.getNodeId());
+ RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot);
@@ -282,7 +274,7 @@ public class ResourceAwareScheduler implements IScheduler {
td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
//updating resources used for a topology
- this.cluster.setTopologyResources(td.getId(), resources);
+ this.schedulingState.cluster.setTopologyResources(td.getId(), resources);
return true;
} else {
LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName());
@@ -327,11 +319,11 @@ public class ResourceAwareScheduler implements IScheduler {
}
public User getUser(String user) {
- return this.userMap.get(user);
+ return this.schedulingState.userMap.get(user);
}
public Map<String, User> getUserMap() {
- return this.userMap;
+ return this.schedulingState.userMap;
}
/**
@@ -340,8 +332,8 @@ public class ResourceAwareScheduler implements IScheduler {
* @param topologies
* @param cluster
*/
- private void initUsers(Topologies topologies, Cluster cluster) {
- this.userMap = new HashMap<String, User>();
+ private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
+ Map<String, User> userMap = new HashMap<String, User>();
Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
LOG.debug("userResourcePools: {}", userResourcePools);
@@ -353,27 +345,26 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.error("Cannot determine user for topology {}. Will skip scheduling this topology", td.getName());
continue;
}
- if (!this.userMap.containsKey(topologySubmitter)) {
- this.userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
+ if (!userMap.containsKey(topologySubmitter)) {
+ userMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
}
if (cluster.getUnassignedExecutors(td).size() > 0) {
LOG.debug("adding td: {} to pending queue", td.getName());
- this.userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
+ userMap.get(topologySubmitter).addTopologyToPendingQueue(td);
} else {
LOG.debug("adding td: {} to running queue with existing status: {}", td.getName(), cluster.getStatusMap().get(td.getId()));
- this.userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
+ userMap.get(topologySubmitter).addTopologyToRunningQueue(td);
if (cluster.getStatusMap().get(td.getId()) == null || cluster.getStatusMap().get(td.getId()).equals("")) {
cluster.setStatus(td.getId(), "Fully Scheduled");
}
}
}
+ return userMap;
}
private void initialize(Topologies topologies, Cluster cluster) {
- this.cluster = cluster;
- this.topologies = topologies;
- this.nodes = new RAS_Nodes(this.cluster, this.topologies);
- initUsers(topologies, cluster);
+ Map<String, User> userMap = getUsers(topologies, cluster);
+ this.schedulingState = new SchedulingState(userMap, cluster, topologies, this.conf);
}
/**
@@ -412,35 +403,24 @@ public class ResourceAwareScheduler implements IScheduler {
private SchedulingState checkpointSchedulingState() {
LOG.debug("/*********Checkpoint scheduling state************/");
- for (User user : getUserMap().values()) {
+ for (User user : this.schedulingState.userMap.values()) {
LOG.debug(user.getDetailedInfo());
}
- LOG.debug(ResourceUtils.printScheduling(this.cluster, this.topologies));
- LOG.debug("nodes:\n{}", this.nodes);
+ LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
+ LOG.debug("nodes:\n{}", this.schedulingState.nodes);
LOG.debug("/*********End************/");
- return new SchedulingState(this.userMap, this.cluster, this.topologies, this.nodes, this.conf);
+ return new SchedulingState(this.schedulingState);
}
private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
LOG.debug("/*********restoring scheduling state************/");
//reseting cluster
- //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable
- this.cluster.setAssignments(schedulingState.cluster.getAssignments());
- this.cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
- this.cluster.setStatusMap(schedulingState.cluster.getStatusMap());
- this.cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
- //don't need to explicitly set data structues like Cluster since nothing can really be changed
- //unless this.topologies is set to another object
- this.topologies = schedulingState.topologies;
- this.conf = schedulingState.conf;
- this.userMap = schedulingState.userMap;
- this.nodes = schedulingState.nodes;
-
- for (User user : getUserMap().values()) {
+ this.schedulingState = schedulingState;
+ for (User user : this.schedulingState.userMap.values()) {
LOG.debug(user.getDetailedInfo());
}
- LOG.debug(ResourceUtils.printScheduling(cluster, topologies));
- LOG.debug("nodes:\n{}", this.nodes);
+ LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
+ LOG.debug("nodes:\n{}", this.schedulingState.nodes);
LOG.debug("/*********End************/");
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
new file mode 100644
index 0000000..8a28ac0
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/SchedulingState.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that holds the complete scheduling state of the Resource Aware Scheduler
+ */
+public class SchedulingState {
+ public final Map<String, User> userMap = new HashMap<String, User>();
+ public final Cluster cluster;
+ public final Topologies topologies;
+ public final RAS_Nodes nodes;
+ public final Map conf = new Config();
+
+ public SchedulingState(Map<String, User> userMap, Cluster cluster, Topologies topologies, Map conf) {
+ for (Map.Entry<String, User> userMapEntry : userMap.entrySet()) {
+ String userId = userMapEntry.getKey();
+ User user = userMapEntry.getValue();
+ this.userMap.put(userId, new User(user));
+ }
+ this.cluster = new Cluster(cluster);
+ this.topologies = new Topologies(topologies);
+ this.nodes = new RAS_Nodes(this.cluster, this.topologies);
+ this.conf.putAll(conf);
+ }
+
+ /**
+ * copy constructor
+ */
+ public SchedulingState(SchedulingState src) {
+ this(src.userMap, src.cluster, src.topologies, src.conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
index 9d450ab..0f5a563 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/User.java
@@ -65,21 +65,23 @@ public class User {
}
}
- public User getCopy() {
- User newUser = new User(this.userId, this.resourcePool);
- for (TopologyDetails topo : this.pendingQueue) {
- newUser.addTopologyToPendingQueue(topo);
+ /**
+ * Copy Constructor
+ */
+ public User(User src) {
+ this(src.userId, src.resourcePool);
+ for (TopologyDetails topo : src.pendingQueue) {
+ addTopologyToPendingQueue(topo);
}
- for (TopologyDetails topo : this.runningQueue) {
- newUser.addTopologyToRunningQueue(topo);
+ for (TopologyDetails topo : src.runningQueue) {
+ addTopologyToRunningQueue(topo);
}
- for (TopologyDetails topo : this.attemptedQueue) {
- newUser.addTopologyToAttemptedQueue(topo);
+ for (TopologyDetails topo : src.attemptedQueue) {
+ addTopologyToAttemptedQueue(topo);
}
- for (TopologyDetails topo : this.invalidQueue) {
- newUser.addTopologyToInvalidQueue(topo);
+ for (TopologyDetails topo : src.invalidQueue) {
+ addTopologyToInvalidQueue(topo);
}
- return newUser;
}
public String getId() {
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 91f0058..182017b 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -19,10 +19,10 @@
package org.apache.storm.scheduler.resource.strategies.eviction;
import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,10 +39,10 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
private RAS_Nodes nodes;
@Override
- public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
- this.cluster = cluster;
- this.userMap = userMap;
- this.nodes = nodes;
+ public void prepare(SchedulingState schedulingState) {
+ this.cluster = schedulingState.cluster;
+ this.userMap = schedulingState.userMap;
+ this.nodes = schedulingState.nodes;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
index e8ba3a9..9499424 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -18,20 +18,15 @@
package org.apache.storm.scheduler.resource.strategies.eviction;
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.User;
-
-import java.util.Map;
+import org.apache.storm.scheduler.resource.SchedulingState;
public interface IEvictionStrategy {
/**
* Initialization
*/
- public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+ public void prepare(SchedulingState schedulingState);
/**
* This method when invoked should attempt to make space on the cluster so that the topology specified can be scheduled
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index 57ef3ca..e3109d5 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -19,9 +19,8 @@
package org.apache.storm.scheduler.resource.strategies.priority;
import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +35,9 @@ public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStr
private Map<String, User> userMap;
@Override
- public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes) {
- this.cluster = cluster;
- this.userMap = userMap;
+ public void prepare(SchedulingState schedulingState) {
+ this.cluster = schedulingState.cluster;
+ this.userMap = schedulingState.userMap;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index 63ab919..ffb463f 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -18,20 +18,15 @@
package org.apache.storm.scheduler.resource.strategies.priority;
-import org.apache.storm.scheduler.Cluster;
-import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.User;
-
-import java.util.Map;
+import org.apache.storm.scheduler.resource.SchedulingState;
public interface ISchedulingPriorityStrategy {
/**
* initializes
*/
- public void prepare(Topologies topologies, Cluster cluster, Map<String, User> userMap, RAS_Nodes nodes);
+ public void prepare(SchedulingState schedulingState);
/**
* Gets the next topology to schedule
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
index 9ecba47..9a12f80 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -30,9 +30,12 @@ import java.util.TreeMap;
import java.util.HashSet;
import java.util.Iterator;
-import org.apache.storm.scheduler.resource.ClusterStateData.NodeDetails;
-import org.apache.storm.scheduler.resource.ClusterStateData;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +47,21 @@ import org.apache.storm.scheduler.resource.Component;
public class DefaultResourceAwareStrategy implements IStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
- private ClusterStateData _clusterStateData;
- //Map key is the supervisor id and the value is the corresponding RAS_Node Object
- private Map<String, NodeDetails> _availNodes;
- private NodeDetails refNode = null;
- /**
- * supervisor id -> Node
- */
- private Map<String, NodeDetails> _nodes;
+ private Cluster _cluster;
+ private Topologies _topologies;
+ private RAS_Node refNode = null;
private Map<String, List<String>> _clusterInfo;
+ private RAS_Nodes _nodes;
private final double CPU_WEIGHT = 1.0;
private final double MEM_WEIGHT = 1.0;
private final double NETWORK_WEIGHT = 1.0;
- public void prepare (ClusterStateData clusterStateData) {
- _clusterStateData = clusterStateData;
- _nodes = clusterStateData.nodes;
- _availNodes = this.getAvailNodes();
- _clusterInfo = _clusterStateData.getNetworkTopography();
+ public void prepare (SchedulingState schedulingState) {
+ _cluster = schedulingState.cluster;
+ _topologies = schedulingState.topologies;
+ _nodes = schedulingState.nodes;
+ _clusterInfo = schedulingState.cluster.getNetworkTopography();
LOG.debug(this.getClusterInfo());
}
@@ -84,11 +83,11 @@ public class DefaultResourceAwareStrategy implements IStrategy {
}
public SchedulingResult schedule(TopologyDetails td) {
- if (_availNodes.size() <= 0) {
+ if (_nodes.getNodes().size() <= 0) {
LOG.warn("No available nodes to schedule tasks on!");
return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
}
- Collection<ExecutorDetails> unassignedExecutors = _clusterStateData.getUnassignedExecutors(td.getId());
+ Collection<ExecutorDetails> unassignedExecutors = _cluster.getUnassignedExecutors(td);
Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap = new HashMap<>();
LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
@@ -149,7 +148,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
Collection<ExecutorDetails>> schedulerAssignmentMap, Collection<ExecutorDetails> scheduledTasks) {
WorkerSlot targetSlot = this.findWorkerForExec(exec, td, schedulerAssignmentMap);
if (targetSlot != null) {
- NodeDetails targetNode = this.idToNode(targetSlot.getNodeId());
+ RAS_Node targetNode = this.idToNode(targetSlot.getNodeId());
if (!schedulerAssignmentMap.containsKey(targetSlot)) {
schedulerAssignmentMap.put(targetSlot, new LinkedList<ExecutorDetails>());
}
@@ -189,7 +188,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String clusterId, Map<WorkerSlot, Collection<ExecutorDetails>> scheduleAssignmentMap) {
double taskMem = td.getTotalMemReqTask(exec);
double taskCPU = td.getTotalCpuReqTask(exec);
- List<NodeDetails> nodes;
+ List<RAS_Node> nodes;
if(clusterId != null) {
nodes = this.getAvailableNodesFromCluster(clusterId);
@@ -197,8 +196,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
nodes = this.getAvailableNodes();
}
//First sort nodes by distance
- TreeMap<Double, NodeDetails> nodeRankMap = new TreeMap<>();
- for (NodeDetails n : nodes) {
+ TreeMap<Double, RAS_Node> nodeRankMap = new TreeMap<>();
+ for (RAS_Node n : nodes) {
if(n.getFreeSlots().size()>0) {
if (n.getAvailableMemoryResources() >= taskMem
&& n.getAvailableCpuResources() >= taskCPU) {
@@ -217,8 +216,8 @@ public class DefaultResourceAwareStrategy implements IStrategy {
}
}
//Then, pick worker from closest node that satisfy constraints
- for(Map.Entry<Double, NodeDetails> entry : nodeRankMap.entrySet()) {
- NodeDetails n = entry.getValue();
+ for(Map.Entry<Double, RAS_Node> entry : nodeRankMap.entrySet()) {
+ RAS_Node n = entry.getValue();
for(WorkerSlot ws : n.getFreeSlots()) {
if(checkWorkerConstraints(exec, ws, td, scheduleAssignmentMap)) {
return ws;
@@ -245,15 +244,15 @@ public class DefaultResourceAwareStrategy implements IStrategy {
private Double getTotalClusterRes(List<String> cluster) {
Double res = 0.0;
for (String node : cluster) {
- res += _availNodes.get(this.NodeHostnameToId(node))
+ res += _nodes.getNodeById(this.NodeHostnameToId(node))
.getAvailableMemoryResources()
- + _availNodes.get(this.NodeHostnameToId(node))
+ + _nodes.getNodeById(this.NodeHostnameToId(node))
.getAvailableCpuResources();
}
return res;
}
- private Double distToNode(NodeDetails src, NodeDetails dest) {
+ private Double distToNode(RAS_Node src, RAS_Node dest) {
if (src.getId().equals(dest.getId())) {
return 0.0;
} else if (this.NodeToCluster(src).equals(this.NodeToCluster(dest))) {
@@ -263,7 +262,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
}
}
- private String NodeToCluster(NodeDetails node) {
+ private String NodeToCluster(RAS_Node node) {
for (Entry<String, List<String>> entry : _clusterInfo
.entrySet()) {
if (entry.getValue().contains(node.getHostname())) {
@@ -274,27 +273,27 @@ public class DefaultResourceAwareStrategy implements IStrategy {
return null;
}
- private List<NodeDetails> getAvailableNodes() {
- LinkedList<NodeDetails> nodes = new LinkedList<>();
+ private List<RAS_Node> getAvailableNodes() {
+ LinkedList<RAS_Node> nodes = new LinkedList<>();
for (String clusterId : _clusterInfo.keySet()) {
nodes.addAll(this.getAvailableNodesFromCluster(clusterId));
}
return nodes;
}
- private List<NodeDetails> getAvailableNodesFromCluster(String clus) {
- List<NodeDetails> retList = new ArrayList<>();
+ private List<RAS_Node> getAvailableNodesFromCluster(String clus) {
+ List<RAS_Node> retList = new ArrayList<>();
for (String node_id : _clusterInfo.get(clus)) {
- retList.add(_availNodes.get(this
+ retList.add(_nodes.getNodeById(this
.NodeHostnameToId(node_id)));
}
return retList;
}
private List<WorkerSlot> getAvailableWorkersFromCluster(String clusterId) {
- List<NodeDetails> nodes = this.getAvailableNodesFromCluster(clusterId);
+ List<RAS_Node> nodes = this.getAvailableNodesFromCluster(clusterId);
List<WorkerSlot> workers = new LinkedList<>();
- for(NodeDetails node : nodes) {
+ for(RAS_Node node : nodes) {
workers.addAll(node.getFreeSlots());
}
return workers;
@@ -309,13 +308,6 @@ public class DefaultResourceAwareStrategy implements IStrategy {
}
/**
- * In case in the future RAS can only use a subset of nodes
- */
- private Map<String, NodeDetails> getAvailNodes() {
- return _nodes;
- }
-
- /**
* Breadth first traversal of the topology DAG
* @param td
* @param spouts
@@ -429,7 +421,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
String clusterId = clusterEntry.getKey();
retVal += "Rack: " + clusterId + "\n";
for(String nodeHostname : clusterEntry.getValue()) {
- NodeDetails node = this.idToNode(this.NodeHostnameToId(nodeHostname));
+ RAS_Node node = this.idToNode(this.NodeHostnameToId(nodeHostname));
retVal += "-> Node: " + node.getHostname() + " " + node.getId() + "\n";
retVal += "--> Avail Resources: {Mem " + node.getAvailableMemoryResources() + ", CPU " + node.getAvailableCpuResources() + "}\n";
retVal += "--> Total Resources: {Mem " + node.getTotalMemoryResources() + ", CPU " + node.getTotalCpuResources() + "}\n";
@@ -444,7 +436,7 @@ public class DefaultResourceAwareStrategy implements IStrategy {
* @return the id of a node
*/
public String NodeHostnameToId(String hostname) {
- for (NodeDetails n : _nodes.values()) {
+ for (RAS_Node n : _nodes.getNodes()) {
if (n.getHostname() == null) {
continue;
}
@@ -461,11 +453,11 @@ public class DefaultResourceAwareStrategy implements IStrategy {
* @param id
* @return a RAS_Node object
*/
- public NodeDetails idToNode(String id) {
- if(_nodes.containsKey(id) == false) {
+ public RAS_Node idToNode(String id) {
+ RAS_Node ret = _nodes.getNodeById(id);
+ if(ret == null) {
LOG.error("Cannot find Node with Id: {}", id);
- return null;
}
- return _nodes.get(id);
+ return ret;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
index 4a1180a..b3b6305 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -19,8 +19,8 @@
package org.apache.storm.scheduler.resource.strategies.scheduling;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.ClusterStateData;
import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingState;
/**
* An interface for implementing different scheduling strategies for the resource aware scheduling
@@ -31,7 +31,7 @@ public interface IStrategy {
/**
* initialize prior to scheduling
*/
- void prepare(ClusterStateData clusterStateData);
+ void prepare(SchedulingState schedulingState);
/**
* This method is invoked to calculate a scheduling for topology td
@@ -40,6 +40,8 @@ public interface IStrategy {
* The strategy must calculate a scheduling in the format of Map<WorkerSlot, Collection<ExecutorDetails>> where the key of
* this map is the worker slot that the value (collection of executors) should be assigned to.
* if a scheduling is calculated successfully, put the scheduling map in the SchedulingResult object.
+ * PLEASE NOTE: Any other operations done on the cluster from a scheduling strategy will NOT persist or be realized.
+ * The data structures passed in can be used in any way necessary to assist in calculating a scheduling, but will NOT actually change the state of the cluster.
*/
SchedulingResult schedule(TopologyDetails td);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bf99ae51/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
index c4c1b3b..d1c261b 100644
--- a/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
+++ b/storm-core/test/jvm/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java
@@ -1385,4 +1385,49 @@ public class TestResourceAwareScheduler {
}
}
}
+
+ /**
+ * When the first topology failed to be scheduled make sure subsequent schedulings can still succeed
+ */
+ @Test
+ public void TestSchedulingAfterFailedScheduling() {
+ INimbus iNimbus = new TestUtilsForResourceAwareScheduler.INimbusTest();
+ Map<String, Number> resourceMap = new HashMap<String, Number>();
+ resourceMap.put(Config.SUPERVISOR_CPU_CAPACITY, 100.0);
+ resourceMap.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, 1000.0);
+ Map<String, SupervisorDetails> supMap = TestUtilsForResourceAwareScheduler.genSupervisors(8, 4, resourceMap);
+ Config config = new Config();
+ config.putAll(Utils.readDefaultConfig());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config.put(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+ config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 100.0);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 500);
+ config.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 500);
+
+ Cluster cluster = new Cluster(iNimbus, supMap, new HashMap<String, SchedulerAssignmentImpl>(), config);
+
+ config.put(Config.TOPOLOGY_SUBMITTER_USER, "jerry");
+
+ TopologyDetails topo1 = TestUtilsForResourceAwareScheduler.getTopology("topo-1", config, 8, 0, 2, 0, currentTime - 2, 10);
+ TopologyDetails topo2 = TestUtilsForResourceAwareScheduler.getTopology("topo-2", config, 2, 0, 2, 0, currentTime - 2, 20);
+ TopologyDetails topo3 = TestUtilsForResourceAwareScheduler.getTopology("topo-3", config, 1, 2, 1, 1, currentTime - 2, 20);
+
+ Map<String, TopologyDetails> topoMap = new HashMap<String, TopologyDetails>();
+ topoMap.put(topo1.getId(), topo1);
+ topoMap.put(topo2.getId(), topo2);
+ topoMap.put(topo3.getId(), topo3);
+
+ Topologies topologies = new Topologies(topoMap);
+
+ ResourceAwareScheduler rs = new ResourceAwareScheduler();
+
+ rs.prepare(config);
+ rs.schedule(topologies, cluster);
+
+ Assert.assertTrue("Topo-2 scheduled?", cluster.getAssignmentById(topo2.getId()) != null);
+ Assert.assertEquals("Topo-2 all executors scheduled?", 4, cluster.getAssignmentById(topo2.getId()).getExecutorToSlot().size());
+ Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(topo3.getId()) != null);
+ Assert.assertEquals("Topo-3 all executors scheduled?", 3, cluster.getAssignmentById(topo3.getId()).getExecutorToSlot().size());
+ }
}
[2/2] storm git commit: Added STORM-1634 to Changelog
Posted by bo...@apache.org.
Added STORM-1634 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20ba5189
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20ba5189
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20ba5189
Branch: refs/heads/1.x-branch
Commit: 20ba5189fdf8cc5894f163a21c194501de69542d
Parents: bf99ae5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Tue Mar 29 11:11:28 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Tue Mar 29 11:12:25 2016 -0500
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/20ba5189/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c36a3b..8fb0902 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.0
+ * STORM-1634: Refactoring of Resource Aware Scheduler
* STORM-1030: Hive Connector Fixes
* STORM-676: Storm Trident support for sliding/tumbling windows
* STORM-1630: Add guide page for Windows users