You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by da...@apache.org on 2015/12/21 15:46:45 UTC
[17/23] storm git commit: renaming RAS_Node.java variables
renaming RAS_Node.java variables
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/45f637f9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/45f637f9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/45f637f9
Branch: refs/heads/master
Commit: 45f637f9ba902f821412982f48f6ac12eff00b40
Parents: 7676e0d
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Wed Dec 16 13:19:57 2015 -0600
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Wed Dec 16 13:19:57 2015 -0600
----------------------------------------------------------------------
.../storm/scheduler/resource/RAS_Node.java | 228 +++++++++----------
1 file changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/45f637f9/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
index be39d2a..a38d9f9 100644
--- a/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
+++ b/storm-core/src/jvm/backtype/storm/scheduler/resource/RAS_Node.java
@@ -43,76 +43,76 @@ import backtype.storm.scheduler.WorkerSlot;
*/
public class RAS_Node {
private static final Logger LOG = LoggerFactory.getLogger(RAS_Node.class);
- private Map<String, Set<WorkerSlot>> topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
- private Set<WorkerSlot> freeSlots = new HashSet<WorkerSlot>();
- private final String nodeId;
- private String hostname;
- private boolean isAlive;
- private SupervisorDetails sup;
- private Double availMemory;
- private Double availCPU;
- private Cluster cluster;
- private Topologies topologies;
+ private Map<String, Set<WorkerSlot>> _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+ private Set<WorkerSlot> _freeSlots = new HashSet<WorkerSlot>();
+ private final String _nodeId;
+ private String _hostname;
+ private boolean _isAlive;
+ private SupervisorDetails _sup;
+ private Double _availMemory;
+ private Double _availCPU;
+ private Cluster _cluster;
+ private Topologies _topologies;
public RAS_Node(String nodeId, Set<Integer> allPorts, boolean isAlive,
SupervisorDetails sup, Cluster cluster, Topologies topologies) {
- this.nodeId = nodeId;
- this.isAlive = isAlive;
- if (this.isAlive && allPorts != null) {
+ _nodeId = nodeId;
+ _isAlive = isAlive;
+ if (_isAlive && allPorts != null) {
for (int port : allPorts) {
- this.freeSlots.add(new WorkerSlot(this.nodeId, port));
+ _freeSlots.add(new WorkerSlot(_nodeId, port));
}
- this.sup = sup;
- this.hostname = sup.getHost();
- this.availMemory = getTotalMemoryResources();
- this.availCPU = getTotalCpuResources();
+ _sup = sup;
+ _hostname = sup.getHost();
+ _availMemory = getTotalMemoryResources();
+ _availCPU = getTotalCpuResources();
}
- this.cluster = cluster;
- this.topologies = topologies;
+ _cluster = cluster;
+ _topologies = topologies;
}
public String getId() {
- return this.nodeId;
+ return _nodeId;
}
public String getHostname() {
- return this.hostname;
+ return _hostname;
}
public Collection<WorkerSlot> getFreeSlots() {
- return this.freeSlots;
+ return _freeSlots;
}
public Collection<WorkerSlot> getUsedSlots() {
Collection<WorkerSlot> ret = new LinkedList<WorkerSlot>();
- for (Collection<WorkerSlot> workers : this.topIdToUsedSlots.values()) {
+ for (Collection<WorkerSlot> workers : _topIdToUsedSlots.values()) {
ret.addAll(workers);
}
return ret;
}
public boolean isAlive() {
- return this.isAlive;
+ return _isAlive;
}
/**
* @return a collection of the topology ids currently running on this node
*/
public Collection<String> getRunningTopologies() {
- return this.topIdToUsedSlots.keySet();
+ return _topIdToUsedSlots.keySet();
}
public boolean isTotallyFree() {
- return this.topIdToUsedSlots.isEmpty();
+ return _topIdToUsedSlots.isEmpty();
}
public int totalSlotsFree() {
- return this.freeSlots.size();
+ return _freeSlots.size();
}
public int totalSlotsUsed() {
int total = 0;
- for (Set<WorkerSlot> slots : this.topIdToUsedSlots.values()) {
+ for (Set<WorkerSlot> slots : _topIdToUsedSlots.values()) {
total += slots.size();
}
return total;
@@ -124,7 +124,7 @@ public class RAS_Node {
public int totalSlotsUsed(String topId) {
int total = 0;
- Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
+ Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
if (slots != null) {
total = slots.size();
}
@@ -132,42 +132,42 @@ public class RAS_Node {
}
private void validateSlot(WorkerSlot ws) {
- if (!this.nodeId.equals(ws.getNodeId())) {
+ if (!_nodeId.equals(ws.getNodeId())) {
throw new IllegalArgumentException(
"Trying to add a slot to the wrong node " + ws +
- " is not a part of " + this.nodeId);
+ " is not a part of " + _nodeId);
}
}
void addOrphanedSlot(WorkerSlot ws) {
- if (this.isAlive) {
+ if (_isAlive) {
throw new IllegalArgumentException("Orphaned Slots " +
"only are allowed on dead nodes.");
}
validateSlot(ws);
- if (this.freeSlots.contains(ws)) {
+ if (_freeSlots.contains(ws)) {
return;
}
- for (Set<WorkerSlot> used : this.topIdToUsedSlots.values()) {
+ for (Set<WorkerSlot> used : _topIdToUsedSlots.values()) {
if (used.contains(ws)) {
return;
}
}
- this.freeSlots.add(ws);
+ _freeSlots.add(ws);
}
boolean assignInternal(WorkerSlot ws, String topId, boolean dontThrow) {
validateSlot(ws);
- if (!this.freeSlots.remove(ws)) {
+ if (!_freeSlots.remove(ws)) {
if (dontThrow) {
return true;
}
throw new IllegalStateException("Assigning a slot that was not free " + ws);
}
- Set<WorkerSlot> usedSlots = this.topIdToUsedSlots.get(topId);
+ Set<WorkerSlot> usedSlots = _topIdToUsedSlots.get(topId);
if (usedSlots == null) {
usedSlots = new HashSet<WorkerSlot>();
- this.topIdToUsedSlots.put(topId, usedSlots);
+ _topIdToUsedSlots.put(topId, usedSlots);
}
usedSlots.add(ws);
return false;
@@ -177,18 +177,18 @@ public class RAS_Node {
* Free all slots on this node. This will update the Cluster too.
*/
public void freeAllSlots() {
- if (!this.isAlive) {
- LOG.warn("Freeing all slots on a dead node {} ", this.nodeId);
- }
- for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
- this.cluster.freeSlots(entry.getValue());
- this.availCPU = this.getTotalCpuResources();
- this.availMemory = this.getAvailableMemoryResources();
- if (this.isAlive) {
- this.freeSlots.addAll(entry.getValue());
+ if (!_isAlive) {
+ LOG.warn("Freeing all slots on a dead node {} ", _nodeId);
+ }
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
+ _cluster.freeSlots(entry.getValue());
+ _availCPU = getTotalCpuResources();
+ _availMemory = getAvailableMemoryResources();
+ if (_isAlive) {
+ _freeSlots.addAll(entry.getValue());
}
}
- this.topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+ _topIdToUsedSlots = new HashMap<String, Set<WorkerSlot>>();
}
/**
@@ -196,24 +196,24 @@ public class RAS_Node {
* @param ws the slot to free
*/
public void free(WorkerSlot ws) {
- LOG.info("freeing ws {} on node {}", ws, this.hostname);
- if (this.freeSlots.contains(ws)) return;
- for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
+ LOG.info("freeing ws {} on node {}", ws, _hostname);
+ if (_freeSlots.contains(ws)) return;
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
Set<WorkerSlot> slots = entry.getValue();
- double memUsed = this.getMemoryUsedByWorker(ws);
- double cpuUsed = this.getCpuUsedByWorker(ws);
+ double memUsed = getMemoryUsedByWorker(ws);
+ double cpuUsed = getCpuUsedByWorker(ws);
if (slots.remove(ws)) {
- this.cluster.freeSlot(ws);
- if (this.isAlive) {
- this.freeSlots.add(ws);
+ _cluster.freeSlot(ws);
+ if (_isAlive) {
+ _freeSlots.add(ws);
}
- this.freeMemory(memUsed);
- this.freeCPU(cpuUsed);
+ freeMemory(memUsed);
+ freeCPU(cpuUsed);
return;
}
}
throw new IllegalArgumentException("Tried to free a slot that was not" +
- " part of this node " + this.nodeId);
+ " part of this node " + _nodeId);
}
/**
@@ -221,45 +221,45 @@ public class RAS_Node {
* @param topId the topology to free slots for
*/
public void freeTopology(String topId) {
- Set<WorkerSlot> slots = this.topIdToUsedSlots.get(topId);
+ Set<WorkerSlot> slots = _topIdToUsedSlots.get(topId);
if (slots == null || slots.isEmpty()) {
return;
}
for (WorkerSlot ws : slots) {
- this.cluster.freeSlot(ws);
- this.freeMemory(this.getMemoryUsedByWorker(ws));
- this.freeCPU(this.getCpuUsedByWorker(ws));
- if (this.isAlive) {
- this.freeSlots.add(ws);
+ _cluster.freeSlot(ws);
+ freeMemory(getMemoryUsedByWorker(ws));
+ freeCPU(getCpuUsedByWorker(ws));
+ if (_isAlive) {
+ _freeSlots.add(ws);
}
}
- this.topIdToUsedSlots.remove(topId);
+ _topIdToUsedSlots.remove(topId);
}
private void freeMemory(double amount) {
- LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, this.getHostname(), this.availMemory);
- if((this.availMemory + amount) > this.getTotalCpuResources()) {
+ LOG.debug("freeing {} memory on node {}...avail mem: {}", amount, getHostname(), _availMemory);
+ if((_availMemory + amount) > getTotalCpuResources()) {
LOG.warn("Freeing more memory than there exists!");
return;
}
- this.availMemory += amount;
+ _availMemory += amount;
}
private void freeCPU(double amount) {
- LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, this.getHostname(), this.availCPU);
- if ((this.availCPU + amount) > this.getAvailableCpuResources()) {
+ LOG.debug("freeing {} CPU on node...avail CPU: {}", amount, getHostname(), _availCPU);
+ if ((_availCPU + amount) > getAvailableCpuResources()) {
LOG.warn("Freeing more CPU than there exists!");
return;
}
- this.availCPU += amount;
+ _availCPU += amount;
}
public double getMemoryUsedByWorker(WorkerSlot ws) {
- TopologyDetails topo = this.findTopologyUsingWorker(ws);
+ TopologyDetails topo = findTopologyUsingWorker(ws);
if (topo == null) {
return 0.0;
}
- Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
+ Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
double totalMemoryUsed = 0.0;
for (ExecutorDetails exec : execs) {
totalMemoryUsed += topo.getTotalMemReqTask(exec);
@@ -268,11 +268,11 @@ public class RAS_Node {
}
public double getCpuUsedByWorker(WorkerSlot ws) {
- TopologyDetails topo = this.findTopologyUsingWorker(ws);
+ TopologyDetails topo = findTopologyUsingWorker(ws);
if (topo == null) {
return 0.0;
}
- Collection<ExecutorDetails> execs = this.getExecutors(ws, this.cluster);
+ Collection<ExecutorDetails> execs = getExecutors(ws, _cluster);
double totalCpuUsed = 0.0;
for (ExecutorDetails exec : execs) {
totalCpuUsed += topo.getTotalCpuReqTask(exec);
@@ -281,12 +281,12 @@ public class RAS_Node {
}
public TopologyDetails findTopologyUsingWorker(WorkerSlot ws) {
- for (Entry<String, Set<WorkerSlot>> entry : this.topIdToUsedSlots.entrySet()) {
+ for (Entry<String, Set<WorkerSlot>> entry : _topIdToUsedSlots.entrySet()) {
String topoId = entry.getKey();
Set<WorkerSlot> workers = entry.getValue();
for (WorkerSlot worker : workers) {
if (worker.getNodeId().equals(ws.getNodeId()) && worker.getPort() == ws.getPort()) {
- return this.topologies.getById(topoId);
+ return _topologies.getById(topoId);
}
}
}
@@ -321,24 +321,24 @@ public class RAS_Node {
}
public void assign(WorkerSlot target, TopologyDetails td, Collection<ExecutorDetails> executors) {
- if (!this.isAlive) {
- throw new IllegalStateException("Trying to adding to a dead node " + this.nodeId);
+ if (!_isAlive) {
+ throw new IllegalStateException("Trying to adding to a dead node " + _nodeId);
}
- if (this.freeSlots.isEmpty()) {
- throw new IllegalStateException("Trying to assign to a full node " + this.nodeId);
+ if (_freeSlots.isEmpty()) {
+ throw new IllegalStateException("Trying to assign to a full node " + _nodeId);
}
if (executors.size() == 0) {
- LOG.warn("Trying to assign nothing from " + td.getId() + " to " + this.nodeId + " (Ignored)");
+ LOG.warn("Trying to assign nothing from " + td.getId() + " to " + _nodeId + " (Ignored)");
}
if (target == null) {
- target = this.freeSlots.iterator().next();
+ target = _freeSlots.iterator().next();
}
- if (!this.freeSlots.contains(target)) {
- throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + this.nodeId);
+ if (!_freeSlots.contains(target)) {
+ throw new IllegalStateException("Trying to assign already used slot" + target.getPort() + "on node " + _nodeId);
} else {
allocateResourceToSlot(td, executors, target);
- this.cluster.assign(target, td.getId(), executors);
+ _cluster.assign(target, td.getId(), executors);
assignInternal(target, td.getId(), false);
}
}
@@ -350,27 +350,27 @@ public class RAS_Node {
* @param executors the executors to run in that slot.
*/
public void assign(TopologyDetails td, Collection<ExecutorDetails> executors) {
- this.assign(null, td, executors);
+ assign(null, td, executors);
}
@Override
public boolean equals(Object other) {
if (other instanceof RAS_Node) {
- return this.nodeId.equals(((RAS_Node) other).nodeId);
+ return _nodeId.equals(((RAS_Node) other)._nodeId);
}
return false;
}
@Override
public int hashCode() {
- return this.nodeId.hashCode();
+ return _nodeId.hashCode();
}
@Override
public String toString() {
- return "{Node: " + ((this.sup == null) ? "null (possibly down)" : this.sup.getHost())
- + ", AvailMem: " + ((this.availMemory == null) ? "N/A" : this.availMemory.toString())
- + ", this.availCPU: " + ((this.availCPU == null) ? "N/A" : this.availCPU.toString()) + "}";
+ return "{Node: " + ((_sup == null) ? "null (possibly down)" : _sup.getHost())
+ + ", AvailMem: " + ((_availMemory == null) ? "N/A" : _availMemory.toString())
+ + ", AvailCPU: " + ((_availCPU == null) ? "N/A" : _availCPU.toString()) + "}";
}
public static int countSlotsUsed(String topId, Collection<RAS_Node> nodes) {
@@ -433,7 +433,7 @@ public class RAS_Node {
* @param amount the amount to set as available memory
*/
public void setAvailableMemory(Double amount) {
- this.availMemory = amount;
+ _availMemory = amount;
}
/**
@@ -441,10 +441,10 @@ public class RAS_Node {
* @return the available memory for this node
*/
public Double getAvailableMemoryResources() {
- if (this.availMemory == null) {
+ if (_availMemory == null) {
return 0.0;
}
- return this.availMemory;
+ return _availMemory;
}
/**
@@ -452,8 +452,8 @@ public class RAS_Node {
* @return the total memory for this node
*/
public Double getTotalMemoryResources() {
- if (this.sup != null && this.sup.getTotalMemory() != null) {
- return this.sup.getTotalMemory();
+ if (_sup != null && _sup.getTotalMemory() != null) {
+ return _sup.getTotalMemory();
} else {
return 0.0;
}
@@ -465,12 +465,12 @@ public class RAS_Node {
* @return the current available memory for this node after consumption
*/
public Double consumeMemory(Double amount) {
- if (amount > this.availMemory) {
- LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, this.availMemory);
+ if (amount > _availMemory) {
+ LOG.error("Attempting to consume more memory than available! Needed: {}, we only have: {}", amount, _availMemory);
return null;
}
- this.availMemory = this.availMemory - amount;
- return this.availMemory;
+ _availMemory = _availMemory - amount;
+ return _availMemory;
}
/**
@@ -478,10 +478,10 @@ public class RAS_Node {
* @return the available cpu for this node
*/
public Double getAvailableCpuResources() {
- if (this.availCPU == null) {
+ if (_availCPU == null) {
return 0.0;
}
- return this.availCPU;
+ return _availCPU;
}
/**
@@ -489,8 +489,8 @@ public class RAS_Node {
* @return the total cpu for this node
*/
public Double getTotalCpuResources() {
- if (this.sup != null && this.sup.getTotalCPU() != null) {
- return this.sup.getTotalCPU();
+ if (_sup != null && _sup.getTotalCPU() != null) {
+ return _sup.getTotalCPU();
} else {
return 0.0;
}
@@ -502,12 +502,12 @@ public class RAS_Node {
* @return the current available cpu for this node after consumption
*/
public Double consumeCPU(Double amount) {
- if (amount > this.availCPU) {
- LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, this.availCPU);
+ if (amount > _availCPU) {
+ LOG.error("Attempting to consume more CPU than available! Needed: {}, we only have: {}", amount, _availCPU);
return null;
}
- this.availCPU = this.availCPU - amount;
- return this.availCPU;
+ _availCPU = _availCPU - amount;
+ return _availCPU;
}
/**
@@ -518,11 +518,11 @@ public class RAS_Node {
public void consumeResourcesforTask(ExecutorDetails exec, TopologyDetails topo) {
Double taskMemReq = topo.getTotalMemReqTask(exec);
Double taskCpuReq = topo.getTotalCpuReqTask(exec);
- this.consumeCPU(taskCpuReq);
- this.consumeMemory(taskMemReq);
+ consumeCPU(taskCpuReq);
+ consumeMemory(taskMemReq);
}
public Map<String, Set<WorkerSlot>> getTopoIdTousedSlots() {
- return this.topIdToUsedSlots;
+ return _topIdToUsedSlots;
}
}