You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/05/14 17:36:41 UTC
[3/5] storm git commit: Fixed Checkstyle Issues
Fixed Checkstyle Issues
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d0be8aed
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d0be8aed
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d0be8aed
Branch: refs/heads/master
Commit: d0be8aed6ccba2abf7b32084fe8255f15514563c
Parents: 32392bc
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon May 7 09:56:27 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon May 7 09:56:27 2018 -0500
----------------------------------------------------------------------
.../apache/storm/testing/PerformanceTest.java | 4 +--
.../storm/scheduler/resource/RAS_Node.java | 37 +++++++++-----------
.../normalization/NormalizedResourceOffer.java | 9 +++--
.../NormalizedResourceRequest.java | 8 +++++
.../normalization/NormalizedResources.java | 11 ++++--
.../normalization/ResourceMapArrayBridge.java | 2 +-
.../GenericResourceAwareStrategy.java | 4 +++
7 files changed, 46 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
index e657dd8..c5eba2f 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/PerformanceTest.java
@@ -25,13 +25,11 @@ package org.apache.storm.testing;
* add the annotation @Category(PerformanceTest.class) to the class definition as well as to its hierarchy of superclasses.
* For example:
* <p/>
- *
- *
* @ Category(PerformanceTest.class)<br/>
* public class MyPerformanceTest {<br/>
* ...<br/>
* }
- *
+ * <p/>
* In general performance tests should have a time limit on them, but the time limit should be liberal enough to account
* for running on CI systems like travis ci, or the apache jenkins build.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
index 3a9570e..6075d5a 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/RAS_Node.java
@@ -53,6 +53,14 @@ public class RAS_Node {
private boolean isAlive;
private SupervisorDetails sup;
+ /**
+ * Create a new node.
+ * @param nodeId the id of the node.
+ * @param sup the supervisor this is for.
+ * @param cluster the cluster this is a part of.
+ * @param workerIdToWorker the mapping of slots already assigned to this node.
+ * @param assignmentMap the mapping of executors already assigned to this node.
+ */
public RAS_Node(
String nodeId,
SupervisorDetails sup,
@@ -99,26 +107,6 @@ public class RAS_Node {
}
}
- public static int countFreeSlotsAlive(Collection<RAS_Node> nodes) {
- int total = 0;
- for (RAS_Node n : nodes) {
- if (n.isAlive()) {
- total += n.totalSlotsFree();
- }
- }
- return total;
- }
-
- public static int countTotalSlotsAlive(Collection<RAS_Node> nodes) {
- int total = 0;
- for (RAS_Node n : nodes) {
- if (n.isAlive()) {
- total += n.totalSlots();
- }
- }
- return total;
- }
-
public String getId() {
return nodeId;
}
@@ -135,6 +123,10 @@ public class RAS_Node {
return ret;
}
+ /**
+ * Get the IDs of all free slots on this node.
+ * @return the ids of the free slots.
+ */
public Collection<String> getFreeSlotsId() {
if (!isAlive) {
return new HashSet<>();
@@ -164,6 +156,11 @@ public class RAS_Node {
return workerIdsToWorkers(getUsedSlotsId());
}
+ /**
+ * Get slots used by the given topology.
+ * @param topId the id of the topology to get.
+ * @return the slots currently assigned to that topology on this node.
+ */
public Collection<WorkerSlot> getUsedSlots(String topId) {
Collection<WorkerSlot> ret = null;
if (topIdToUsedSlots.get(topId) != null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
index 790a42b..b0defbb 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java
@@ -96,7 +96,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Calculate the average percentage used.
* @see NormalizedResources#calculateAveragePercentageUsedBy(org.apache.storm.scheduler.resource.normalization.NormalizedResources,
- * double, double).
+ * double, double)
*/
public double calculateAveragePercentageUsedBy(NormalizedResourceOffer used) {
return normalizedResources.calculateAveragePercentageUsedBy(
@@ -115,7 +115,7 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
/**
* Check if resources might be able to fit.
* @see NormalizedResources#couldHoldIgnoringSharedMemory(org.apache.storm.scheduler.resource.normalization.NormalizedResources, double,
- * double).
+ * double)
*/
public boolean couldHoldIgnoringSharedMemory(NormalizedResourcesWithMemory other) {
return normalizedResources.couldHoldIgnoringSharedMemory(
@@ -136,6 +136,11 @@ public class NormalizedResourceOffer implements NormalizedResourcesWithMemory {
return "Normalized resources: " + toNormalizedMap();
}
+ /**
+ * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+ * be less likely to be selected.
+ * @param requestedResources the requested resources.
+ */
public void updateForRareResourceAffinity(NormalizedResourceRequest requestedResources) {
normalizedResources.updateForRareResourceAffinity(requestedResources.getNormalizedResources());
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
index 86033b8..2a30ffc 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceRequest.java
@@ -134,6 +134,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
return topologyResources;
}
+ /**
+ * Convert to a map that is used by configuration and the UI.
+ * @return a map with the key as the resource name and the value the resource amount.
+ */
public Map<String, Double> toNormalizedMap() {
Map<String, Double> ret = this.normalizedResources.toNormalizedMap();
ret.put(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME, offHeap);
@@ -168,6 +172,10 @@ public class NormalizedResourceRequest implements NormalizedResourcesWithMemory
offHeap += other.offHeap;
}
+ /**
+ * Add the resources from a worker to those in this.
+ * @param value the resources on the worker.
+ */
public void add(WorkerResources value) {
this.normalizedResources.add(value);
//The resources are already normalized
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
index e57d8f2..9fb717f 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResources.java
@@ -347,10 +347,15 @@ public class NormalizedResources {
return min * 100.0;
}
- public void updateForRareResourceAffinity(NormalizedResources other) {
- int length = Math.min(this.otherResources.length, other.otherResources.length);
+ /**
+ * If a node or rack has a kind of resource not in a request, make that resource negative so when sorting that node or rack will
+ * be less likely to be selected.
+ * @param request the requested resources.
+ */
+ public void updateForRareResourceAffinity(NormalizedResources request) {
+ int length = Math.min(this.otherResources.length, request.otherResources.length);
for (int i = 0; i < length; i++) {
- if (other.getResourceAt(i) == 0.0) {
+ if (request.getResourceAt(i) == 0.0) {
this.otherResources[i] = -1 * this.otherResources[i];
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
index 5a71db4..4a4797b 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/ResourceMapArrayBridge.java
@@ -65,7 +65,7 @@ public class ResourceMapArrayBridge {
}
/**
- * Create an array that has all values 0;
+ * Create an array that has all values 0.
* @return the empty array.
*/
public double[] empty() {
http://git-wip-us.apache.org/repos/asf/storm/blob/d0be8aed/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
index ce98abd..0c85e1c 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/GenericResourceAwareStrategy.java
@@ -44,6 +44,10 @@ public class GenericResourceAwareStrategy extends BaseResourceAwareStrategy impl
final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails,
final ExistingScheduleFunc existingScheduleFunc) {
AllResources affinityBasedAllResources = new AllResources(allResources);
+ NormalizedResourceRequest requestedResources = topologyDetails.getTotalResources(exec);
+ for (ObjectResources objectResources : affinityBasedAllResources.objectResources) {
+ objectResources.availableResources.updateForRareResourceAffinity(requestedResources);
+ }
TreeSet<ObjectResources> sortedObjectResources =
new TreeSet<>((o1, o2) -> {