Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/19 20:42:39 UTC

[1/2] storm git commit: [STORM-2738] The number of ackers should default to the number of actual running workers on RAS cluster

Repository: storm
Updated Branches:
  refs/heads/master f7443a822 -> 2da6b8197


[STORM-2738] The number of ackers should default to the number of actual running workers on RAS cluster


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb86dcc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb86dcc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb86dcc8

Branch: refs/heads/master
Commit: bb86dcc8cb57351b34077a506336f6b5c90f7bbc
Parents: da2f035
Author: Ethan Li <et...@gmail.com>
Authored: Wed Sep 13 13:55:47 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Tue Sep 19 08:40:32 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/utils/Utils.java   |   8 ++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 105 +++++++++----------
 .../org/apache/storm/utils/ServerUtils.java     |  65 ++++++++++--
 3 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
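
In short: Config.TOPOLOGY_ACKER_EXECUTORS previously defaulted to
Config.TOPOLOGY_WORKERS, which is not meaningful under the
ResourceAwareScheduler, where the worker count is derived from resource
requests rather than configured directly. With this change, Nimbus estimates
the worker count from the topology's requested on-heap memory and uses that
estimate as the acker default. A minimal submission sketch (topology name and
acker count are illustrative; spout/bolt wiring elided) showing that an
explicit setting still bypasses the estimate:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class AckerDefaultDemo {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout(...) / builder.setBolt(...) wiring elided.

            Config conf = new Config();
            // An explicit value still takes precedence over the new estimate:
            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 4);
            // Left unset on a RAS cluster, Nimbus now defaults the acker count
            // to the estimated worker count rather than topology.workers.
            StormSubmitter.submitTopology("acker-default-demo", conf,
                    builder.createTopology());
        }
    }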


http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index a028935..b1f3ca7 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1501,4 +1501,12 @@ public class Utils {
     public static boolean isLocalhostAddress(String address) {
         return LOCALHOST_ADDRESSES.contains(address);
     }
+
+    public static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
+        Map<K, V> ret = new HashMap<>(first);
+        if (other != null) {
+            ret.putAll(other);
+        }
+        return ret;
+    }
 }
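
The merge helper moves from Nimbus into the shared Utils class so that
ServerUtils can reuse it. Its semantics, for reference: copy the first map,
then let entries from the second override on key collisions, tolerating a
null second map. A small self-contained sketch (key names are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.utils.Utils;

    public class MergeDemo {
        public static void main(String[] args) {
            Map<String, Object> defaults = new HashMap<>();
            defaults.put("topology.workers", 1);
            defaults.put("topology.debug", false);

            Map<String, Object> overrides = new HashMap<>();
            overrides.put("topology.workers", 4);

            // Entries from the second map win on key collisions.
            Map<String, Object> merged = Utils.merge(defaults, overrides);
            System.out.println(merged);
            // topology.workers=4, topology.debug=false (order not guaranteed)

            // A null second map is tolerated and yields a plain copy.
            System.out.println(Utils.merge(defaults, null));
        }
    }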

http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 00397a6..2c81f8e 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.daemon.nimbus;
 
+import com.codahale.metrics.Meter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -47,9 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.security.auth.Subject;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.StormTimer;
@@ -179,11 +180,6 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.codahale.metrics.Meter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
 public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private final static Logger LOG = LoggerFactory.getLogger(Nimbus.class);
     
@@ -451,14 +447,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 });
     }
 
-    private static <K, V> Map<K, V> merge(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> other) {
-        Map<K, V> ret = new HashMap<>(first);
-        if (other != null) {
-            ret.putAll(other);
-        }
-        return ret;
-    }
-    
     private static <K, V> Map<K, V> mapDiff(Map<? extends K, ? extends V> first, Map<? extends K, ? extends V> second) {
         Map<K, V> ret = new HashMap<>();
         for (Entry<? extends K, ? extends V> entry: second.entrySet()) {
@@ -757,23 +745,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
         return ret;
     }
-    
-    private static int componentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
-        Map<String, Object> combinedConf = merge(topoConf, StormCommon.componentConf(component));
-        int numTasks = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), StormCommon.numStartExecutors(component));
-        Integer maxParallel = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
-        int ret = numTasks;
-        if (maxParallel != null) {
-            ret = Math.min(maxParallel, numTasks);
-        }
-        return ret;
-    }
-    
+
     private static StormTopology normalizeTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
         StormTopology ret = topology.deepCopy();
         for (Object comp: StormCommon.allComponents(ret).values()) {
             Map<String, Object> mergedConf = StormCommon.componentConf(comp);
-            mergedConf.put(Config.TOPOLOGY_TASKS, componentParallelism(topoConf, comp));
+            mergedConf.put(Config.TOPOLOGY_TASKS, ServerUtils.getComponentParallelism(topoConf, comp));
             String jsonConf = JSONValue.toJSONString(mergedConf);
             StormCommon.getComponentCommon(comp).set_json_conf(jsonConf);
         }
@@ -820,7 +797,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         addToSerializers(serializers, (List<Object>)topoConf.getOrDefault(Config.TOPOLOGY_KRYO_REGISTER, 
                 conf.get(Config.TOPOLOGY_KRYO_REGISTER)));
         
-        Map<String, Object> mergedConf = merge(conf, topoConf);
+        Map<String, Object> mergedConf = Utils.merge(conf, topoConf);
         Map<String, Object> ret = new HashMap<>(topoConf);
         ret.put(Config.TOPOLOGY_KRYO_REGISTER, serializers);
         ret.put(Config.TOPOLOGY_KRYO_DECORATORS, new ArrayList<>(decorators));
@@ -1006,7 +983,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     }
     
     public static Nimbus launch(INimbus inimbus) throws Exception {
-        Map<String, Object> conf = merge(Utils.readStormConfig(),
+        Map<String, Object> conf = Utils.merge(Utils.readStormConfig(),
                 ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
         return launchServer(conf, inimbus);
     }
@@ -1628,12 +1605,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         scheduler.schedule(topologies, cluster);
 
         //merge with existing statuses
-        idToSchedStatus.set(merge(idToSchedStatus.get(), cluster.getStatusMap()));
+        idToSchedStatus.set(Utils.merge(idToSchedStatus.get(), cluster.getStatusMap()));
         nodeIdToResources.set(cluster.getSupervisorsResourcesMap());
         
         // This is a hack for non-ras scheduler topology and worker resources
         Map<String, TopologyResources> resources = cluster.getTopologyResourcesMap();
-        idToResources.getAndAccumulate(resources, (orig, update) -> merge(orig, update));
+        idToResources.getAndAccumulate(resources, (orig, update) -> Utils.merge(orig, update));
         
         Map<String, Map<WorkerSlot, WorkerResources>> workerResources = new HashMap<>();
         for (Entry<String, Map<WorkerSlot, WorkerResources>> uglyWorkerResources: cluster.getWorkerResourcesMap().entrySet()) {
@@ -1644,7 +1621,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             workerResources.put(uglyWorkerResources.getKey(), slotToResources);
         }
-        idToWorkerResources.getAndAccumulate(workerResources, (orig, update) -> merge(orig, update));
+        idToWorkerResources.getAndAccumulate(workerResources, (orig, update) -> Utils.merge(orig, update));
         
         return cluster.getAssignments();
     }
@@ -1971,7 +1948,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     
     private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
         Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-        topoConf = merge(conf, topoConf);
+        topoConf = Utils.merge(conf, topoConf);
         String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
         try {
             checkAuthorization(topoName, topoConf, operation);
@@ -2165,7 +2142,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
                 String id = entry.getKey();
                 String ownerPrincipal = entry.getValue().get_principal();
-                Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, topoCache)));
+                Map<String, Object> topoConf = Collections.unmodifiableMap(Utils.merge(conf, tryReadTopoConf(id, topoCache)));
                 synchronized(lock) {
                     Credentials origCreds = state.credentials(id, null);
                     if (origCreds != null) {
@@ -2521,7 +2498,22 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         }
         return largestMemoryOperator;
     }
-    
+
+    //get the number of acker executors. We need to estimate it if it's on RAS cluster.
+    private int getNumOfAckerExecs(Map<String, Object> totalConf, StormTopology topology) throws InvalidTopologyException {
+        Object ackerNum = totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+        if (ackerNum != null) {
+            return ObjectReader.getInt(ackerNum);
+        } else {
+            // if it's resource aware scheduler, estimates the number of acker executors.
+            if (ServerUtils.isRAS(totalConf)) {
+                return ServerUtils.getEstimatedWorkerCountForRASTopo(totalConf, topology);
+            } else {
+                return ObjectReader.getInt(totalConf.get(Config.TOPOLOGY_WORKERS));
+            }
+        }
+    }
+
     @Override
     public void submitTopologyWithOpts(String topoName, String uploadedJarLocation, String jsonConf, StormTopology topology,
             SubmitOptions options)
@@ -2596,13 +2588,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                         + " but could not find a configured compatible version to use " + supervisorClasspaths.keySet());
             }
             Map<String, Object> otherConf = Utils.getConfigFromClasspath(cp, conf);
-            Map<String, Object> totalConfToSave = merge(otherConf, topoConf);
-            Map<String, Object> totalConf = merge(totalConfToSave, conf);
+            Map<String, Object> totalConfToSave = Utils.merge(otherConf, topoConf);
+            Map<String, Object> totalConf = Utils.merge(totalConfToSave, conf);
             //When reading the conf in nimbus we want to fall back to our own settings
             // if the other config does not have it set.
             topology = normalizeTopology(totalConf, topology);
+
+            //set the number of acker executors;
+            totalConfToSave.put(Config.TOPOLOGY_ACKER_EXECUTORS, getNumOfAckerExecs(totalConf, topology));
+            LOG.debug("Config.TOPOLOGY_ACKER_EXECUTORS set to: {}", totalConfToSave.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+
             IStormClusterState state = stormClusterState;
-            
+
             if (creds != null) {
                 Map<String, Object> finalConf = Collections.unmodifiableMap(topoConf);
                 for (INimbusCredentialPlugin autocred: nimbusAutocredPlugins) {
@@ -2676,7 +2673,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         assertTopoActive(topoName, true);
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             final String operation = "killTopology";
             checkAuthorization(topoName, topoConf, operation);
             Integer waitAmount = null;
@@ -2700,7 +2697,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         activateCalls.mark();
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             final String operation = "activate";
             checkAuthorization(topoName, topoConf, operation);
             transitionName(topoName, TopologyActions.ACTIVATE, null, true);
@@ -2719,7 +2716,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         deactivateCalls.mark();
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             final String operation = "deactivate";
             checkAuthorization(topoName, topoConf, operation);
             transitionName(topoName, TopologyActions.INACTIVATE, null, true);
@@ -2740,7 +2737,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         assertTopoActive(topoName, true);
         try {
             Map<String, Object> topoConf = tryReadTopoConfFromName(topoName);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             final String operation = "rebalance";
             checkAuthorization(topoName, topoConf, operation);
             Map<String, Integer> execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.emptyMap();
@@ -2765,7 +2762,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             setLogConfigCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setLogConfig");
             IStormClusterState state = stormClusterState;
@@ -2822,7 +2819,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getLogConfigCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getLogConfig");
             IStormClusterState state = stormClusterState;
@@ -2848,7 +2845,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             IStormClusterState state = stormClusterState;
             String topoId = toTopoId(topoName);
             Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             // make sure samplingPct is within bounds.
             double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
             // while disabling we retain the sampling pct.
@@ -2889,7 +2886,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             setWorkerProfilerCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "setWorkerProfiler");
             IStormClusterState state = stormClusterState;
@@ -2961,7 +2958,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 throw new NotAliveException(topoName + " is not alive");
             }
             Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             if (credentials == null) {
                 credentials = new Credentials(Collections.emptyMap());
             }
@@ -3505,7 +3502,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             Map<List<Integer>, Map<String, Object>> beats = common.beats;
             Map<Integer, String> taskToComp = common.taskToComponent;
             StormTopology topology = common.topology;
-            Map<String, Object> topoConf = merge(conf, common.topoConf);
+            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
             StormBase base = common.base;
             if (base == null) {
                 throw new NotAliveException(topoId);
@@ -3691,7 +3688,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             }
             StormTopology topology = info.topology;
             Map<String, Object> topoConf = info.topoConf;
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             Assignment assignment = info.assignment;
             Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
             Map<String, String> nodeToHost;
@@ -3768,7 +3765,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getTopologyConfCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
-            Map<String, Object> checkConf = merge(conf, topoConf);
+            Map<String, Object> checkConf = Utils.merge(conf, topoConf);
             String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, checkConf, "getTopologyConf");
             return JSONValue.toJSONString(topoConf);
@@ -3786,7 +3783,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getTopologyCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getTopology");
             return StormCommon.systemTopology(topoConf, tryReadTopology(id, topoCache));
@@ -3804,7 +3801,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         try {
             getUserTopologyCalls.mark();
             Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
-            topoConf = merge(conf, topoConf);
+            topoConf = Utils.merge(conf, topoConf);
             String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
             checkAuthorization(topoName, topoConf, "getUserTopology");
             return tryReadTopology(id, topoCache);
@@ -3828,7 +3825,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             boolean isAdmin = adminUsers.contains(user);
             for (String topoId: assignedIds) {
                 Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
-                topoConf = merge(conf, topoConf);
+                topoConf = Utils.merge(conf, topoConf);
                 List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
                 List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
                 if (user == null || isAdmin ||
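
The new getNumOfAckerExecs above reduces to three cases; a condensed
restatement (comments only, not code from the commit):

    // 1. topology.acker.executors explicitly set -> use that value, as before.
    // 2. Unset and the scheduler is the ResourceAwareScheduler -> estimate the
    //    number of workers from the topology's requested on-heap memory
    //    (ServerUtils.getEstimatedWorkerCountForRASTopo) and use that.
    // 3. Unset under any other scheduler -> fall back to topology.workers,
    //    the pre-existing default.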

http://git-wip-us.apache.org/repos/asf/storm/blob/bb86dcc8/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 9c5bb51..311acda 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -32,14 +32,11 @@ import org.apache.storm.blobstore.BlobStoreAclHandler;
 import org.apache.storm.blobstore.ClientBlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.blobstore.LocalFsBlobStore;
-import org.apache.storm.generated.AccessControl;
-import org.apache.storm.generated.AccessControlType;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
-import org.apache.storm.generated.ReadableBlobMeta;
-import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.generated.*;
 import org.apache.storm.localizer.Localizer;
 import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.resource.ResourceUtils;
 import org.apache.thrift.TException;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
@@ -738,4 +735,60 @@ public class ServerUtils {
         return isSuccess;
     }
 
+    /**
+     * Check if the scheduler is resource aware or not.
+     * @param conf The configuration
+     * @return True if it's resource aware; false otherwise
+     */
+    public static boolean isRAS(Map<String, Object> conf) {
+        if (conf.containsKey(DaemonConfig.STORM_SCHEDULER)) {
+            if (conf.get(DaemonConfig.STORM_SCHEDULER).equals("org.apache.storm.scheduler.resource.ResourceAwareScheduler")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static int getEstimatedWorkerCountForRASTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+        return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
+                ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
+    }
+
+    public static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+        Map<String, Integer> componentParallelism = getComponentParallelism(topoConf, topology);
+        double totalMemoryRequired = 0.0;
+
+        for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getBoltsResources(topology, topoConf).entrySet()) {
+            int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1);
+            double memoryRequirement = entry.getValue().get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            totalMemoryRequired += memoryRequirement * parallelism;
+        }
+
+        for(Map.Entry<String, Map<String, Double>> entry: ResourceUtils.getSpoutsResources(topology, topoConf).entrySet()) {
+            int parallelism = componentParallelism.getOrDefault(entry.getKey(), 1);
+            double memoryRequirement = entry.getValue().get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            totalMemoryRequired += memoryRequirement * parallelism;
+        }
+        return totalMemoryRequired;
+    }
+
+    public static Map<String, Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
+        Map<String, Integer> ret = new HashMap<>();
+        Map<String, Object> components = StormCommon.allComponents(topology);
+        for (Map.Entry<String, Object> entry : components.entrySet()) {
+            ret.put(entry.getKey(), getComponentParallelism(topoConf, entry.getValue()));
+        }
+        return ret;
+    }
+
+    public static int getComponentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException {
+        Map<String, Object> combinedConf = Utils.merge(topoConf, StormCommon.componentConf(component));
+        int numTasks = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_TASKS), StormCommon.numStartExecutors(component));
+        Integer maxParallel = ObjectReader.getInt(combinedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM), null);
+        int ret = numTasks;
+        if (maxParallel != null) {
+            ret = Math.min(maxParallel, numTasks);
+        }
+        return ret;
+    }
 }
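
To make the estimate concrete, a worked example with assumed numbers
(component names, parallelism, and memory figures are hypothetical; 768 MB is
the shipped default for worker.heap.memory.mb):

    // Hypothetical RAS topology:
    //   spout "reader":  parallelism 2, on-heap memory request = 128 MB
    //   bolt  "counter": parallelism 4, on-heap memory request = 256 MB
    //
    // getEstimatedTotalHeapMemoryRequiredByTopo:
    //   2 * 128 + 4 * 256 = 1280 MB
    // getEstimatedWorkerCountForRASTopo, assuming worker.heap.memory.mb = 768:
    //   ceil(1280 / 768) = 2 workers -> 2 acker executors by default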


[2/2] storm git commit: Merge branch 'STORM-2738' of https://github.com/Ethanlm/storm into STORM-2738

Posted by bo...@apache.org.
Merge branch 'STORM-2738' of https://github.com/Ethanlm/storm into STORM-2738

STORM-2738: The number of ackers should default to the number of actual
running workers on RAS cluster

This closes #2325


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2da6b819
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2da6b819
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2da6b819

Branch: refs/heads/master
Commit: 2da6b81978407bf4765a194e1fd6c0502fa7c4ed
Parents: f7443a8 bb86dcc
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Sep 19 15:20:20 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Sep 19 15:20:20 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/utils/Utils.java   |   8 ++
 .../org/apache/storm/daemon/nimbus/Nimbus.java  | 105 +++++++++----------
 .../org/apache/storm/utils/ServerUtils.java     |  65 ++++++++++--
 3 files changed, 118 insertions(+), 60 deletions(-)
----------------------------------------------------------------------