You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/31 02:07:03 UTC
[04/20] storm git commit: STORM-2497: Let Supervisor enforce memory
and add in support for shared memory regions
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
new file mode 100644
index 0000000..703f4b8
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/SchedulingStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.EnumSet;
+
+/**
+ * Outcome of an attempt to schedule a topology: one success value and three failure values.
+ */
+public enum SchedulingStatus {
+ SUCCESS,
+ FAIL_NOT_ENOUGH_RESOURCES,
+ FAIL_INVALID_TOPOLOGY,
+ FAIL_OTHER;
+
+ /** The statuses that count as success. Declared final so the reference cannot be swapped out. */
+ public static final EnumSet<SchedulingStatus> success = EnumSet.of(SUCCESS);
+
+ /** The statuses that count as failure. Declared final so the reference cannot be swapped out. */
+ public static final EnumSet<SchedulingStatus> failure =
+     EnumSet.of(FAIL_INVALID_TOPOLOGY, FAIL_NOT_ENOUGH_RESOURCES, FAIL_OTHER);
+
+ /**
+  * Tells whether a status is a success status.
+  *
+  * @param status the status to test; null is allowed and yields false
+  * @return true if status is contained in {@link #success}
+  */
+ public static boolean isStatusSuccess(SchedulingStatus status) {
+     return success.contains(status);
+ }
+
+ /**
+  * Tells whether a status is a failure status.
+  *
+  * @param status the status to test; null is allowed and yields false
+  * @return true if status is contained in {@link #failure}
+  */
+ public static boolean isStatusFailure(SchedulingStatus status) {
+     return failure.contains(status);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
new file mode 100644
index 0000000..4850ebb
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.storm.daemon.nimbus.TopologyResources;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ISchedulingState;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+
+/**
+ * A scheduler user, identified by its id, together with the CPU and memory guarantees of its
+ * resource pool. Topology queues and resource usage are computed on demand from the scheduling
+ * state handed to each method.
+ */
+public class User {
+ // Immutable: the id backs hashCode() and equals().
+ private final String userId;
+
+ //Topologies that were deemed to be invalid
+ private final Set<TopologyDetails> unsuccess = new HashSet<>();
+
+ private final double cpuGuarantee;
+ private final double memoryGuarantee;
+
+ /**
+  * Creates a user with a CPU and memory guarantee of 0.
+  *
+  * @param userId the id of the user
+  */
+ public User(String userId) {
+     this(userId, 0, 0);
+ }
+
+ /**
+  * Creates a user whose guarantees are read from a resource pool map.
+  *
+  * @param userId the id of the user
+  * @param resourcePool guarantees keyed by "cpu" and "memory"; a null map or a missing key
+  *     yields a guarantee of 0.0
+  */
+ public User(String userId, Map<String, Double> resourcePool) {
+     this(
+         userId,
+         resourcePool == null ? 0.0 : resourcePool.getOrDefault("cpu", 0.0),
+         resourcePool == null ? 0.0 : resourcePool.getOrDefault("memory", 0.0));
+ }
+
+ private User(String userId, double cpuGuarantee, double memoryGuarantee) {
+     this.userId = userId;
+     this.cpuGuarantee = cpuGuarantee;
+     this.memoryGuarantee = memoryGuarantee;
+ }
+
+ public String getId() {
+     return this.userId;
+ }
+
+ /**
+  * Collects the topologies owned by this user for which {@code cluster.needsSchedulingRas}
+  * returns false.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the matching topologies, ordered by {@link PQsortByPriorityAndSubmittionTime}
+  */
+ public TreeSet<TopologyDetails> getRunningTopologies(ISchedulingState cluster) {
+     TreeSet<TopologyDetails> ret = new TreeSet<>(new PQsortByPriorityAndSubmittionTime());
+     for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+         if (!cluster.needsSchedulingRas(td)) {
+             ret.add(td);
+         }
+     }
+     return ret;
+ }
+
+ /**
+  * Collects the topologies owned by this user for which {@code cluster.needsSchedulingRas}
+  * returns true and that have not been marked unsuccessful on this user.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the matching topologies, ordered by {@link PQsortByPriorityAndSubmittionTime}
+  */
+ public TreeSet<TopologyDetails> getPendingTopologies(ISchedulingState cluster) {
+     TreeSet<TopologyDetails> ret = new TreeSet<>(new PQsortByPriorityAndSubmittionTime());
+     for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+         if (cluster.needsSchedulingRas(td) && !unsuccess.contains(td)) {
+             ret.add(td);
+         }
+     }
+     return ret;
+ }
+
+ /**
+  * Marks a topology as unsuccessful so that it is left out of the pending topologies, and
+  * records a status message for it on the cluster when one is given.
+  *
+  * @param topo the topology to mark
+  * @param cluster the cluster to set the status on; may be null, in which case no status is set
+  */
+ public void markTopoUnsuccess(TopologyDetails topo, Cluster cluster) {
+     unsuccess.add(topo);
+     if (cluster != null) {
+         cluster.setStatus(topo.getId(), "Scheduling Attempted but topology is invalid");
+     }
+ }
+
+ /**
+  * Marks a topology as unsuccessful without touching any cluster status.
+  *
+  * @param topo the topology to mark
+  */
+ public void markTopoUnsuccess(TopologyDetails topo) {
+     this.markTopoUnsuccess(topo, null);
+ }
+
+ /**
+  * Averages the CPU and memory resource pool utilization of this user.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the mean of the two utilizations
+  */
+ public double getResourcePoolAverageUtilization(ISchedulingState cluster) {
+     double cpuResourcePoolUtilization = getCpuResourcePoolUtilization(cluster);
+     double memoryResourcePoolUtilization = getMemoryResourcePoolUtilization(cluster);
+
+     //cannot be (cpuResourcePoolUtilization + memoryResourcePoolUtilization)/2
+     //since memoryResourcePoolUtilization or cpuResourcePoolUtilization can be Double.MAX_VALUE
+     //Should not return infinity in that case
+     return ((cpuResourcePoolUtilization) / 2.0) + ((memoryResourcePoolUtilization) / 2.0);
+ }
+
+ /**
+  * Computes the CPU used by this user relative to its CPU guarantee.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return used CPU divided by the guarantee, or Double.MAX_VALUE when the guarantee is 0.0
+  */
+ public double getCpuResourcePoolUtilization(ISchedulingState cluster) {
+     if (cpuGuarantee == 0.0) {
+         return Double.MAX_VALUE;
+     }
+     return getCpuResourceUsedByUser(cluster) / cpuGuarantee;
+ }
+
+ /**
+  * Computes the memory used by this user relative to its memory guarantee.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return used memory divided by the guarantee, or Double.MAX_VALUE when the guarantee is 0.0
+  */
+ public double getMemoryResourcePoolUtilization(ISchedulingState cluster) {
+     if (memoryGuarantee == 0.0) {
+         return Double.MAX_VALUE;
+     }
+     return getMemoryResourceUsedByUser(cluster) / memoryGuarantee;
+ }
+
+ /**
+  * Sums the assigned CPU over every topology of this user that has an assignment.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the total assigned CPU
+  */
+ public double getCpuResourceUsedByUser(ISchedulingState cluster) {
+     double sum = 0.0;
+     for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+         SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
+         if (assignment != null) {
+             TopologyResources tr = new TopologyResources(td, assignment);
+             sum += tr.getAssignedCpu();
+         }
+     }
+     return sum;
+ }
+
+ /**
+  * Sums the assigned on-heap and off-heap memory over every topology of this user that has an
+  * assignment.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the total assigned memory
+  */
+ public double getMemoryResourceUsedByUser(ISchedulingState cluster) {
+     double sum = 0.0;
+     for (TopologyDetails td : cluster.getTopologies().getTopologiesOwnedBy(userId)) {
+         SchedulerAssignment assignment = cluster.getAssignmentById(td.getId());
+         if (assignment != null) {
+             TopologyResources tr = new TopologyResources(td, assignment);
+             sum += tr.getAssignedMemOnHeap() + tr.getAssignedMemOffHeap();
+         }
+     }
+     return sum;
+ }
+
+ public double getMemoryResourceGuaranteed() {
+     return memoryGuarantee;
+ }
+
+ public double getCpuResourceGuaranteed() {
+     return cpuGuarantee;
+ }
+
+ /**
+  * Picks the first of the pending topologies of this user.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the first pending topology, or null when there is none
+  */
+ public TopologyDetails getNextTopologyToSchedule(ISchedulingState cluster) {
+     TreeSet<TopologyDetails> pending = getPendingTopologies(cluster);
+     return pending.isEmpty() ? null : pending.first();
+ }
+
+ /**
+  * Tells whether this user has at least one pending topology.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return true if the pending topologies are not empty
+  */
+ public boolean hasTopologyNeedSchedule(ISchedulingState cluster) {
+     return (!this.getPendingTopologies(cluster).isEmpty());
+ }
+
+ /**
+  * Picks the last of the running topologies of this user, i.e. the one the comparator sorts
+  * after all others.
+  *
+  * @param cluster the scheduling state to inspect
+  * @return the last running topology, or null when there is none
+  */
+ public TopologyDetails getRunningTopologyWithLowestPriority(ISchedulingState cluster) {
+     TreeSet<TopologyDetails> queue = getRunningTopologies(cluster);
+     if (queue.isEmpty()) {
+         return null;
+     }
+     return queue.last();
+ }
+
+ @Override
+ public int hashCode() {
+     return this.userId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+     if (!(other instanceof User)) {
+         return false;
+     }
+     return this.getId().equals(((User) other).getId());
+ }
+
+ @Override
+ public String toString() {
+     return this.userId;
+ }
+
+ /**
+  * Builds a multi-line description of this user: its id, its guarantees and the topologies
+  * marked unsuccessful.
+  *
+  * @return the description
+  */
+ public String getDetailedInfo() {
+     StringBuilder ret = new StringBuilder();
+     ret.append("\nUser: ").append(userId);
+     ret.append("\n - ").append(" Resource Pool: ")
+         .append(cpuGuarantee).append("% ").append(memoryGuarantee).append("MB ");
+     ret.append("\n - ").append(" Unsuccess Queue: ")
+         .append(unsuccess).append(" size: ").append(unsuccess.size());
+     return ret.toString();
+ }
+
+ /**
+  * Renders the average resource pool utilization of several users as
+  * "id - utilization " entries, one per user, concatenated in iteration order.
+  *
+  * @param users the users to report on
+  * @param cluster the cluster the utilization is computed against
+  * @return the concatenated entries, or an empty string when users is empty
+  */
+ public static String getResourcePoolAverageUtilizationForUsers(
+     Collection<User> users, Cluster cluster) {
+     // StringBuilder avoids re-copying the whole string on every iteration.
+     StringBuilder ret = new StringBuilder();
+     for (User user : users) {
+         ret.append(user.getId())
+             .append(" - ")
+             .append(user.getResourcePoolAverageUtilization(cluster))
+             .append(" ");
+     }
+     return ret.toString();
+ }
+
+ /**
+  * Comparator that sorts topologies by priority and then by uptime. A topology with a smaller
+  * priority value sorts first; if priorities tie, the topology with the larger uptime sorts
+  * first; if uptimes tie as well, topology ids are compared.
+  */
+ static class PQsortByPriorityAndSubmittionTime implements Comparator<TopologyDetails> {
+
+     @Override
+     public int compare(TopologyDetails topo1, TopologyDetails topo2) {
+         if (topo1.getTopologyPriority() > topo2.getTopologyPriority()) {
+             return 1;
+         } else if (topo1.getTopologyPriority() < topo2.getTopologyPriority()) {
+             return -1;
+         } else {
+             if (topo1.getUpTime() > topo2.getUpTime()) {
+                 return -1;
+             } else if (topo1.getUpTime() < topo2.getUpTime()) {
+                 return 1;
+             } else {
+                 // Final tie-break keeps distinct topologies from collapsing in a TreeSet.
+                 return topo1.getId().compareTo(topo2.getId());
+             }
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index 182017b..3abc65e 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -18,18 +18,17 @@
package org.apache.storm.scheduler.resource.strategies.eviction;
+import java.util.Collection;
+import java.util.Map;
+
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RAS_Nodes;
-import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Map;
-
public class DefaultEvictionStrategy implements IEvictionStrategy {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultEvictionStrategy.class);
@@ -39,49 +38,35 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
private RAS_Nodes nodes;
@Override
- public void prepare(SchedulingState schedulingState) {
- this.cluster = schedulingState.cluster;
- this.userMap = schedulingState.userMap;
- this.nodes = schedulingState.nodes;
- }
-
- @Override
- public boolean makeSpaceForTopo(TopologyDetails td) {
+ public boolean makeSpaceForTopo(TopologyDetails td, Cluster schedulingState, Map<String, User> userMap) {
+ this.cluster = schedulingState;
+ this.userMap = userMap;
+ this.nodes = new RAS_Nodes(schedulingState);
LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter());
User submitter = this.userMap.get(td.getTopologySubmitter());
- if (submitter.getCPUResourceGuaranteed() == null || submitter.getMemoryResourceGuaranteed() == null
- || submitter.getCPUResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
+ if (submitter.getCpuResourceGuaranteed() == 0.0 || submitter.getMemoryResourceGuaranteed() == 0.0) {
return false;
}
- double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
- double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
+ double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCpuResourceGuaranteed();
+ double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap())
+ / submitter.getMemoryResourceGuaranteed();
User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
//check if user has enough resource under his or her resource guarantee to schedule topology
- if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
+ if ((1.0 - submitter.getCpuResourcePoolUtilization(schedulingState)) >= cpuNeeded
+ && (1.0 - submitter.getMemoryResourcePoolUtilization(schedulingState)) >= memoryNeeded) {
if (evictUser != null) {
- TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
- LOG.debug("Running Topology {} from user {} is still within user's resource guarantee thus, POTENTIALLY evicting Topology {} from user {} since:" +
- "\n(1.0 - submitter.getCPUResourcePoolUtilization()) = {} >= cpuNeeded = {}" +
- "\nand" +
- "\n(1.0 - submitter.getMemoryResourcePoolUtilization()) = {} >= memoryNeeded = {}"
- ,td, submitter, topologyEvict, evictUser, (1.0 - submitter.getCPUResourcePoolUtilization())
- , cpuNeeded, (1.0 - submitter.getMemoryResourcePoolUtilization()), memoryNeeded);
+ TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority(schedulingState);
evictTopology(topologyEvict);
return true;
}
} else {
if (evictUser != null) {
- if ((evictUser.getResourcePoolAverageUtilization() - 1.0) > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0))) {
- TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority();
- LOG.debug("POTENTIALLY Evicting Topology {} from user {} since:" +
- "\n((evictUser.getResourcePoolAverageUtilization() - 1.0) = {}" +
- "\n(cpuNeeded + memoryNeeded) / 2) = {} and (submitter.getResourcePoolAverageUtilization() - 1.0)) = {} Thus," +
- "\n(evictUser.getResourcePoolAverageUtilization() - 1.0) = {} > (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)) = {}"
- ,topologyEvict, evictUser, (evictUser.getResourcePoolAverageUtilization() - 1.0), ((cpuNeeded + memoryNeeded) / 2)
- , (submitter.getResourcePoolAverageUtilization() - 1.0), (evictUser.getResourcePoolAverageUtilization() - 1.0)
- , (((cpuNeeded + memoryNeeded) / 2) + (submitter.getResourcePoolAverageUtilization() - 1.0)));
+ if ((evictUser.getResourcePoolAverageUtilization(schedulingState) - 1.0)
+ > (((cpuNeeded + memoryNeeded) / 2)
+ + (submitter.getResourcePoolAverageUtilization(schedulingState) - 1.0))) {
+ TopologyDetails topologyEvict = evictUser.getRunningTopologyWithLowestPriority(schedulingState);
evictTopology(topologyEvict);
return true;
}
@@ -90,12 +75,12 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
//See if there is a lower priority topology that can be evicted from the current user
//topologies should already be sorted in order of increasing priority.
//Thus, topology at the front of the queue has the lowest priority
- for (TopologyDetails topo : submitter.getTopologiesRunning()) {
+ for (TopologyDetails topo : submitter.getRunningTopologies(schedulingState)) {
//check to if there is a topology with a lower priority we can evict
if (topo.getTopologyPriority() > td.getTopologyPriority()) {
- LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower priority than topology {}"
- , topo, submitter, topo, td);
- evictTopology(topo);
+ LOG.debug("POTENTIALLY Evicting Topology {} from user {} (itself) since topology {} has a lower "
+ + "priority than topology {}", topo, submitter, topo, td);
+ evictTopology(topo);
return true;
}
}
@@ -104,19 +89,18 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
private void evictTopology(TopologyDetails topologyEvict) {
Collection<WorkerSlot> workersToEvict = this.cluster.getUsedSlotsByTopologyId(topologyEvict.getId());
- User submitter = this.userMap.get(topologyEvict.getTopologySubmitter());
- LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, topologyEvict.getTopologySubmitter());
+ LOG.info("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict,
+ topologyEvict.getTopologySubmitter());
this.nodes.freeSlots(workersToEvict);
- submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
}
private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
double most = 0.0;
User mostOverUser = null;
for (User user : this.userMap.values()) {
- double over = user.getResourcePoolAverageUtilization() - 1.0;
- if ((over > most) && (!user.getTopologiesRunning().isEmpty())) {
+ double over = user.getResourcePoolAverageUtilization(cluster) - 1.0;
+ if ((over > most) && (!user.getRunningTopologies(cluster).isEmpty())) {
most = over;
mostOverUser = user;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
index 9499424..2947f7a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/eviction/IEvictionStrategy.java
@@ -18,25 +18,20 @@
package org.apache.storm.scheduler.resource.strategies.eviction;
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.User;
public interface IEvictionStrategy {
-
- /**
- * Initialization
- */
- public void prepare(SchedulingState schedulingState);
-
/**
* This method when invoked should attempt to make space on the cluster so that the topology specified can be scheduled
* @param td the topology to make space for
* @return return true to indicate that space has been made for topology and try schedule topology td again.
- * Return false to inidcate that no space could be made for the topology on the cluster and the scheduler should give up
+ * Return false to indicate that no space could be made for the topology on the cluster and the scheduler should give up
* trying to schedule the topology for this round of scheduling. This method will be invoked until the topology indicated
* could be scheduled or the method returns false
*/
- public boolean makeSpaceForTopo(TopologyDetails td);
-
-
+ public boolean makeSpaceForTopo(TopologyDetails td, Cluster schedulingState, Map<String, User> userMap);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
index e3109d5..b6d8ada 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/DefaultSchedulingPriorityStrategy.java
@@ -18,55 +18,46 @@
package org.apache.storm.scheduler.resource.strategies.priority;
-import org.apache.storm.scheduler.Cluster;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
import org.apache.storm.scheduler.resource.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class DefaultSchedulingPriorityStrategy implements ISchedulingPriorityStrategy {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultSchedulingPriorityStrategy.class);
- private Cluster cluster;
- private Map<String, User> userMap;
-
@Override
- public void prepare(SchedulingState schedulingState) {
- this.cluster = schedulingState.cluster;
- this.userMap = schedulingState.userMap;
- }
-
- @Override
- public TopologyDetails getNextTopologyToSchedule() {
- User nextUser = this.getNextUser();
+ public TopologyDetails getNextTopologyToSchedule(ISchedulingState schedulingState, Map<String, User> userMap) {
+ User nextUser = getNextUser(schedulingState, userMap);
if (nextUser == null) {
return null;
}
- return nextUser.getNextTopologyToSchedule();
+ return nextUser.getNextTopologyToSchedule(schedulingState);
}
- public User getNextUser() {
- Double least = Double.POSITIVE_INFINITY;
+ public User getNextUser(ISchedulingState cluster, Map<String, User> userMap) {
+ double least = Double.POSITIVE_INFINITY;
User ret = null;
- for (User user : this.userMap.values()) {
- if (user.hasTopologyNeedSchedule()) {
- Double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization();
+ final double totalCpu = cluster.getClusterTotalCpuResource();
+ final double totalMem = cluster.getClusterTotalMemoryResource();
+ for (User user : userMap.values()) {
+ if (user.hasTopologyNeedSchedule(cluster)) {
+ double userResourcePoolAverageUtilization = user.getResourcePoolAverageUtilization(cluster);
if (least > userResourcePoolAverageUtilization) {
ret = user;
least = userResourcePoolAverageUtilization;
- }
- // if ResourcePoolAverageUtilization is equal to the user that is being compared
- else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
- double currentCpuPercentage = ret.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
- double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+ } else if (Math.abs(least - userResourcePoolAverageUtilization) < 0.0001) {
+ // if ResourcePoolAverageUtilization is equal to the user that is being compared
+ double currentCpuPercentage = ret.getCpuResourceGuaranteed() / totalCpu;
+ double currentMemoryPercentage = ret.getMemoryResourceGuaranteed() / totalMem;
double currentAvgPercentage = (currentCpuPercentage + currentMemoryPercentage) / 2.0;
- double userCpuPercentage = user.getCPUResourceGuaranteed() / this.cluster.getClusterTotalCPUResource();
- double userMemoryPercentage = user.getMemoryResourceGuaranteed() / this.cluster.getClusterTotalMemoryResource();
+ double userCpuPercentage = user.getCpuResourceGuaranteed() / totalCpu;
+ double userMemoryPercentage = user.getMemoryResourceGuaranteed() / totalMem;
double userAvgPercentage = (userCpuPercentage + userMemoryPercentage) / 2.0;
if (userAvgPercentage > currentAvgPercentage) {
ret = user;
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
index ffb463f..8654bb3 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/ISchedulingPriorityStrategy.java
@@ -18,19 +18,16 @@
package org.apache.storm.scheduler.resource.strategies.priority;
+import java.util.Map;
+
+import org.apache.storm.scheduler.ISchedulingState;
import org.apache.storm.scheduler.TopologyDetails;
-import org.apache.storm.scheduler.resource.SchedulingState;
+import org.apache.storm.scheduler.resource.User;
public interface ISchedulingPriorityStrategy {
-
- /**
- * initializes
- */
- public void prepare(SchedulingState schedulingState);
-
/**
* Gets the next topology to schedule
* @return return the next topology to schedule. If there is no topologies left to schedule, return null
*/
- public TopologyDetails getNextTopologyToSchedule();
+ public TopologyDetails getNextTopologyToSchedule(ISchedulingState schedulingState, Map<String, User> userMap);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
new file mode 100644
index 0000000..650f3df
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/DefaultResourceAwareStrategy.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.storm.generated.ComponentType;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.Component;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.ResourceUtils;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultResourceAwareStrategy implements IStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultResourceAwareStrategy.class);
+ private Cluster cluster;
+ private Map<String, List<String>> networkTopography;
+ private RAS_Nodes nodes;
+
+ private TreeSet<ObjectResources> sortedRacks = null;
+ private Map<String, TreeSet<ObjectResources>> rackIdToSortedNodes = new HashMap<>();
+
+ /**
+  * Binds this strategy to a cluster: stores the cluster, builds the RAS_Nodes view over it,
+  * reads its network topography and then calls logClusterInfo().
+  *
+  * @param cluster the cluster to schedule against
+  */
+ @VisibleForTesting
+ void prepare(Cluster cluster) {
+     this.cluster = cluster;
+     this.nodes = new RAS_Nodes(cluster);
+     this.networkTopography = cluster.getNetworkTopography();
+     logClusterInfo();
+ }
+
+ /**
+  * Receives the scheduler configuration. This strategy does not read it.
+  *
+  * @param config the configuration map (unused)
+  */
+ @Override
+ public void prepare(Map<String, Object> config) {
+     // Intentionally empty: there is nothing to configure.
+ }
+
+ @Override
+ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+ prepare(cluster);
+ if (nodes.getNodes().size() <= 0) {
+ LOG.warn("No available nodes to schedule tasks on!");
+ return SchedulingResult.failure(
+ SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
+ }
+ Collection<ExecutorDetails> unassignedExecutors =
+ new HashSet<>(this.cluster.getUnassignedExecutors(td));
+ LOG.debug("ExecutorsNeedScheduling: {}", unassignedExecutors);
+ Collection<ExecutorDetails> scheduledTasks = new ArrayList<>();
+ List<Component> spouts = this.getSpouts(td);
+
+ if (spouts.size() == 0) {
+ LOG.error("Cannot find a Spout!");
+ return SchedulingResult.failure(
+ SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
+ }
+
+ //order executors to be scheduled
+ List<ExecutorDetails> orderedExecutors = orderExecutors(td, unassignedExecutors);
+
+ Collection<ExecutorDetails> executorsNotScheduled = new HashSet<>(unassignedExecutors);
+
+ for (ExecutorDetails exec : orderedExecutors) {
+ LOG.debug(
+ "Attempting to schedule: {} of component {}[ REQ {} ]",
+ exec,
+ td.getExecutorToComponent().get(exec),
+ td.getTaskResourceReqList(exec));
+ scheduleExecutor(exec, td, scheduledTasks);
+ }
+
+ executorsNotScheduled.removeAll(scheduledTasks);
+ LOG.debug("/* Scheduling left over task (most likely sys tasks) */");
+ // schedule left over system tasks
+ for (ExecutorDetails exec : executorsNotScheduled) {
+ scheduleExecutor(exec, td, scheduledTasks);
+ }
+
+ SchedulingResult result;
+ executorsNotScheduled.removeAll(scheduledTasks);
+ if (executorsNotScheduled.size() > 0) {
+ LOG.error("Not all executors successfully scheduled: {}", executorsNotScheduled);
+ result =
+ SchedulingResult.failure(
+ SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+ (td.getExecutors().size() - unassignedExecutors.size())
+ + "/"
+ + td.getExecutors().size()
+ + " executors scheduled");
+ } else {
+ LOG.debug("All resources successfully scheduled!");
+ result = SchedulingResult.success("Fully Scheduled by DefaultResourceAwareStrategy");
+ }
+ return result;
+ }
+
+ /**
+  * Places a single executor of a topology on a worker slot, if one can be found.
+  *
+  * <p>On success the executor is assigned on the slot's node and appended to scheduledTasks;
+  * otherwise an error is logged and scheduledTasks is left untouched.
+  *
+  * @param exec the executor to schedule
+  * @param td the topology that exec belongs to
+  * @param scheduledTasks receives exec when it has been placed
+  */
+ private void scheduleExecutor(
+     ExecutorDetails exec, TopologyDetails td, Collection<ExecutorDetails> scheduledTasks) {
+     WorkerSlot slot = findWorkerForExec(exec, td);
+     if (slot == null) {
+         LOG.error("Not Enough Resources to schedule Task {}", exec);
+         return;
+     }
+
+     RAS_Node node = idToNode(slot.getNodeId());
+     node.assignSingleExecutor(slot, exec, td);
+     scheduledTasks.add(exec);
+     LOG.debug(
+         "TASK {} assigned to Node: {} avail [ mem: {} cpu: {} ] total [ mem: {} cpu: {} ] on "
+             + "slot: {} on Rack: {}",
+         exec,
+         node.getHostname(),
+         node.getAvailableMemoryResources(),
+         node.getAvailableCpuResources(),
+         node.getTotalMemoryResources(),
+         node.getTotalCpuResources(),
+         slot,
+         nodeToRack(node));
+ }
+
+ /**
+  * Looks for a worker slot that can take an executor, trying the racks one after another.
+  *
+  * @param exec the executor to schedule
+  * @param td the topology that exec belongs to
+  * @return the first slot found, or null if no rack offers one
+  */
+ private WorkerSlot findWorkerForExec(ExecutorDetails exec, TopologyDetails td) {
+     // The rack ordering is computed once, on first use, and reused by later calls.
+     if (sortedRacks == null) {
+         sortedRacks = sortRacks(td.getId());
+     }
+
+     // Walk every rack in sorted order before giving up; the ordering is intended to put the
+     // rack with the most effective resources first.
+     for (ObjectResources rack : sortedRacks) {
+         WorkerSlot candidate = this.getBestWorker(exec, td, rack.id);
+         if (candidate != null) {
+             LOG.debug("best rack: {}", rack.id);
+             return candidate;
+         }
+     }
+     return null;
+ }
+
+ /**
+ * Get the best worker on a rack to assign executor exec to.
+ *
+ * <p>The rack's nodes are visited in the order produced by sortNodes, and the first available
+ * slot on which the executor would fit is returned.
+ *
+ * @param exec the executor to schedule
+ * @param td the topology that the executor is a part of
+ * @param rackId the rack id of the rack to find a worker on
+ * @return a worker to assign executor exec to, or null if no worker can be found on the rack
+ * with rackId
+ */
+ private WorkerSlot getBestWorker(ExecutorDetails exec, TopologyDetails td, String rackId) {
+ // Sort the rack's nodes the first time the rack is visited and reuse the result afterwards.
+ if (!rackIdToSortedNodes.containsKey(rackId)) {
+ List<RAS_Node> rackNodes = this.getAvailableNodesFromRack(rackId);
+ rackIdToSortedNodes.put(rackId, sortNodes(rackNodes, rackId, td.getId()));
+ }
+
+ for (ObjectResources candidate : rackIdToSortedNodes.get(rackId)) {
+ RAS_Node rasNode = nodes.getNodeById(candidate.id);
+ for (WorkerSlot slot : rasNode.getSlotsAvailbleTo(td)) {
+ if (rasNode.wouldFit(slot, exec, td)) {
+ return slot;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Interface for calculating the number of existing executors scheduled on an object (rack or
+ * node).
+ */
+ private interface ExistingScheduleFunc {
+ // objectId is the id of the rack or node being asked about; sortObjectResources uses the
+ // returned count as its first sort criterion.
+ int getNumExistingSchedule(String objectId);
+ }
+
+ /**
+ * Holds the per-object resource entries of one scope together with the running totals over all
+ * of those entries.
+ */
+ static class AllResources {
+ // Label passed in by the creator ("RACK" or "Cluster" in this file).
+ String identifier;
+ // One entry per object (node or rack) in the scope.
+ List<ObjectResources> objectResources = new LinkedList<>();
+ // Sums over all entries.
+ double availCpuResourcesOverall = 0.0;
+ double totalCpuResourcesOverall = 0.0;
+ double availMemResourcesOverall = 0.0;
+ double totalMemResourcesOverall = 0.0;
+
+ public AllResources(String identifier) {
+ this.identifier = identifier;
+ }
+ }
+
+ /**
+ * Resource bookkeeping for a single rack or node, identified by id.
+ */
+ static class ObjectResources {
+ String id;
+ // Available and total CPU.
+ double availCpu = 0.0;
+ double totalCpu = 0.0;
+ // Available and total memory.
+ double availMem = 0.0;
+ double totalMem = 0.0;
+ // Ranking value filled in by sortObjectResources.
+ double effectiveResources = 0.0;
+
+ public ObjectResources(String id) {
+ this.id = id;
+ }
+
+ /** The string form of an entry is just its id. */
+ @Override
+ public String toString() {
+ return id;
+ }
+ }
+
+ /**
+ * Nodes are sorted by two criteria.
+ *
+ * <p>1) the number of executors of the topology being scheduled that are already on the node, in
+ * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a
+ * topology on the same node as the existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a node, in descending
+ * order. We calculate the resource availability percentage by dividing the resource availability
+ * on the node by the resource availability of the entire rack. By doing this calculation, nodes
+ * that have exhausted or have little of one of the resources mentioned above will be ranked after
+ * nodes that have more balanced resource availability. So we will be less likely to pick a node
+ * that has a lot of one resource but a low amount of another.
+ *
+ * @param availNodes a list of all the nodes we want to sort
+ * @param rackId the rack id availNodes are a part of
+ * @param topoId the topology that we are trying to schedule
+ * @return a sorted set of nodes
+ */
+ private TreeSet<ObjectResources> sortNodes(
+ List<RAS_Node> availNodes, String rackId, final String topoId) {
+ AllResources rackTotals = new AllResources("RACK");
+
+ // Record each node's resources and accumulate the rack-wide totals.
+ for (RAS_Node rasNode : availNodes) {
+ ObjectResources entry = new ObjectResources(rasNode.getId());
+ entry.availMem = rasNode.getAvailableMemoryResources();
+ entry.availCpu = rasNode.getAvailableCpuResources();
+ entry.totalMem = rasNode.getTotalMemoryResources();
+ entry.totalCpu = rasNode.getTotalCpuResources();
+ rackTotals.objectResources.add(entry);
+
+ rackTotals.availMemResourcesOverall += entry.availMem;
+ rackTotals.availCpuResourcesOverall += entry.availCpu;
+ rackTotals.totalMemResourcesOverall += entry.totalMem;
+ rackTotals.totalCpuResourcesOverall += entry.totalCpu;
+ }
+
+ LOG.debug(
+ "Rack {}: Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]",
+ rackId,
+ rackTotals.availCpuResourcesOverall,
+ rackTotals.availMemResourcesOverall,
+ rackTotals.totalCpuResourcesOverall,
+ rackTotals.totalMemResourcesOverall);
+
+ return sortObjectResources(
+ rackTotals,
+ objectId -> {
+ // Count the topology's executors whose assigned slot lives on this node.
+ int alreadyOnNode = 0;
+ if (cluster.getAssignmentById(topoId) != null) {
+ for (WorkerSlot slot :
+ cluster.getAssignmentById(topoId).getExecutorToSlot().values()) {
+ if (slot.getNodeId().equals(objectId)) {
+ alreadyOnNode++;
+ }
+ }
+ }
+ return alreadyOnNode;
+ });
+ }
+
+ /**
+ * Racks are sorted by two criteria.
+ *
+ * <p>1) the number of executors of the topology being scheduled that are already on the rack, in
+ * descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of a
+ * topology on the same rack as the existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of a rack, in descending
+ * order. We calculate the resource availability percentage by dividing the resource availability
+ * on the rack by the resource availability of the entire cluster. By doing this calculation,
+ * racks that have exhausted or have little of one of the resources mentioned above will be ranked
+ * after racks that have more balanced resource availability. So we will be less likely to pick a
+ * rack that has a lot of one resource but a low amount of another.
+ *
+ * @param topoId topology id
+ * @return a sorted set of racks
+ */
+ @VisibleForTesting
+ TreeSet<ObjectResources> sortRacks(final String topoId) {
+ AllResources clusterTotals = new AllResources("Cluster");
+ // Maps each hostname listed in networkTopography to the rack it is listed under.
+ final Map<String, String> hostToRack = new HashMap<>();
+
+ for (Map.Entry<String, List<String>> rackEntry : networkTopography.entrySet()) {
+ ObjectResources rack = new ObjectResources(rackEntry.getKey());
+ clusterTotals.objectResources.add(rack);
+ for (String hostname : rackEntry.getValue()) {
+ RAS_Node rasNode = nodes.getNodeById(nodeHostnameToId(hostname));
+ double availMem = rasNode.getAvailableMemoryResources();
+ double availCpu = rasNode.getAvailableCpuResources();
+ double totalMem = rasNode.getTotalMemoryResources();
+ double totalCpu = rasNode.getTotalCpuResources();
+
+ // Per-rack sums.
+ rack.availMem += availMem;
+ rack.availCpu += availCpu;
+ rack.totalMem += totalMem;
+ rack.totalCpu += totalCpu;
+
+ hostToRack.put(hostname, rack.id);
+
+ // Cluster-wide sums.
+ clusterTotals.availMemResourcesOverall += availMem;
+ clusterTotals.availCpuResourcesOverall += availCpu;
+ clusterTotals.totalMemResourcesOverall += totalMem;
+ clusterTotals.totalCpuResourcesOverall += totalCpu;
+ }
+ }
+ LOG.debug(
+ "Cluster Overall Avail [ CPU {} MEM {} ] Total [ CPU {} MEM {} ]",
+ clusterTotals.availCpuResourcesOverall,
+ clusterTotals.availMemResourcesOverall,
+ clusterTotals.totalCpuResourcesOverall,
+ clusterTotals.totalMemResourcesOverall);
+
+ return sortObjectResources(
+ clusterTotals,
+ objectId -> {
+ // Count the topology's executors whose assigned node belongs to this rack.
+ int alreadyOnRack = 0;
+ if (cluster.getAssignmentById(topoId) != null) {
+ for (WorkerSlot slot :
+ cluster.getAssignmentById(topoId).getExecutorToSlot().values()) {
+ String hostname = idToNode(slot.getNodeId()).getHostname();
+ String assignedRack = hostToRack.get(hostname);
+ if (assignedRack != null && assignedRack.equals(objectId)) {
+ alreadyOnRack++;
+ }
+ }
+ }
+ return alreadyOnRack;
+ });
+ }
+
+ /**
+ * Sort objects (nodes or racks) by the following criteria, applied in order.
+ *
+ * <p>1) the number of executors of the topology being scheduled that are already on the object,
+ * in descending order. The reasoning to sort based on criterion 1 is so we schedule the rest of
+ * a topology on the same object (node or rack) as the existing executors of the topology.
+ *
+ * <p>2) the subordinate/subservient resource availability percentage of the object, in
+ * descending order. For CPU and for memory the percentage is the object's available amount
+ * divided by the overall available amount held in allResources (the entire rack or cluster,
+ * depending on whether the object is a node or a rack); the effective value is the smaller of
+ * the two. By doing this calculation, objects that have exhausted or have little of one of the
+ * resources will be ranked after objects that have more balanced resource availability, so we
+ * will be less likely to pick an object that has a lot of one resource but a low amount of
+ * another.
+ *
+ * <p>3) the average of the two percentages, in descending order, and finally the object id in
+ * ascending order so that distinct objects never compare as equal.
+ *
+ * @param allResources contains all individual ObjectResources as well as cumulative stats
+ * @param existingScheduleFunc a function to get existing executors already scheduled on this object
+ * @return a sorted set of ObjectResources
+ */
+ private TreeSet<ObjectResources> sortObjectResources(
+ final AllResources allResources, final ExistingScheduleFunc existingScheduleFunc) {
+
+ // First pass: compute effectiveResources for every object before any comparison happens.
+ for (ObjectResources objectResources : allResources.objectResources) {
+ StringBuilder sb = new StringBuilder();
+ // With no CPU or no memory available overall, every object is rated 0 (this also avoids
+ // dividing by zero below).
+ if (allResources.availCpuResourcesOverall <= 0.0
+ || allResources.availMemResourcesOverall <= 0.0) {
+ objectResources.effectiveResources = 0.0;
+ } else {
+ List<Double> values = new LinkedList<>();
+
+ //add cpu
+ double cpuPercent =
+ (objectResources.availCpu / allResources.availCpuResourcesOverall) * 100.0;
+ values.add(cpuPercent);
+ sb.append(String.format("CPU %f(%f%%) ", objectResources.availCpu, cpuPercent));
+
+ //add memory
+ double memoryPercent =
+ (objectResources.availMem / allResources.availMemResourcesOverall) * 100.0;
+ values.add(memoryPercent);
+ sb.append(String.format("MEM %f(%f%%) ", objectResources.availMem, memoryPercent));
+
+ // The scarcer of the two resources determines the rating.
+ objectResources.effectiveResources = Collections.min(values);
+ }
+ LOG.debug(
+ "{}: Avail [ {} ] Total [ CPU {} MEM {}] effective resources: {}",
+ objectResources.id,
+ sb.toString(),
+ objectResources.totalCpu,
+ objectResources.totalMem,
+ objectResources.effectiveResources);
+ }
+
+ // Second pass: order the objects. A return value of -1 places o1 before o2.
+ TreeSet<ObjectResources> sortedObjectResources =
+ new TreeSet<>(( o1, o2) -> {
+ // Criterion 1: more executors of this topology already scheduled comes first.
+ int execsScheduled1 = existingScheduleFunc.getNumExistingSchedule(o1.id);
+ int execsScheduled2 = existingScheduleFunc.getNumExistingSchedule(o2.id);
+ if (execsScheduled1 > execsScheduled2) {
+ return -1;
+ } else if (execsScheduled1 < execsScheduled2) {
+ return 1;
+ } else {
+ // Criterion 2: higher effective resources comes first.
+ if (o1.effectiveResources > o2.effectiveResources) {
+ return -1;
+ } else if (o1.effectiveResources < o2.effectiveResources) {
+ return 1;
+ } else {
+ // Tie-break on the average of the CPU and memory percentages.
+ // NOTE(review): when an overall available amount is 0 these are floating point
+ // divisions by zero (NaN/Infinity, no exception); with NaN both comparisons below
+ // are false and the id decides - confirm this is the intended fallback.
+ List<Double> o1Values = new LinkedList<Double>();
+ List<Double> o2Values = new LinkedList<Double>();
+ o1Values.add((o1.availCpu / allResources.availCpuResourcesOverall) * 100.0);
+ o2Values.add((o2.availCpu / allResources.availCpuResourcesOverall) * 100.0);
+
+ o1Values.add((o1.availMem / allResources.availMemResourcesOverall) * 100.0);
+ o2Values.add((o2.availMem / allResources.availMemResourcesOverall) * 100.0);
+
+ double o1Avg = ResourceUtils.avg(o1Values);
+ double o2Avg = ResourceUtils.avg(o2Values);
+
+ if (o1Avg > o2Avg) {
+ return -1;
+ } else if (o1Avg < o2Avg) {
+ return 1;
+ } else {
+ // Last resort: order by id so the TreeSet does not drop objects that tie on
+ // every other criterion.
+ return o1.id.compareTo(o2.id);
+ }
+ }
+ }
+ });
+ sortedObjectResources.addAll(allResources.objectResources);
+ LOG.debug("Sorted Object Resources: {}", sortedObjectResources);
+ return sortedObjectResources;
+ }
+
+ /**
+ * Look up the rack a node belongs to, matching on the node's hostname.
+ *
+ * @param node the node to find the rack of
+ * @return the rack id, or null (after logging an error) if no rack lists the node's hostname
+ */
+ private String nodeToRack(RAS_Node node) {
+ for (Map.Entry<String, List<String>> rack : networkTopography.entrySet()) {
+ List<String> rackHosts = rack.getValue();
+ if (rackHosts.contains(node.getHostname())) {
+ return rack.getKey();
+ }
+ }
+ LOG.error("Node: {} not found in any racks", node.getHostname());
+ return null;
+ }
+
+ /**
+ * Get the nodes of a rack.
+ *
+ * <p>Each hostname that networkTopography lists for the rack is translated to a node id and
+ * then looked up in nodes.
+ *
+ * @param rackId the rack id of the rack to get nodes from
+ * @return a list of nodes, in the order the rack lists their hostnames
+ */
+ private List<RAS_Node> getAvailableNodesFromRack(String rackId) {
+ List<RAS_Node> rackNodes = new ArrayList<>();
+ for (String hostname : networkTopography.get(rackId)) {
+ String nodeId = this.nodeHostnameToId(hostname);
+ rackNodes.add(nodes.getNodeById(nodeId));
+ }
+ return rackNodes;
+ }
+
+ /**
+ * Sort components by the number of in and out connections that need to be made, in descending
+ * order. Components with the same number of connections are ordered by id.
+ *
+ * <p>The connection count of a component is, summed over all of its parents and children, the
+ * neighbor's executor count multiplied by the component's own executor count.
+ *
+ * @param componentMap The components that need to be sorted
+ * @return a sorted set of components
+ */
+ private Set<Component> sortComponents(final Map<String, Component> componentMap) {
+ Set<Component> ordered =
+ new TreeSet<>((left, right) -> {
+ int leftLinks = 0;
+ for (String neighborId : union(left.getChildren(), left.getParents())) {
+ leftLinks += componentMap.get(neighborId).getExecs().size() * left.getExecs().size();
+ }
+
+ int rightLinks = 0;
+ for (String neighborId : union(right.getChildren(), right.getParents())) {
+ rightLinks += componentMap.get(neighborId).getExecs().size() * right.getExecs().size();
+ }
+
+ // Arguments are reversed so that the larger count sorts first.
+ int byLinks = Integer.compare(rightLinks, leftLinks);
+ if (byLinks != 0) {
+ return byLinks;
+ }
+ return left.getId().compareTo(right.getId());
+ });
+ ordered.addAll(componentMap.values());
+ return ordered;
+ }
+
+ /** Return a new set holding every element of a and of b; neither argument is modified. */
+ private static <T> Set<T> union(Set<T> a, Set<T> b) {
+ Set<T> merged = new HashSet<>(a);
+ merged.addAll(b);
+ return merged;
+ }
+
+ /**
+ * Sort a component's neighbors by the number of connections each needs to make with this
+ * component (the neighbor's executor count multiplied by this component's executor count).
+ * Neighbors with fewer connections come first; ties are ordered by id.
+ *
+ * @param thisComp the component that we need to sort its neighbors
+ * @param componentMap all the components to sort
+ * @return a sorted set of components
+ */
+ private Set<Component> sortNeighbors(
+ final Component thisComp, final Map<String, Component> componentMap) {
+ Set<Component> ordered =
+ new TreeSet<>((left, right) -> {
+ int leftLinks = left.getExecs().size() * thisComp.getExecs().size();
+ int rightLinks = right.getExecs().size() * thisComp.getExecs().size();
+ int byLinks = Integer.compare(leftLinks, rightLinks);
+ if (byLinks != 0) {
+ return byLinks;
+ }
+ return left.getId().compareTo(right.getId());
+ });
+ ordered.addAll(componentMap.values());
+ return ordered;
+ }
+
+ /**
+ * Order executors based on how many in and out connections they will potentially need to make.
+ *
+ * <p>First order components with sortComponents. Then iterate through the sorted components and,
+ * for each one, sort its neighbors (parents and children) with sortNeighbors. Add an executor
+ * from this component and then one from each neighboring component in sorted order, and repeat
+ * until neither the component nor any of its neighbors has an executor left to schedule.
+ *
+ * @param td The topology the executors belong to
+ * @param unassignedExecutors a collection of unassigned executors that need to be assigned. Only
+ * executors from this collection are included in the result
+ * @return a list of executors in sorted order
+ */
+ private List<ExecutorDetails> orderExecutors(
+ TopologyDetails td, Collection<ExecutorDetails> unassignedExecutors) {
+ Map<String, Component> componentMap = td.getComponents();
+ List<ExecutorDetails> execsScheduled = new LinkedList<>();
+
+ // For every component, queue up only those of its executors that still need to be assigned.
+ Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
+ for (Component component : componentMap.values()) {
+ Queue<ExecutorDetails> pending = new LinkedList<>();
+ for (ExecutorDetails exec : component.getExecs()) {
+ if (unassignedExecutors.contains(exec)) {
+ pending.add(exec);
+ }
+ }
+ compToExecsToSchedule.put(component.getId(), pending);
+ }
+
+ // sortComponents already returns a set holding every component of componentMap, so the
+ // components do not have to be added a second time.
+ Set<Component> sortedComponents = sortComponents(componentMap);
+
+ for (Component currComp : sortedComponents) {
+ Map<String, Component> neighbors = new HashMap<>();
+ for (String compId : union(currComp.getChildren(), currComp.getParents())) {
+ neighbors.put(compId, componentMap.get(compId));
+ }
+ Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
+ Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
+
+ // Each round takes at most one executor from the component and one from each neighbor;
+ // stop after the first round in which nothing was taken.
+ boolean tookAny;
+ do {
+ tookAny = false;
+ if (!currCompExesToSched.isEmpty()) {
+ execsScheduled.add(currCompExesToSched.poll());
+ tookAny = true;
+ }
+
+ for (Component neighborComp : sortedNeighbors) {
+ Queue<ExecutorDetails> neighborCompExesToSched =
+ compToExecsToSchedule.get(neighborComp.getId());
+ if (!neighborCompExesToSched.isEmpty()) {
+ execsScheduled.add(neighborCompExesToSched.poll());
+ tookAny = true;
+ }
+ }
+ } while (tookAny);
+ }
+ return execsScheduled;
+ }
+
+ /**
+ * Collect every component of the topology whose type is SPOUT.
+ *
+ * @param td topology to get spouts from
+ * @return a list of spouts
+ */
+ private List<Component> getSpouts(TopologyDetails td) {
+ List<Component> result = new ArrayList<>();
+ for (Component candidate : td.getComponents().values()) {
+ if (candidate.getType() != ComponentType.SPOUT) {
+ continue;
+ }
+ result.add(candidate);
+ }
+ return result;
+ }
+
+ /**
+ * Log, at debug level, the available and total resources (memory, CPU and slots) of every node,
+ * grouped by rack. Does nothing when debug logging is disabled.
+ */
+ private void logClusterInfo() {
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+ LOG.debug("Cluster:");
+ for (Map.Entry<String, List<String>> rackEntry : networkTopography.entrySet()) {
+ LOG.debug("Rack: {}", rackEntry.getKey());
+ for (String hostname : rackEntry.getValue()) {
+ RAS_Node rasNode = idToNode(this.nodeHostnameToId(hostname));
+ LOG.debug("-> Node: {} {}", rasNode.getHostname(), rasNode.getId());
+ LOG.debug(
+ "--> Avail Resources: {Mem {}, CPU {} Slots: {}}",
+ rasNode.getAvailableMemoryResources(),
+ rasNode.getAvailableCpuResources(),
+ rasNode.totalSlotsFree());
+ LOG.debug(
+ "--> Total Resources: {Mem {}, CPU {} Slots: {}}",
+ rasNode.getTotalMemoryResources(),
+ rasNode.getTotalCpuResources(),
+ rasNode.totalSlots());
+ }
+ }
+ }
+
+ /**
+ * Translate a hostname to a node id.
+ *
+ * <p>Nodes without a hostname are ignored; the id of the first node whose hostname equals the
+ * argument is returned.
+ *
+ * @param hostname the hostname to convert to node id
+ * @return the id of a node, or null (after logging an error) if no node has that hostname
+ */
+ public String nodeHostnameToId(String hostname) {
+ for (RAS_Node candidate : nodes.getNodes()) {
+ if (candidate.getHostname() != null && candidate.getHostname().equals(hostname)) {
+ return candidate.getId();
+ }
+ }
+ LOG.error("Cannot find Node with hostname {}", hostname);
+ return null;
+ }
+
+ /**
+ * Find the RAS_Node for the specified node id.
+ *
+ * @param id the node/supervisor id to lookup
+ * @return the RAS_Node object, or null (after logging an error) if the id is unknown
+ */
+ public RAS_Node idToNode(String id) {
+ final RAS_Node found = nodes.getNodeById(id);
+ if (found != null) {
+ return found;
+ }
+ LOG.error("Cannot find Node with Id: {}", id);
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
new file mode 100644
index 0000000..53859fd
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/IStrategy.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import java.util.Map;
+
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+
+/**
+ * An interface for implementing different scheduling strategies for the resource aware scheduling.
+ * In the future strategies will be pluggable.
+ */
+public interface IStrategy {
+
+ /**
+ * Prepare the Strategy for scheduling.
+ * @param config the cluster configuration
+ */
+ void prepare(Map<String, Object> config);
+
+ /**
+ * This method is invoked to calculate a scheduling for topology td. Cluster will reject any changes that are
+ * not for the given topology. Any changes made to the cluster will be committed if the scheduling is successful.
+ * @param schedulingState the current state of the cluster
+ * @param td the topology to schedule for
+ * @return a SchedulingResult object containing a SchedulingStatus object to indicate whether scheduling is
+ * successful.
+ */
+ SchedulingResult schedule(Cluster schedulingState, TopologyDetails td);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/TestCgroups.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/TestCgroups.java b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
index 0a57cc5..c785dde 100644
--- a/storm-server/src/test/java/org/apache/storm/TestCgroups.java
+++ b/storm-server/src/test/java/org/apache/storm/TestCgroups.java
@@ -60,11 +60,8 @@ public class TestCgroups {
CgroupManager manager = new CgroupManager();
manager.prepare(config);
- Map<String, Number> resourcesMap = new HashMap<>();
- resourcesMap.put("cpu", 200);
- resourcesMap.put("memory", 1024);
String workerId = UUID.randomUUID().toString();
- manager.reserveResourcesForWorker(workerId, resourcesMap);
+ manager.reserveResourcesForWorker(workerId, 1024, 200);
List<String> commandList = manager.getLaunchCommand(workerId, new ArrayList<String>());
StringBuilder command = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
new file mode 100644
index 0000000..0458ca4
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.nimbus;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.TopologyBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class NimbusTest {
+ @Test
+ public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
+ // Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0,
+ // Largest memory requirement of a component in the topology).
+ TopologyBuilder builder1 = new TopologyBuilder();
+ builder1.setSpout("wordSpout1", new TestWordSpout(), 4);
+ StormTopology stormTopology1 = builder1.createTopology();
+ Config config1 = new Config();
+ config1.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
+ config1.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
+ config1.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
+
+ config1.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
+ config1.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
+ config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
+ config1.put(Config.TOPOLOGY_PRIORITY, 0);
+ config1.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
+ config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
+ config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
+ try {
+ Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException e) {
+ //Expected...
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index dd267e3..fe0a993 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -416,6 +416,7 @@ public class BasicContainerTest {
"-Dlog4j.configurationFile=" + workerConf,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
+ "-Dworker.memory_limit_mb=768",
"org.apache.storm.LogWriter",
"java",
"-server",
@@ -430,6 +431,7 @@ public class BasicContainerTest {
"-Dlog4j.configurationFile=" + workerConf,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
+ "-Dworker.memory_limit_mb=768",
"-Dtesting=true",
"-Djava.library.path=JLP",
"-Dstorm.conf.file=",
http://git-wip-us.apache.org/repos/asf/storm/blob/b4d33955/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
new file mode 100644
index 0000000..9c01a2a
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/ClusterTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link Cluster}, covering {@code getAssignedMemoryForSlot}.
+ */
+public class ClusterTest {
+
+    /** Slot memory in MB expected when no heap option is set; should match Cluster.getAssignedMemoryForSlot. */
+    private static final double TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION = 768.0;
+
+    /** Returns a new config holding only the given entry. */
+    private static Map<String, Object> getConfig(String key, Object value) {
+        Map<String, Object> topConf = getEmptyConfig();
+        topConf.put(key, value);
+        return topConf;
+    }
+
+    /** Returns a new, mutable config with no entries. */
+    private static Map<String, Object> getEmptyConfig() {
+        return new HashMap<>();
+    }
+
+    /** Returns a new config in which every option exercised by these tests is set at once. */
+    private static Map<String, Object> getPopulatedConfig() {
+        Map<String, Object> topConf = getEmptyConfig();
+        topConf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m");
+        topConf.put(Config.WORKER_GC_CHILDOPTS, "-Xmx256m");
+        topConf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m");
+        topConf.put(Config.WORKER_CHILDOPTS, "-Xmx768m");
+        topConf.put(Config.WORKER_HEAP_MEMORY_MB, 1024);
+        topConf.put(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m");
+        return topConf;
+    }
+
+    /**
+     * Test Cluster.getAssignedMemoryForSlot with a single config value set.
+     * @param key - the config key to set
+     * @param value - the config value to set (a childopts string or a numeric setting)
+     * @param expectedValue - the expected result
+     */
+    private static void singleValueTest(String key, Object value, double expectedValue) {
+        Assert.assertEquals(expectedValue, Cluster.getAssignedMemoryForSlot(getConfig(key, value)).doubleValue(), 0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_allNull() {
+        double assigned = Cluster.getAssignedMemoryForSlot(getEmptyConfig()).doubleValue();
+        Assert.assertEquals(TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION, assigned, 0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerGcChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_GC_CHILDOPTS, "-Xmx128m", 128.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerGcChildopts() {
+        singleValueTest(Config.WORKER_GC_CHILDOPTS, "-Xmx256m", 256.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx512m", 512.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerChildopts() {
+        singleValueTest(Config.WORKER_CHILDOPTS, "-Xmx768m", 768.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_workerHeapMemoryMb() {
+        singleValueTest(Config.WORKER_HEAP_MEMORY_MB, 1024, 1024.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_topologyWorkerLwChildopts() {
+        singleValueTest(Config.TOPOLOGY_WORKER_LOGWRITER_CHILDOPTS, "-Xmx64m",
+                TOPOLOGY_WORKER_DEFAULT_MEMORY_ALLOCATION + 64.0);
+    }
+
+    @Test
+    public void getAssignedMemoryForSlot_all() {
+        // With everything set, the expected total is the 128 MB topology GC heap plus the 64 MB logwriter heap.
+        Assert.assertEquals(128.0 + 64.0, Cluster.getAssignedMemoryForSlot(getPopulatedConfig()).doubleValue(), 0);
+    }
+}