You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:45 UTC

[17/23] storm git commit: renaming RAS_Node.java variables

renaming RAS_Node.java variables


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45f637f9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45f637f9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45f637f9

Branch: refs/heads/master
Commit: 45f637f9ba902f821412982f48f6ac12eff00b40
Parents: 7676e0d
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Dec 16 13:19:57 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Wed Dec 16 13:19:57 2015 -0600

----------------------------------------------------------------------
 .../storm/scheduler/resource/RAS_Node.java      | 228 +++++++++----------
 1 file changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/45f637f9/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index be39d2a..a38d9f9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -43,76 +43,76 @@ import backtype.storm.scheduler.WorkerSlot;
  */
 public class RAS_Node {
     private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
-    private Map<String, Set<WorkerSlot>> topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
-    private Set<WorkerSlot> freeSlots = new HashSet<WorkerSlot>();
-    private final String nodeId;
-    private String hostname;
-    private boolean isAlive;
-    private SupervisorDetails sup;
-    private Double availMemory;
-    private Double availCPU;
-    private Cluster cluster;
-    private Topologies topologies;
+    private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+    private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+    private final String _nodeId;
+    private String _hostname;
+    private boolean _isAlive;
+    private SupervisorDetails _sup;
+    private Double _availMemory;
+    private Double _availCPU;
+    private Cluster _cluster;
+    private Topologies _topologies;
 
     public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
                     SupervisorDetails sup, Cluster cluster, Topologies topologies) {
-        this.nodeId = nodeId;
-        this.isAlive = isAlive;
-        if (this.isAlive && allPorts != null) {
+        _nodeId = nodeId;
+        _isAlive = isAlive;
+        if (_isAlive && allPorts != null) {
             for (int port : allPorts) {
-                this.freeSlots.add(new WorkerSlot(this.nodeId, port));
+                _freeSlots.add(new WorkerSlot(_nodeId, port));
             }
-            this.sup = sup;
-            this.hostname = sup.getHost();
-            this.availMemory = getTotalMemoryResources();
-            this.availCPU = getTotalCpuResources();
+            _sup = sup;
+            _hostname = sup.getHost();
+            _availMemory = getTotalMemoryResources();
+            _availCPU = getTotalCpuResources();
         }
-        this.cluster = cluster;
-        this.topologies = topologies;
+        _cluster = cluster;
+        _topologies = topologies;
     }
 
     public String getId() {
-        return this.nodeId;
+        return _nodeId;
     }
 
     public String getHostname() {
-        return this.hostname;
+        return _hostname;
     }
 
     public Collection<WorkerSlot> getFreeSlots() {
-        return this.freeSlots;
+        return _freeSlots;
     }
 
     public Collection<WorkerSlot> getUsedSlots() {
         Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
-        for (Collection<WorkerSlot> workers : this.topIdToUsedSlots.values()) {
+        for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
             ret.addAll(workers);
         }
         return ret;
     }
 
     public boolean isAlive() {
-        return this.isAlive;
+        return _isAlive;
     }
 
     /**
      * @return a collection of the topology ids currently running on this node
      */
     public Collection<String> getRunningTopologies() {
-        return this.topIdToUsedSlots.keySet();
+        return _topIdToUsedSlots.keySet();
     }
 
     public boolean isTotallyFree() {
-        return this.topIdToUsedSlots.isEmpty();
+        return _topIdToUsedSlots.isEmpty();
     }
 
     public int totalSlotsFree() {
-        return this.freeSlots.size();
+        return _freeSlots.size();
     }
 
     public int totalSlotsUsed() {
         int total = 0;
-        for (Set<WorkerSlot> slots : this.topIdToUsedSlots.values()) {
+        for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
             total += slots.size();
         }
         return total;
@@ -124,7 +124,7 @@ public class RAS_Node {
 
     public int totalSlotsUsed(String topId) {
         int total = 0;
-        Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
         if (slots != null) {
             total = slots.size();
         }
@@ -132,42 +132,42 @@ public class RAS_Node {
     }
 
     private void validateSlot(WorkerSlot ws) {
-        if (!this.nodeId.equals(ws.getNodeId())) {
+        if (!_nodeId.equals(ws.getNodeId())) {
             throw new IllegalArgumentException(
                     "Trying to add a slot to the wrong node " + ws +
-                            " is not a part of " + this.nodeId);
+                            " is not a part of " + _nodeId);
         }
     }
 
     void addOrphanedSlot(WorkerSlot ws) {
-        if (this.isAlive) {
+        if (_isAlive) {
             throw new IllegalArgumentException("Orphaned Slots " +
                     "only are allowed on dead nodes.");
         }
         validateSlot(ws);
-        if (this.freeSlots.contains(ws)) {
+        if (_freeSlots.contains(ws)) {
             return;
         }
-        for (Set<WorkerSlot> used : this.topIdToUsedSlots.values()) {
+        for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
             if (used.contains(ws)) {
                 return;
             }
         }
-        this.freeSlots.add(ws);
+        _freeSlots.add(ws);
     }
 
     boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
         validateSlot(ws);
-        if (!this.freeSlots.remove(ws)) {
+        if (!_freeSlots.remove(ws)) {
             if (dontThrow) {
                 return true;
             }
             throw new IllegalStateException("Assigning a slot that was not free " + ws);
         }
-        Set<WorkerSlot> usedSlots = this.topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
         if (usedSlots == null) {
             usedSlots = new HashSet<WorkerSlot>();
-            this.topIdToUsedSlots.put(topId, usedSlots);
+            _topIdToUsedSlots.put(topId, usedSlots);
         }
         usedSlots.add(ws);
         return false;
@@ -177,18 +177,18 @@ public class RAS_Node {
      * Free all slots on this node.  This will update the Cluster too.
      */
     public void freeAllSlots() {
-        if (!this.isAlive) {
-            LOG.warn("Freeing all slots on a dead node {} ", this.nodeId);
-        }
-        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
-            this.cluster.freeSlots(entry.getValue());
-            this.availCPU = this.getTotalCpuResources();
-            this.availMemory = this.getAvailableMemoryResources();
-            if (this.isAlive) {
-                this.freeSlots.addAll(entry.getValue());
+        if (!_isAlive) {
+            LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
+        }
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+            _cluster.freeSlots(entry.getValue());
+            _availCPU = getTotalCpuResources();
+            _availMemory = getAvailableMemoryResources();
+            if (_isAlive) {
+                _freeSlots.addAll(entry.getValue());
             }
         }
-        this.topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+        _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
     }
 
     /**
@@ -196,24 +196,24 @@ public class RAS_Node {
      * @param ws the slot to free
      */
     public void free(WorkerSlot ws) {
-        LOG.info("freeing ws {} on node {}", ws, this.hostname);
-        if (this.freeSlots.contains(ws)) return;
-        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
+        LOG.info("freeing ws {} on node {}", ws, _hostname);
+        if (_freeSlots.contains(ws)) return;
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             Set<WorkerSlot> slots = entry.getValue();
-            double memUsed = this.getMemoryUsedByWorker(ws);
-            double cpuUsed = this.getCpuUsedByWorker(ws);
+            double memUsed = getMemoryUsedByWorker(ws);
+            double cpuUsed = getCpuUsedByWorker(ws);
             if (slots.remove(ws)) {
-                this.cluster.freeSlot(ws);
-                if (this.isAlive) {
-                    this.freeSlots.add(ws);
+                _cluster.freeSlot(ws);
+                if (_isAlive) {
+                    _freeSlots.add(ws);
                 }
-                this.freeMemory(memUsed);
-                this.freeCPU(cpuUsed);
+                freeMemory(memUsed);
+                freeCPU(cpuUsed);
                 return;
             }
         }
         throw new IllegalArgumentException("Tried to free a slot that was not" +
-                " part of this node " + this.nodeId);
+                " part of this node " + _nodeId);
     }
 
     /**
@@ -221,45 +221,45 @@ public class RAS_Node {
      * @param topId the topology to free slots for
      */
     public void freeTopology(String topId) {
-        Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
+        Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
         if (slots == null || slots.isEmpty()) {
             return;
         }
         for (WorkerSlot ws : slots) {
-            this.cluster.freeSlot(ws);
-            this.freeMemory(this.getMemoryUsedByWorker(ws));
-            this.freeCPU(this.getCpuUsedByWorker(ws));
-            if (this.isAlive) {
-                this.freeSlots.add(ws);
+            _cluster.freeSlot(ws);
+            freeMemory(getMemoryUsedByWorker(ws));
+            freeCPU(getCpuUsedByWorker(ws));
+            if (_isAlive) {
+                _freeSlots.add(ws);
             }
         }
-        this.topIdToUsedSlots.remove(topId);
+        _topIdToUsedSlots.remove(topId);
     }
 
     private void freeMemory(double amount) {
-        LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), this.availMemory);
-        if((this.availMemory + amount) > this.getTotalCpuResources()) {
+        LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, getHostname(), _availMemory);
+        if((_availMemory + amount) > getTotalCpuResources()) {
             LOG.warn("Freeing more memory than there exists!");
             return;
         }
-        this.availMemory += amount;
+        _availMemory += amount;
     }
 
     private void freeCPU(double amount) {
-        LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), this.availCPU);
-        if ((this.availCPU + amount) > this.getAvailableCpuResources()) {
+        LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, getHostname(), _availCPU);
+        if ((_availCPU + amount) > getAvailableCpuResources()) {
             LOG.warn("Freeing more CPU than there exists!");
             return;
         }
-        this.availCPU += amount;
+        _availCPU += amount;
     }
 
     public double getMemoryUsedByWorker(WorkerSlot ws) {
-        TopologyDetails topo = this.findTopologyUsingWorker(ws);
+        TopologyDetails topo = findTopologyUsingWorker(ws);
         if (topo == null) {
             return 0.0;
         }
-        Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
+        Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
         double totalMemoryUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalMemoryUsed += topo.getTotalMemReqTask(exec);
@@ -268,11 +268,11 @@ public class RAS_Node {
     }
 
     public double getCpuUsedByWorker(WorkerSlot ws) {
-        TopologyDetails topo = this.findTopologyUsingWorker(ws);
+        TopologyDetails topo = findTopologyUsingWorker(ws);
         if (topo == null) {
             return 0.0;
         }
-        Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
+        Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
         double totalCpuUsed = 0.0;
         for (ExecutorDetails exec : execs) {
             totalCpuUsed += topo.getTotalCpuReqTask(exec);
@@ -281,12 +281,12 @@ public class RAS_Node {
     }
 
     public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
-        for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
+        for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
             String topoId = entry.getKey();
             Set<WorkerSlot> workers = entry.getValue();
             for (WorkerSlot worker : workers) {
                 if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
-                    return this.topologies.getById(topoId);
+                    return _topologies.getById(topoId);
                 }
             }
         }
@@ -321,24 +321,24 @@ public class RAS_Node {
     }
 
     public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
-        if (!this.isAlive) {
-            throw new IllegalStateException("Trying to adding to a dead node " + this.nodeId);
+        if (!_isAlive) {
+            throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
         }
-        if (this.freeSlots.isEmpty()) {
-            throw new IllegalStateException("Trying to assign to a full node " + this.nodeId);
+        if (_freeSlots.isEmpty()) {
+            throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
         }
         if (executors.size() == 0) {
-            LOG.warn("Trying to assign nothing from " + td.getId() + " to " + this.nodeId + " (Ignored)");
+            LOG.warn("Trying to assign nothing from " + td.getId() + " to " + _nodeId + " (Ignored)");
         }
 
         if (target == null) {
-            target = this.freeSlots.iterator().next();
+            target = _freeSlots.iterator().next();
         }
-        if (!this.freeSlots.contains(target)) {
-            throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + this.nodeId);
+        if (!_freeSlots.contains(target)) {
+            throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
         } else {
             allocateResourceToSlot(td, executors, target);
-            this.cluster.assign(target, td.getId(), executors);
+            _cluster.assign(target, td.getId(), executors);
             assignInternal(target, td.getId(), false);
         }
     }
@@ -350,27 +350,27 @@ public class RAS_Node {
      * @param executors the executors to run in that slot.
      */
     public void assign(TopologyDetails td, Collection<ExecutorDetails> executors) {
-        this.assign(null, td, executors);
+        assign(null, td, executors);
     }
 
     @Override
     public boolean equals(Object other) {
         if (other instanceof RAS_Node) {
-            return this.nodeId.equals(((RAS_Node) other).nodeId);
+            return _nodeId.equals(((RAS_Node) other)._nodeId);
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return this.nodeId.hashCode();
+        return _nodeId.hashCode();
     }
 
     @Override
     public String toString() {
-        return "{Node: " + ((this.sup == null) ? "null (possibly down)" : this.sup.getHost())
-                + ", AvailMem: " + ((this.availMemory == null) ? "N/A" : this.availMemory.toString())
-                + ", this.availCPU: " + ((this.availCPU == null) ? "N/A" : this.availCPU.toString()) + "}";
+        return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
+                + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+                + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
     }
 
     public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
@@ -433,7 +433,7 @@ public class RAS_Node {
      * @param amount the amount to set as available memory
      */
     public void setAvailableMemory(Double amount) {
-        this.availMemory = amount;
+        _availMemory = amount;
     }
 
     /**
@@ -441,10 +441,10 @@ public class RAS_Node {
      * @return the available memory for this node
      */
     public Double getAvailableMemoryResources() {
-        if (this.availMemory == null) {
+        if (_availMemory == null) {
             return 0.0;
         }
-        return this.availMemory;
+        return _availMemory;
     }
 
     /**
@@ -452,8 +452,8 @@ public class RAS_Node {
      * @return the total memory for this node
      */
     public Double getTotalMemoryResources() {
-        if (this.sup != null && this.sup.getTotalMemory() != null) {
-            return this.sup.getTotalMemory();
+        if (_sup != null && _sup.getTotalMemory() != null) {
+            return _sup.getTotalMemory();
         } else {
             return 0.0;
         }
@@ -465,12 +465,12 @@ public class RAS_Node {
      * @return the current available memory for this node after consumption
      */
     public Double consumeMemory(Double amount) {
-        if (amount > this.availMemory) {
-            LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, this.availMemory);
+        if (amount > _availMemory) {
+            LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
             return null;
         }
-        this.availMemory = this.availMemory - amount;
-        return this.availMemory;
+        _availMemory = _availMemory - amount;
+        return _availMemory;
     }
 
     /**
@@ -478,10 +478,10 @@ public class RAS_Node {
      * @return the available cpu for this node
      */
     public Double getAvailableCpuResources() {
-        if (this.availCPU == null) {
+        if (_availCPU == null) {
             return 0.0;
         }
-        return this.availCPU;
+        return _availCPU;
     }
 
     /**
@@ -489,8 +489,8 @@ public class RAS_Node {
      * @return the total cpu for this node
      */
     public Double getTotalCpuResources() {
-        if (this.sup != null && this.sup.getTotalCPU() != null) {
-            return this.sup.getTotalCPU();
+        if (_sup != null && _sup.getTotalCPU() != null) {
+            return _sup.getTotalCPU();
         } else {
             return 0.0;
         }
@@ -502,12 +502,12 @@ public class RAS_Node {
      * @return the current available cpu for this node after consumption
      */
     public Double consumeCPU(Double amount) {
-        if (amount > this.availCPU) {
-            LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, this.availCPU);
+        if (amount > _availCPU) {
+            LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
             return null;
         }
-        this.availCPU = this.availCPU - amount;
-        return this.availCPU;
+        _availCPU = _availCPU - amount;
+        return _availCPU;
     }
 
     /**
@@ -518,11 +518,11 @@ public class RAS_Node {
     public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
         Double taskMemReq = topo.getTotalMemReqTask(exec);
         Double taskCpuReq = topo.getTotalCpuReqTask(exec);
-        this.consumeCPU(taskCpuReq);
-        this.consumeMemory(taskMemReq);
+        consumeCPU(taskCpuReq);
+        consumeMemory(taskMemReq);
     }
 
     public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
-        return this.topIdToUsedSlots;
+        return _topIdToUsedSlots;
     }
 }