You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ja...@apache.org on 2017/05/07 18:13:04 UTC

[15/17] eagle git commit: [EAGLE-1015] add an interface to add storm configuration in an application

[EAGLE-1015] add an interface to add storm configuration in an application

https://issues.apache.org/jira/browse/EAGLE-1015

Support adding Storm config values of type number or string in an application. For a custom value to override the corresponding Storm setting, its configuration key must start with 'application.storm.'. For example:

`application.storm.workers` to override `workers`
`application.storm.nimbus.host` to override `nimbus.host`

Author: Zhao, Qingwen <qi...@apache.org>

Closes #928 from qingwen220/EAGLE-1015.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/47f00f15
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/47f00f15
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/47f00f15

Branch: refs/heads/branch-0.5
Commit: 47f00f159231958fb39748b4a6a01c4520371dec
Parents: eaad6cf
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri May 5 12:54:21 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri May 5 12:54:21 2017 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/47f00f15/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 2b4180d..8045e43 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -22,6 +22,8 @@ import backtype.storm.generated.*;
 import backtype.storm.utils.NimbusClient;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.ConfigRenderOptions;
+import com.typesafe.config.ConfigValue;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
 import org.apache.eagle.alert.metric.MetricConfigs;
 import org.apache.eagle.app.Application;
@@ -36,6 +38,7 @@ import scala.Int;
 import storm.trident.spout.RichSpoutBatchExecutor;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
@@ -67,14 +70,16 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         return this.environment;
     }
 
-    public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
+    private static final String WORKERS = "workers";
+    private static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
 
     private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
     private static final String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
 
-    private static final String WORKERS = "workers";
+    private static final String APP_STORM_CONF_PATH_DEFAULT = "application.storm";
+
+    private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private static final Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
 
     private backtype.storm.Config getStormConfig(com.typesafe.config.Config config) {
         backtype.storm.Config conf = new backtype.storm.Config();
@@ -85,16 +90,17 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
         conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
         String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+
         if (environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
             nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH, nimbusHost);
         } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH, STORM_NIMBUS_HOST_DEFAULT);
         }
         Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
         if (environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
             nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH, nimbusThriftPort);
         } else {
             LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
         }
@@ -112,6 +118,17 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         if (config.hasPath(MetricConfigs.METRIC_SINK_CONF)) {
             conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
         }
+
+        if (config.hasPath(APP_STORM_CONF_PATH_DEFAULT)) {
+            com.typesafe.config.Config appStormConf = config.getConfig(APP_STORM_CONF_PATH_DEFAULT);
+            for (Map.Entry<String, ConfigValue> entry: appStormConf.entrySet()) {
+                if (NumberUtils.isNumber(entry.getValue().unwrapped().toString())) {
+                    conf.put(entry.getKey(), appStormConf.getNumber(entry.getKey()));
+                } else {
+                    conf.put(entry.getKey(), entry.getValue().unwrapped());
+                }
+            }
+        }
         return conf;
     }