You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:39 UTC
[11/23] storm git commit: fixing simple issues based on comments
fixing simple issues based on comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88ad3c31
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88ad3c31
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88ad3c31
Branch: refs/heads/master
Commit: 88ad3c316a5a0df7c083470fb66847e5fb29d663
Parents: 3e83220
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 4 14:02:56 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 14:02:56 2015 -0600
----------------------------------------------------------------------
.../storm/starter/ResourceAwareExampleTopology.java | 4 +---
storm-core/src/jvm/backtype/storm/Config.java | 1 +
.../src/jvm/backtype/storm/scheduler/Cluster.java | 4 +---
.../src/jvm/backtype/storm/scheduler/Topologies.java | 7 ++++---
.../backtype/storm/scheduler/TopologyDetails.java | 10 +++++++++-
.../backtype/storm/scheduler/resource/RAS_Node.java | 6 +++---
.../scheduler/resource/ResourceAwareScheduler.java | 12 ++++++------
.../storm/scheduler/resource/SchedulingResult.java | 15 ++++++++++-----
.../jvm/backtype/storm/scheduler/resource/User.java | 14 ++++++++------
.../strategies/eviction/DefaultEvictionStrategy.java | 5 ++---
10 files changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
index a9ac659..7104281 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -54,8 +54,6 @@ public class ResourceAwareExampleTopology {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
-
-
}
public static void main(String[] args) throws Exception {
@@ -87,7 +85,7 @@ public class ResourceAwareExampleTopology {
// Set topology priority 0-30 with 0 being the highest priority and 30 being the lowest priority.
conf.setTopologyPriority(30);
- //Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
+ // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
if (args != null && args.length > 0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index f3c8c4a..dacf4f8 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2229,6 +2229,7 @@ public class Config extends HashMap<String, Object> {
/**
* Takes as input the strategy class name. Strategy must implement the IStrategy interface
+ * @param clazz class of the strategy to use
*/
public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
if (clazz != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index c35dbbd..92b2219 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -104,9 +104,7 @@ public class Cluster {
Map newConf = new HashMap<String, Object>();
newConf.putAll(this.conf);
Cluster copy = new Cluster(this.inimbus, this.supervisors, newAssignments, newConf);
- for (Map.Entry<String, String> entry : this.status.entrySet()) {
- copy.setStatus(entry.getKey(), entry.getValue());
- }
+ copy.status = new HashMap<>(this.status);
return copy;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 3a6361f..b6fbd07 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -74,10 +74,11 @@ public class Topologies {
@Override
public String toString() {
- String ret = "Topologies:\n";
+ StringBuilder ret = new StringBuilder();
+ ret.append("Topologies:\n");
for (TopologyDetails td : this.getTopologies()) {
- ret += td.toString() + "\n";
+ ret.append(td.toString()).append("\n");
}
- return ret;
+ return ret.toString();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 871ae9b..166493f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -469,7 +469,7 @@ public class TopologyDetails {
String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
if (user == null || user.equals("")) {
LOG.debug("Topology {} submitted by anonymous user", this.getName());
- user = "anonymous";
+ user = System.getProperty("user.name");
}
return user;
}
@@ -506,4 +506,12 @@ public class TopologyDetails {
public int hashCode() {
return this.topologyId.hashCode();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TopologyDetails)) {
+ return false;
+ }
+ return (this.topologyId.equals(((TopologyDetails) o).getId()));
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 54775bf..8d0df62 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -144,7 +144,7 @@ public class RAS_Node {
}
}
- void addOrphanedSlot(WorkerSlot ws) {
+ void addOrphanedSlot(WorkerSlot ws) {
if (_isAlive) {
throw new IllegalArgumentException("Orphaned Slots " +
"only are allowed on dead nodes.");
@@ -241,7 +241,7 @@ public class RAS_Node {
_topIdToUsedSlots.remove(topId);
}
- public void freeMemory(double amount) {
+ private void freeMemory(double amount) {
_availMemory += amount;
LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), _availMemory);
if (_availMemory > this.getTotalMemoryResources()) {
@@ -249,7 +249,7 @@ public class RAS_Node {
}
}
- public void freeCPU(double amount) {
+ private void freeCPU(double amount) {
_availCPU += amount;
LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), _availCPU);
if (_availCPU > this.getAvailableCpuResources()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 53672f6..c2e2fcd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -131,9 +131,9 @@ public class ResourceAwareScheduler implements IScheduler {
LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
SchedulingState schedulingState = this.checkpointSchedulingState();
- IStrategy RAStrategy = null;
+ IStrategy rasStrategy = null;
try {
- RAStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
+ rasStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
} catch (RuntimeException e) {
LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
@@ -150,17 +150,17 @@ public class ResourceAwareScheduler implements IScheduler {
SchedulingResult result = null;
try {
//Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
- RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
- result = RAStrategy.schedule(td);
+ rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+ result = rasStrategy.schedule(td);
} catch (Exception e) {
LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: {}"
- , RAStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+ , rasStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
this.restoreCheckpointSchedulingState(schedulingState);
//since state is restored need the update User topologySubmitter to the new User object in userMap
topologySubmitter = this.userMap.get(td.getTopologySubmitter());
topologySubmitter.moveTopoFromPendingToInvalid(td);
this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
- + RAStrategy.getClass().getName() + ". Please check logs for details");
+ + rasStrategy.getClass().getName() + ". Please check logs for details");
}
LOG.debug("scheduling result: {}", result);
if (result != null && result.isValid()) {
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
index 9e7b1ff..13ed8ad 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
@@ -26,21 +26,26 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Map;
+/**
+ * This class serves as a mechanism to return results and messages from a scheduling strategy to the Resource Aware Scheduler
+ */
public class SchedulingResult {
//contains the result for the attempted scheduling
private Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap = null;
+ //status of scheduling the topology e.g. success or fail?
private SchedulingStatus status = null;
+ //arbitrary message to be returned when scheduling is done
private String message = null;
+ //error message returned is something went wrong
private String errorMessage = null;
private static final Logger LOG = LoggerFactory.getLogger(SchedulingResult.class);
-
- public SchedulingResult(SchedulingStatus status, Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message, String errorMessage) {
+ private SchedulingResult(SchedulingStatus status, Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message, String errorMessage) {
this.status = status;
this.schedulingResultMap = schedulingResultMap;
this.message = message;
@@ -100,11 +105,11 @@ public class SchedulingResult {
@Override
public String toString() {
- String ret = "";
+ String ret = null;
if(this.isSuccess()) {
- ret += "Status: " + this.getStatus() + " message: " + this.getMessage() + " scheduling: " + this.getSchedulingResultMap().toString();
+ ret = "Status: " + this.getStatus() + " message: " + this.getMessage() + " scheduling: " + this.getSchedulingResultMap();
} else {
- ret += "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
+ ret = "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
}
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 8542120..7f49446 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -159,7 +159,6 @@ public class User {
this.moveTopoFromPendingToRunning(topo, null);
}
-
public void moveTopoFromPendingToAttempted(TopologyDetails topo, Cluster cluster) {
moveTopology(topo, this.pendingQueue, "pending", this.attemptedQueue, "attempted");
if (cluster != null) {
@@ -171,7 +170,6 @@ public class User {
this.moveTopoFromPendingToAttempted(topo, null);
}
-
public void moveTopoFromPendingToInvalid(TopologyDetails topo, Cluster cluster) {
moveTopology(topo, this.pendingQueue, "pending", this.invalidQueue, "invalid");
if (cluster != null) {
@@ -183,7 +181,6 @@ public class User {
this.moveTopoFromPendingToInvalid(topo, null);
}
-
public void moveTopoFromRunningToPending(TopologyDetails topo, Cluster cluster) {
moveTopology(topo, this.runningQueue, "running", this.pendingQueue, "pending");
if (cluster != null) {
@@ -195,7 +192,6 @@ public class User {
this.moveTopoFromRunningToPending(topo, null);
}
-
private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
LOG.debug("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
if (topo == null) {
@@ -213,7 +209,6 @@ public class User {
dest.add(topo);
}
-
public double getResourcePoolAverageUtilization() {
Double cpuResourcePoolUtilization = this.getCPUResourcePoolUtilization();
Double memoryResourcePoolUtilization = this.getMemoryResourcePoolUtilization();
@@ -243,7 +238,6 @@ public class User {
return this.getMemoryResourceUsedByUser() / memoryGuarantee;
}
-
public double getCPUResourceUsedByUser() {
double sum = 0.0;
for (TopologyDetails topo : this.runningQueue) {
@@ -294,6 +288,14 @@ public class User {
}
@Override
+ public boolean equals(Object o) {
+ if (!(o instanceof User)) {
+ return false;
+ }
+ return this.getId().equals(((User) o).getId());
+ }
+
+ @Override
public String toString() {
return this.userId;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index f0401ce..81c2abc 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -23,7 +23,6 @@ import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.scheduler.resource.RAS_Nodes;
-import backtype.storm.scheduler.resource.ResourceUtils;
import backtype.storm.scheduler.resource.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +57,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
- User evictUser = this.findUserWithMostResourcesAboveGuarantee();
+ User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
//user has enough resource under his or her resource guarantee to schedule topology
if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
if (evictUser != null) {
@@ -96,7 +95,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
}
- private User findUserWithMostResourcesAboveGuarantee() {
+ private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
double most = 0.0;
User mostOverUser = null;
for (User user : this.userMap.values()) {