You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/19 20:42:39 UTC
[1/2] storm git commit: [STORM-2738] The number of ackers should
default to the number of actual running workers on RAS cluster
Repository: storm
Updated Branches:
refs/heads/master f7443a822 -> 2da6b8197
[STORM-2738] The number of ackers should default to the number of actual running workers on RAS cluster
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb86dcc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb86dcc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb86dcc8
Branch: refs/heads/master
Commit: bb86dcc8cb57351b34077a506336f6b5c90f7bbc
Parents: da2f035
Author: Ethan Li <et...@gmail.com>
Authored: Wed Sep 13 13:55:47 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Tue Sep 19 08:40:32 2017 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/utils/Utils.java | 8 ++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 105 +++++++++----------
.../org/apache/storm/utils/ServerUtils.java | 65 ++++++++++--
3 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index a028935..b1f3ca7 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1501,4 +1501,12 @@ public class Utils {
public static boolean isLocalhostAddress(String address) {
return LOCALHOST_ADDRESSES.contains(address);
}
+
+ public static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
+ Map<K, V> ret = new HashMap<>(first);
+ if (other != null) {
+ ret.putAll(other);
+ }
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 00397a6..2c81f8e 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,6 +18,9 @@
package org.apache.storm.daemon.nimbus;
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -47,9 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import javax.security.auth.Subject;
-
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.StormTimer;
@@ -179,11 +180,6 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
@@ -451,14 +447,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
});
}
- private static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
- Map<K, V> ret = new HashMap<>(first);
- if (other != null) {
- ret.putAll(other);
- }
- return ret;
- }
-
private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
Map<K, V> ret = new HashMap<>();
for (Entry<? extends K, ? extends V> entry: second.entrySet()) {
@@ -757,23 +745,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
return ret;
}
-
- private static int componentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
- Map<String, Object> combinedConf = merge(topoConf, StormCommon.componentConf(component));
- int numTasks = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), StormCommon.numStartExecutors(component));
- Integer maxParallel = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
- int ret = numTasks;
- if (maxParallel != null) {
- ret = Math.min(maxParallel, numTasks);
- }
- return ret;
- }
-
+
private static StormTopology normalizeTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
StormTopology ret = topology.deepCopy();
for (Object comp: StormCommon.allComponents(ret).values()) {
Map<String, Object> mergedConf = StormCommon.componentConf(comp);
- mergedConf.put(Config.TOPOLOGY_TASKS, componentParallelism(topoConf, comp));
+ mergedConf.put(Config.TOPOLOGY_TASKS, ServerUtils.getComponentParallelism(topoConf, comp));
String jsonConf = JSONValue.toJSONString(mergedConf);
StormCommon.getComponentCommon(comp).set_json_conf(jsonConf);
}
@@ -820,7 +797,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
addToSerializers(serializers, (List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER,
conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
- Map<String, Object> mergedConf = merge(conf, topoConf);
+ Map<String, Object> mergedConf = Utils.merge(conf, topoConf);
Map<String, Object> ret = new HashMap<>(topoConf);
ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers);
ret.put(Config.TOPOLOGY_KRYO_DECORATORS, new ArrayList<>(decorators));
@@ -1006,7 +983,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
public static Nimbus launch(INimbus inimbus) throws Exception {
- Map<String, Object> conf = merge(Utils.readStormConfig(),
+ Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
return launchServer(conf, inimbus);
}
@@ -1628,12 +1605,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
scheduler.schedule(topologies, cluster);
//merge with existing statuses
- idToSchedStatus.set(merge(idToSchedStatus.get(), cluster.getStatusMap()));
+ idToSchedStatus.set(Utils.merge(idToSchedStatus.get(), cluster.getStatusMap()));
nodeIdToResources.set(cluster.getSupervisorsResourcesMap());
// This is a hack for non-ras scheduler topology and worker resources
Map<String, TopologyResources> resources = cluster.getTopologyResourcesMap();
- idToResources.getAndAccumulate(resources, (orig, update) -> merge(orig, update));
+ idToResources.getAndAccumulate(resources, (orig, update) -> Utils.merge(orig, update));
Map<String, Map<WorkerSlot, WorkerResources>> workerResources = new HashMap<>();
for (Entry<String, Map<WorkerSlot, WorkerResources>> uglyWorkerResources: cluster.getWorkerResourcesMap().entrySet()) {
@@ -1644,7 +1621,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
workerResources.put(uglyWorkerResources.getKey(), slotToResources);
}
- idToWorkerResources.getAndAccumulate(workerResources, (orig, update) -> merge(orig, update));
+ idToWorkerResources.getAndAccumulate(workerResources, (orig, update) -> Utils.merge(orig, update));
return cluster.getAssignments();
}
@@ -1971,7 +1948,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
try {
checkAuthorization(topoName, topoConf, operation);
@@ -2165,7 +2142,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
String id = entry.getKey();
String ownerPrincipal = entry.getValue().get_principal();
- Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, topoCache)));
+ Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
synchronized(lock) {
Credentials origCreds = state.credentials(id, null);
if (origCreds != null) {
@@ -2521,7 +2498,22 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
return largestMemoryOperator;
}
-
+
+ //get the number of acker executors. We need to estimate it if it's on RAS cluster.
+ private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topology) throws InvalidTopologyException {
+ Object ackerNum = totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+ if (ackerNum != null) {
+ return ObjectReader.getInt(ackerNum);
+ } else {
+ // if it's resource aware scheduler, estimates the number of acker executors.
+ if (ServerUtils.isRAS(totalConf)) {
+ return ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
+ } else {
+ return ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_WORKERS));
+ }
+ }
+ }
+
@Override
public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology,
SubmitOptions options)
@@ -2596,13 +2588,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
+ " but could not find a configured compatible version to use " + supervisorClasspaths.keySet());
}
Map<String, Object> otherConf = Utils.getConfigFromClasspath(cp, conf);
- Map<String, Object> totalConfToSave = merge(otherConf, topoConf);
- Map<String, Object> totalConf = merge(totalConfToSave, conf);
+ Map<String, Object> totalConfToSave = Utils.merge(otherConf, topoConf);
+ Map<String, Object> totalConf = Utils.merge(totalConfToSave, conf);
//When reading the conf in nimbus we want to fall back to our own settings
// if the other config does not have it set.
topology = normalizeTopology(totalConf, topology);
+
+ //set the number of acker executors;
+ totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
+ LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+
IStormClusterState state = stormClusterState;
-
+
if (creds != null) {
Map<String, Object> finalConf = Collections.unmodifiableMap(topoConf);
for (INimbusCredentialPlugin autocred: nimbusAutocredPlugins) {
@@ -2676,7 +2673,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
assertTopoActive(topoName, true);
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
final String operation = "killTopology";
checkAuthorization(topoName, topoConf, operation);
Integer waitAmount = null;
@@ -2700,7 +2697,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
activateCalls.mark();
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
final String operation = "activate";
checkAuthorization(topoName, topoConf, operation);
transitionName(topoName, TopologyActions.ACTIVATE, null, true);
@@ -2719,7 +2716,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
deactivateCalls.mark();
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
final String operation = "deactivate";
checkAuthorization(topoName, topoConf, operation);
transitionName(topoName, TopologyActions.INACTIVATE, null, true);
@@ -2740,7 +2737,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
assertTopoActive(topoName, true);
try {
Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
final String operation = "rebalance";
checkAuthorization(topoName, topoConf, operation);
Map<String, Integer> execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.emptyMap();
@@ -2765,7 +2762,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
setLogConfigCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setLogConfig");
IStormClusterState state = stormClusterState;
@@ -2822,7 +2819,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getLogConfigCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getLogConfig");
IStormClusterState state = stormClusterState;
@@ -2848,7 +2845,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
String topoId = toTopoId(topoName);
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
// make sure samplingPct is within bounds.
double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
// while disabling we retain the sampling pct.
@@ -2889,7 +2886,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
setWorkerProfilerCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setWorkerProfiler");
IStormClusterState state = stormClusterState;
@@ -2961,7 +2958,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
throw new NotAliveException(topoName + " is not alive");
}
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
if (credentials == null) {
credentials = new Credentials(Collections.emptyMap());
}
@@ -3505,7 +3502,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
Map<List<Integer>, Map<String, Object>> beats = common.beats;
Map<Integer, String> taskToComp = common.taskToComponent;
StormTopology topology = common.topology;
- Map<String, Object> topoConf = merge(conf, common.topoConf);
+ Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
StormBase base = common.base;
if (base == null) {
throw new NotAliveException(topoId);
@@ -3691,7 +3688,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
StormTopology topology = info.topology;
Map<String, Object> topoConf = info.topoConf;
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
Assignment assignment = info.assignment;
Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
Map<String, String> nodeToHost;
@@ -3768,7 +3765,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getTopologyConfCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
- Map<String, Object> checkConf = merge(conf, topoConf);
+ Map<String, Object> checkConf = Utils.merge(conf, topoConf);
String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, checkConf, "getTopologyConf");
return JSONValue.toJSONString(topoConf);
@@ -3786,7 +3783,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getTopologyCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getTopology");
return StormCommon.systemTopology(topoConf, tryReadTopology(id, topoCache));
@@ -3804,7 +3801,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
getUserTopologyCalls.mark();
Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getUserTopology");
return tryReadTopology(id, topoCache);
@@ -3828,7 +3825,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
boolean isAdmin = adminUsers.contains(user);
for (String topoId: assignedIds) {
Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
- topoConf = merge(conf, topoConf);
+ topoConf = Utils.merge(conf, topoConf);
List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
if (user == null || isAdmin ||
http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 9c5bb51..311acda 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -32,14 +32,11 @@ import org.apache.storm.blobstore.BlobStoreAclHandler;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AccessControlType;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.*;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.thrift.TException;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
@@ -738,4 +735,60 @@ public class ServerUtils {
return isSuccess;
}
+ /**
+ * Check if the scheduler is resource aware or not.
+ * @param conf The configuration
+ * @return True if it's resource aware; false otherwise
+ */
+ public static boolean isRAS(Map<String, Object> conf) {
+ if (conf.containsKey(DaemonConfig.STORM_SCHEDULER)) {
+ if (conf.get(DaemonConfig.STORM_SCHEDULER).equals("org.apache.storm.scheduler.resource.ResourceAwareScheduler")) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+ return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
+ ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
+ }
+
+ public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+ Map<String, Integer> componentParallelism = getComponentParallelism(topoConf, topology);
+ double totalMemoryRequired = 0.0;
+
+ for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
+ int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1);
+ double memoryRequirement = entry.getValue().get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ totalMemoryRequired += memoryRequirement * parallelism;
+ }
+
+ for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) {
+ int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1);
+ double memoryRequirement = entry.getValue().get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ totalMemoryRequired += memoryRequirement * parallelism;
+ }
+ return totalMemoryRequired;
+ }
+
+ public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+ Map<String, Integer> ret = new HashMap<>();
+ Map<String, Object> components = StormCommon.allComponents(topology);
+ for (Map.Entry<String, Object> entry : components.entrySet()) {
+ ret.put(entry.getKey(), getComponentParallelism(topoConf, entry.getValue()));
+ }
+ return ret;
+ }
+
+ public static int getComponentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
+ Map<String, Object> combinedConf = Utils.merge(topoConf, StormCommon.componentConf(component));
+ int numTasks = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), StormCommon.numStartExecutors(component));
+ Integer maxParallel = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
+ int ret = numTasks;
+ if (maxParallel != null) {
+ ret = Math.min(maxParallel, numTasks);
+ }
+ return ret;
+ }
}
[2/2] storm git commit: Merge branch 'STORM-2738' of
https://github.com/Ethanlm/storm into STORM-2738
Posted by bo...@apache.org.
Merge branch 'STORM-2738' of https://github.com/Ethanlm/storm into STORM-2738
STORM-2738: The number of ackers should default to the number of actual
running workers on RAS cluster
This closes #2325
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2da6b819
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2da6b819
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2da6b819
Branch: refs/heads/master
Commit: 2da6b81978407bf4765a194e1fd6c0502fa7c4ed
Parents: f7443a8 bb86dcc
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Sep 19 15:20:20 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Sep 19 15:20:20 2017 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/utils/Utils.java | 8 ++
.../org/apache/storm/daemon/nimbus/Nimbus.java | 105 +++++++++----------
.../org/apache/storm/utils/ServerUtils.java | 65 ++++++++++--
3 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------