You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:39 UTC

[11/23] storm git commit: fixing simple issues based on comments

fixing simple issues based on comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/88ad3c31
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/88ad3c31
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/88ad3c31

Branch: refs/heads/master
Commit: 88ad3c316a5a0df7c083470fb66847e5fb29d663
Parents: 3e83220
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Fri Dec 4 14:02:56 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Dec 4 14:02:56 2015 -0600

----------------------------------------------------------------------
 .../storm/starter/ResourceAwareExampleTopology.java  |  4 +---
 storm-core/src/jvm/backtype/storm/Config.java        |  1 +
 .../src/jvm/backtype/storm/scheduler/Cluster.java    |  4 +---
 .../src/jvm/backtype/storm/scheduler/Topologies.java |  7 ++++---
 .../backtype/storm/scheduler/TopologyDetails.java    | 10 +++++++++-
 .../backtype/storm/scheduler/resource/RAS_Node.java  |  6 +++---
 .../scheduler/resource/ResourceAwareScheduler.java   | 12 ++++++------
 .../storm/scheduler/resource/SchedulingResult.java   | 15 ++++++++++-----
 .../jvm/backtype/storm/scheduler/resource/User.java  | 14 ++++++++------
 .../strategies/eviction/DefaultEvictionStrategy.java |  5 ++---
 10 files changed, 45 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
index a9ac659..7104281 100644
--- a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -54,8 +54,6 @@ public class ResourceAwareExampleTopology {
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("word"));
     }
-
-
   }
 
   public static void main(String[] args) throws Exception {
@@ -87,7 +85,7 @@ public class ResourceAwareExampleTopology {
     // Set topology priority 0-30 with 0 being the highest priority and 30 being the lowest priority.
     conf.setTopologyPriority(30);
 
-    //Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
+    // Set strategy to schedule topology. If not specified, default to backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
     conf.setTopologyStrategy(backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class);
 
     if (args != null && args.length > 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index f3c8c4a..dacf4f8 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -2229,6 +2229,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * Takes as input the strategy class name. Strategy must implement the IStrategy interface
+     * @param clazz class of the strategy to use
      */
     public void setTopologyStrategy(Class<? extends IStrategy> clazz) {
         if (clazz != null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
index c35dbbd..92b2219 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -104,9 +104,7 @@ public class Cluster {
         Map newConf = new HashMap<String, Object>();
         newConf.putAll(this.conf);
         Cluster copy = new Cluster(this.inimbus, this.supervisors, newAssignments, newConf);
-        for (Map.Entry<String, String> entry : this.status.entrySet()) {
-            copy.setStatus(entry.getKey(), entry.getValue());
-        }
+        copy.status = new HashMap<>(this.status);
         return copy;
     }
     

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
index 3a6361f..b6fbd07 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -74,10 +74,11 @@ public class Topologies {
 
     @Override
     public String toString() {
-        String ret = "Topologies:\n";
+        StringBuilder ret = new StringBuilder();
+        ret.append("Topologies:\n");
         for (TopologyDetails td : this.getTopologies()) {
-            ret += td.toString() + "\n";
+            ret.append(td.toString()).append("\n");
         }
-        return ret;
+        return ret.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
index 871ae9b..166493f 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -469,7 +469,7 @@ public class TopologyDetails {
         String user = (String) this.topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
         if (user == null || user.equals("")) {
             LOG.debug("Topology {} submitted by anonymous user", this.getName());
-            user = "anonymous";
+            user = System.getProperty("user.name");
         }
         return user;
     }
@@ -506,4 +506,12 @@ public class TopologyDetails {
     public int hashCode() {
         return this.topologyId.hashCode();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof TopologyDetails)) {
+            return false;
+        }
+        return (this.topologyId.equals(((TopologyDetails) o).getId()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index 54775bf..8d0df62 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -144,7 +144,7 @@ public class RAS_Node {
         }
     }
 
-     void addOrphanedSlot(WorkerSlot ws) {
+    void addOrphanedSlot(WorkerSlot ws) {
         if (_isAlive) {
             throw new IllegalArgumentException("Orphaned Slots " +
                     "only are allowed on dead nodes.");
@@ -241,7 +241,7 @@ public class RAS_Node {
         _topIdToUsedSlots.remove(topId);
     }
 
-    public void freeMemory(double amount) {
+    private void freeMemory(double amount) {
         _availMemory += amount;
         LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), _availMemory);
         if (_availMemory > this.getTotalMemoryResources()) {
@@ -249,7 +249,7 @@ public class RAS_Node {
         }
     }
 
-    public void freeCPU(double amount) {
+    private void freeCPU(double amount) {
         _availCPU += amount;
         LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), _availCPU);
         if (_availCPU > this.getAvailableCpuResources()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
index 53672f6..c2e2fcd 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -131,9 +131,9 @@ public class ResourceAwareScheduler implements IScheduler {
             LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter);
 
             SchedulingState schedulingState = this.checkpointSchedulingState();
-            IStrategy RAStrategy = null;
+            IStrategy rasStrategy = null;
             try {
-                RAStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
+                rasStrategy = (IStrategy) Utils.newInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
             } catch (RuntimeException e) {
                 LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.",
                         td.getName(), td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage());
@@ -150,17 +150,17 @@ public class ResourceAwareScheduler implements IScheduler {
                 SchedulingResult result = null;
                 try {
                     //Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored
-                    RAStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
-                    result = RAStrategy.schedule(td);
+                    rasStrategy.prepare(this.topologies, this.cluster, this.userMap, this.nodes);
+                    result = rasStrategy.schedule(td);
                 } catch (Exception e) {
                     LOG.error("Exception thrown when running strategy {} to schedule topology {}. Topology will not be scheduled! Error: {} StackTrack: {}"
-                            , RAStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
+                            , rasStrategy.getClass().getName(), td.getName(), e.getMessage(), Arrays.toString(e.getStackTrace()));
                     this.restoreCheckpointSchedulingState(schedulingState);
                     //since state is restored need the update User topologySubmitter to the new User object in userMap
                     topologySubmitter = this.userMap.get(td.getTopologySubmitter());
                     topologySubmitter.moveTopoFromPendingToInvalid(td);
                     this.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}"
-                            + RAStrategy.getClass().getName() + ". Please check logs for details");
+                            + rasStrategy.getClass().getName() + ". Please check logs for details");
                 }
                 LOG.debug("scheduling result: {}", result);
                 if (result != null && result.isValid()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
index 9e7b1ff..13ed8ad 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/SchedulingResult.java
@@ -26,21 +26,26 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Map;
 
+/**
+ * This class serves as a mechanism to return results and messages from a scheduling strategy to the Resource Aware Scheduler
+ */
 public class SchedulingResult {
 
     //contains the result for the attempted scheduling
     private Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap = null;
 
+    //status of scheduling the topology e.g. success or fail?
     private SchedulingStatus status = null;
 
+    //arbitrary message to be returned when scheduling is done
     private String message = null;
 
+    //error message returned is something went wrong
     private String errorMessage = null;
 
     private static final Logger LOG = LoggerFactory.getLogger(SchedulingResult.class);
 
-
-    public SchedulingResult(SchedulingStatus status, Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message, String errorMessage) {
+    private SchedulingResult(SchedulingStatus status, Map<WorkerSlot, Collection<ExecutorDetails>> schedulingResultMap, String message, String errorMessage) {
         this.status = status;
         this.schedulingResultMap = schedulingResultMap;
         this.message = message;
@@ -100,11 +105,11 @@ public class SchedulingResult {
 
     @Override
     public String toString() {
-        String ret = "";
+        String ret = null;
         if(this.isSuccess()) {
-            ret += "Status: " + this.getStatus() + " message: " + this.getMessage() + " scheduling: " + this.getSchedulingResultMap().toString();
+            ret = "Status: " + this.getStatus() + " message: " + this.getMessage() + " scheduling: " + this.getSchedulingResultMap();
         } else {
-            ret += "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
+            ret = "Status: " + this.getStatus() + " error message: " + this.getErrorMessage();
         }
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
index 8542120..7f49446 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/User.java
@@ -159,7 +159,6 @@ public class User {
         this.moveTopoFromPendingToRunning(topo, null);
     }
 
-
     public void moveTopoFromPendingToAttempted(TopologyDetails topo, Cluster cluster) {
         moveTopology(topo, this.pendingQueue, "pending", this.attemptedQueue, "attempted");
         if (cluster != null) {
@@ -171,7 +170,6 @@ public class User {
         this.moveTopoFromPendingToAttempted(topo, null);
     }
 
-
     public void moveTopoFromPendingToInvalid(TopologyDetails topo, Cluster cluster) {
         moveTopology(topo, this.pendingQueue, "pending", this.invalidQueue, "invalid");
         if (cluster != null) {
@@ -183,7 +181,6 @@ public class User {
         this.moveTopoFromPendingToInvalid(topo, null);
     }
 
-
     public void moveTopoFromRunningToPending(TopologyDetails topo, Cluster cluster) {
         moveTopology(topo, this.runningQueue, "running", this.pendingQueue, "pending");
         if (cluster != null) {
@@ -195,7 +192,6 @@ public class User {
         this.moveTopoFromRunningToPending(topo, null);
     }
 
-
     private void moveTopology(TopologyDetails topo, Set<TopologyDetails> src, String srcName, Set<TopologyDetails> dest, String destName) {
         LOG.debug("For User {} Moving topo {} from {} to {}", this.userId, topo.getName(), srcName, destName);
         if (topo == null) {
@@ -213,7 +209,6 @@ public class User {
         dest.add(topo);
     }
 
-
     public double getResourcePoolAverageUtilization() {
         Double cpuResourcePoolUtilization = this.getCPUResourcePoolUtilization();
         Double memoryResourcePoolUtilization = this.getMemoryResourcePoolUtilization();
@@ -243,7 +238,6 @@ public class User {
         return this.getMemoryResourceUsedByUser() / memoryGuarantee;
     }
 
-
     public double getCPUResourceUsedByUser() {
         double sum = 0.0;
         for (TopologyDetails topo : this.runningQueue) {
@@ -294,6 +288,14 @@ public class User {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof User)) {
+            return false;
+        }
+        return this.getId().equals(((User) o).getId());
+    }
+
+    @Override
     public String toString() {
         return this.userId;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/88ad3c31/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
index f0401ce..81c2abc 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/strategies/eviction/DefaultEvictionStrategy.java
@@ -23,7 +23,6 @@ import backtype.storm.scheduler.Topologies;
 import backtype.storm.scheduler.TopologyDetails;
 import backtype.storm.scheduler.WorkerSlot;
 import backtype.storm.scheduler.resource.RAS_Nodes;
-import backtype.storm.scheduler.resource.ResourceUtils;
 import backtype.storm.scheduler.resource.User;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +57,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
         double cpuNeeded = td.getTotalRequestedCpu() / submitter.getCPUResourceGuaranteed();
         double memoryNeeded = (td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap()) / submitter.getMemoryResourceGuaranteed();
 
-        User evictUser = this.findUserWithMostResourcesAboveGuarantee();
+        User evictUser = this.findUserWithHighestAverageResourceUtilAboveGuarantee();
         //user has enough resource under his or her resource guarantee to schedule topology
         if ((1.0 - submitter.getCPUResourcePoolUtilization()) >= cpuNeeded && (1.0 - submitter.getMemoryResourcePoolUtilization()) >= memoryNeeded) {
             if (evictUser != null) {
@@ -96,7 +95,7 @@ public class DefaultEvictionStrategy implements IEvictionStrategy {
         submitter.moveTopoFromRunningToPending(topologyEvict, this.cluster);
     }
 
-    private User findUserWithMostResourcesAboveGuarantee() {
+    private User findUserWithHighestAverageResourceUtilAboveGuarantee() {
         double most = 0.0;
         User mostOverUser = null;
         for (User user : this.userMap.values()) {