You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/21 02:44:52 UTC

incubator-eagle git commit: [EAGLE-656] Integrate metric system with application framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 49db82c5e -> 4472fa1f5


[EAGLE-656] Integrate metric system with application framework

Integrate metric system with application framework

https://issues.apache.org/jira/browse/EAGLE-656

Author: Hao Chen <ha...@apache.org>

Closes #544 from haoch/IntegrateMetricSystem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4472fa1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4472fa1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4472fa1f

Branch: refs/heads/master
Commit: 4472fa1f5a5a664acc6b9f486d6fe5c46a3b01ac
Parents: 49db82c
Author: Hao Chen <ha...@apache.org>
Authored: Fri Oct 21 10:44:40 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Oct 21 10:44:40 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/metric/MetricConfigs.java       | 25 +++++++++++
 .../apache/eagle/alert/metric/MetricSystem.java | 34 ++++++++------
 .../alert/metric/sink/ElasticSearchSink.java    | 40 ++++++++++++-----
 .../eagle/alert/metric/sink/KafkaSink.java      |  5 ++-
 .../src/test/resources/application.conf         |  5 ++-
 .../engine/runner/StormMetricConsumer.java      |  6 +--
 .../runner/StormMetricTaggedConsumer.java       | 47 +++++++++++++-------
 .../environment/impl/StormExecutionRuntime.java |  7 +++
 .../eagle/app/service/ApplicationAction.java    | 34 ++++++++------
 .../app/test/ApplicationSimulatorImpl.java      | 11 ++++-
 .../eagle/app/TestApplicationSimulatorImpl.java | 41 +++++++++++++++++
 11 files changed, 196 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
new file mode 100644
index 0000000..81aa75c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.metric;
+
+public class MetricConfigs {
+    public static final String METRIC_PREFIX_CONF = "metric.prefix";
+    public static final String METRIC_SINK_CONF = "metric.sink";
+    public static final String DURATION_SECONDS_CONF = "metric.durationSeconds";
+    public static final String TAGS_FIELD_NAME = "tags";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
index 555c4ec..255f1e6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
@@ -16,12 +16,12 @@
  */
 package org.apache.eagle.alert.metric;
 
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
-import org.apache.eagle.alert.metric.source.MetricSource;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
+import org.apache.eagle.alert.metric.source.MetricSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,15 +32,19 @@ import java.util.concurrent.TimeUnit;
 public class MetricSystem implements IMetricSystem {
     private final Config config;
     private Map<MetricSink, Config> sinks = new HashMap<>();
-    //    private Map<String,MetricSource> sources = new HashMap<>();
     private MetricRegistry registry = new MetricRegistry();
-    private boolean running;
-    private boolean initialized;
+    private volatile boolean running;
+    private volatile boolean initialized;
     private static final Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
     private final Map<String, Object> metricTags = new HashMap<>();
+    private int scheduleDurationSeconds = 10;
 
     public MetricSystem(Config config) {
         this.config = config;
+        if (this.config.hasPath(MetricConfigs.DURATION_SECONDS_CONF)) {
+            this.scheduleDurationSeconds = this.config.getInt(MetricConfigs.DURATION_SECONDS_CONF);
+            LOG.info("Override {}: {}",MetricConfigs.DURATION_SECONDS_CONF, this.scheduleDurationSeconds);
+        }
     }
 
     public static MetricSystem load(Config config) {
@@ -49,6 +53,9 @@ public class MetricSystem implements IMetricSystem {
         return instance;
     }
 
+    /**
+     * Add additional tags.
+     */
     @Override
     public void tags(Map<String, Object> metricTags) {
         this.metricTags.putAll(metricTags);
@@ -57,9 +64,11 @@ public class MetricSystem implements IMetricSystem {
     @Override
     public void start() {
         if (initialized) {
-            throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
+            throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already initialized");
         }
-        sinks.forEach((sink, conf) -> sink.prepare(conf.withValue("tags", ConfigFactory.parseMap(metricTags).root()), registry));
+        sinks.forEach((sink, conf) -> {
+            sink.prepare(conf.withValue(MetricConfigs.TAGS_FIELD_NAME, ConfigFactory.parseMap(metricTags).root()), registry);
+        });
         initialized = true;
     }
 
@@ -68,8 +77,7 @@ public class MetricSystem implements IMetricSystem {
         if (running) {
             throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
         }
-
-        sinks.keySet().forEach((sink) -> sink.start(5, TimeUnit.SECONDS));
+        sinks.keySet().forEach((sink) -> sink.start(this.scheduleDurationSeconds, TimeUnit.SECONDS));
         running = true;
     }
 
@@ -78,12 +86,12 @@ public class MetricSystem implements IMetricSystem {
     }
 
     private void loadSinksFromConfig() {
-        Config sinkCls = config.hasPath("metric.sink") ? config.getConfig("metric.sink") : null;
+        Config sinkCls = config.hasPath(MetricConfigs.METRIC_SINK_CONF) ? config.getConfig(MetricConfigs.METRIC_SINK_CONF) : null;
         if (sinkCls == null) {
             // do nothing
         } else {
             for (String sinkType : sinkCls.root().unwrapped().keySet()) {
-                register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType));
+                register(MetricSinkRepository.createSink(sinkType), config.getConfig(MetricConfigs.METRIC_SINK_CONF + "." + sinkType));
             }
         }
     }
@@ -100,8 +108,8 @@ public class MetricSystem implements IMetricSystem {
 
     @Override
     public void register(MetricSink sink, Config config) {
-        LOG.debug("Register {}", sink);
         sinks.put(sink, config);
+        LOG.info("Registered {}", sink);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
index 4de98cf..3015be2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
@@ -18,6 +18,7 @@ package org.apache.eagle.alert.metric.sink;
 
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
+import org.apache.eagle.alert.metric.MetricConfigs;
 import org.elasticsearch.metrics.ElasticsearchReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,41 +28,60 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 public class ElasticSearchSink implements MetricSink {
+
     private ElasticsearchReporter reporter = null;
     private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchSink.class);
 
+    private static final String INDEX_DATEFORMAT_CONF = "indexDateFormat";
+    private static final String TIMESTAMP_FIELD_CONF = "timestampField";
+    private static final String HOSTS_CONF = "hosts";
+    private static final String INDEX_CONF = "index";
+
+    private static final String DEFAULT_INDEX_DATE_FORMAT = "yyyy-MM-dd";
+    private static final String DEFAULT_TIMESTAMP_FIELD = "@timestamp";
+
     @Override
     public void prepare(Config config, MetricRegistry registry) {
-        LOG.debug("Preparing elasticsearch-sink");
+        LOG.info("Preparing elasticsearch-sink");
         try {
             ElasticsearchReporter.Builder builder = ElasticsearchReporter.forRegistry(registry);
-            if (config.hasPath("hosts")) {
-                List<String> hosts = config.getStringList("hosts");
+
+            if (config.hasPath(HOSTS_CONF)) {
+                List<String> hosts = config.getStringList(HOSTS_CONF);
                 builder.hosts(hosts.toArray(new String[hosts.size()]));
             }
-            if (config.hasPath("index")) {
-                builder.index(config.getString("index"));
+
+            if (config.hasPath(INDEX_CONF)) {
+                builder.index(config.getString(INDEX_CONF));
             }
-            builder.indexDateFormat("yyyy-MM-dd");
-            builder.timestampFieldname(config.hasPath("timestampField") ? config.getString("timestampField") : "@timestamp");
 
-            if (config.hasPath("tags")) {
-                builder.additionalFields(config.getConfig("tags").root().unwrapped());
+            builder.indexDateFormat(config.hasPath(INDEX_DATEFORMAT_CONF)
+                ? config.getString(INDEX_DATEFORMAT_CONF) : DEFAULT_INDEX_DATE_FORMAT);
+
+            builder.timestampFieldname(config.hasPath(TIMESTAMP_FIELD_CONF)
+                ? config.getString(TIMESTAMP_FIELD_CONF) : DEFAULT_TIMESTAMP_FIELD);
+
+            if (config.hasPath(MetricConfigs.TAGS_FIELD_NAME)) {
+                builder.additionalFields(config.getConfig(MetricConfigs.TAGS_FIELD_NAME).root().unwrapped());
             }
 
             reporter = builder.build();
         } catch (IOException e) {
             LOG.error(e.getMessage(), e);
+            throw new IllegalStateException(e.getMessage(), e);
         }
+        LOG.info("Initialized elasticsearch-sink");
     }
 
     @Override
     public void start(long period, TimeUnit unit) {
-        reporter.start(period, TimeUnit.SECONDS);
+        LOG.info("Starting elasticsearch-sink");
+        reporter.start(period, unit);
     }
 
     @Override
     public void stop() {
+        LOG.info("Stopping elasticsearch-sink");
         reporter.stop();
         reporter.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
index 88b22c5..f8bc8f3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.eagle.alert.metric.sink;
 
+import org.apache.eagle.alert.metric.MetricConfigs;
 import org.apache.eagle.alert.metric.reporter.KafkaReporter;
 import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
@@ -35,8 +36,8 @@ public class KafkaSink implements MetricSink {
             .topic(config.getString("topic"))
             .config(config);
 
-        if (config.hasPath("tags")) {
-            builder.addFields(config.getConfig("tags").root().unwrapped());
+        if (config.hasPath(MetricConfigs.TAGS_FIELD_NAME)) {
+            builder.addFields(config.getConfig(MetricConfigs.TAGS_FIELD_NAME).root().unwrapped());
         }
 
         reporter = builder.build();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
index b54a5ac..71735d4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
@@ -16,6 +16,9 @@
 {
   metric {
     sink {
+      stdout {
+        // console metric sink
+      }
       kafka {
         "topic": "alert_metric_test"
         "bootstrap.servers": "localhost:9092"
@@ -24,7 +27,7 @@
         level = "INFO"
       }
       elasticsearch {
-        hosts = ["10.64.223.222:9200"]
+        hosts = ["localhost:9200"]
         index = "alert_metric_test"
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
index caa59b3..771a667 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
@@ -44,13 +44,13 @@ public class StormMetricConsumer implements IMetricsConsumer {
     @Override
     public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
         Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-        topologyName = config.getString("topology.name");
+        topologyName = config.getString("appId");
         topologyId = context.getStormId();
         metricSystem = MetricSystem.load(config);
         metricSystem.tags(new HashMap<String, Object>() {
             {
-                put("topologyName", topologyName);
-                put("topologyId", topologyId);
+                put("appId", topologyName);
+                put("stormId", topologyId);
             }
         });
         metricSystem.start();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
index e060d1f..3c13ff7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.metric.MetricConfigs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,18 +37,29 @@ import java.util.*;
  */
 public class StormMetricTaggedConsumer implements IMetricsConsumer {
     public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
-    private String topologyName;
-    private Map<String, MetricSystem> metricSystems;
-    private String stormId;
+    private final Map<String, MetricSystem> metricSystems = new HashMap<>();
     private Config config;
+    private String metricNamePrefix;
+    private Map<String, Object> baseTags = new HashMap<>();
 
     @SuppressWarnings("rawtypes")
     @Override
     public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
         this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
-        topologyName = config.getString("topology.name");
-        stormId = context.getStormId();
-        metricSystems = new HashMap<>();
+
+        if (config.hasPath("appId")) {
+            baseTags.put("appId", config.getString("appId"));
+        }
+
+        if (config.hasPath("siteId")) {
+            baseTags.put("siteId", config.getString("siteId"));
+        }
+
+        baseTags.put("appExecId", context.getStormId());
+
+        if (config.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) {
+            metricNamePrefix = config.getString(MetricConfigs.METRIC_PREFIX_CONF);
+        }
     }
 
     @SuppressWarnings("serial")
@@ -59,12 +71,11 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
             if (metricSystem == null) {
                 metricSystem = MetricSystem.load(config);
                 metricSystems.put(uniqueTaskKey, metricSystem);
+                metricSystem.tags(baseTags);
                 metricSystem.tags(new HashMap<String, Object>() {
                     {
-                        put("topology", topologyName);
-                        put("stormId", stormId);
-                        put("component", taskInfo.srcComponentId);
-                        put("task", taskInfo.srcTaskId);
+                        put("componentId", taskInfo.srcComponentId);
+                        put("taskId", taskInfo.srcTaskId);
                     }
                 });
                 metricSystem.start();
@@ -84,7 +95,7 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
             if (dataPoint.value instanceof Map) {
                 Map<String, Object> values = (Map<String, Object>) dataPoint.value;
                 for (Map.Entry<String, Object> entry : values.entrySet()) {
-                    String metricName = buildSimpleMetricName(taskInfo, dataPoint.name, entry.getKey());
+                    String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name, entry.getKey());
                     metricList.add(metricName);
                     Gauge gauge = metricSystem.registry().getGauges().get(metricName);
                     if (gauge == null) {
@@ -96,7 +107,7 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
                     }
                 }
             } else {
-                String metricName = buildSimpleMetricName(taskInfo, dataPoint.name);
+                String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name);
                 metricList.add(metricName);
                 Gauge gauge = metricSystem.registry().getGauges().get(metricName);
                 if (gauge == null) {
@@ -142,12 +153,18 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
         return String.format("%s[%s]", taskInfo.srcComponentId, taskInfo.srcTaskId);
     }
 
-    private static String buildSimpleMetricName(TaskInfo taskInfo, String... name) {
-        return String.join(".", StringUtils.join(name, ".").replace("/", "."));
+    private static String buildSimpleMetricName(String prefix, TaskInfo taskInfo, String... name) {
+        String metricName = String.join(".", StringUtils.join(name, ".").replace("/", ".")).replace("__", "");
+        if (prefix == null) {
+            return metricName;
+        } else {
+            return String.format("%s%s", prefix, metricName);
+        }
     }
 
     @Override
     public void cleanup() {
         metricSystems.values().forEach(IMetricSystem::stop);
+        metricSystems.clear();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index fb4aff9..ccfcd93 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -21,6 +21,8 @@ import backtype.storm.LocalCluster;
 import backtype.storm.generated.*;
 import backtype.storm.utils.NimbusClient;
 import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.environment.ExecutionRuntime;
 import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
@@ -59,6 +61,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
     }
 
     public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
+    public static final String METRIC_CONFIG_PREFIX = "metric";
 
     private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
     private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
@@ -99,6 +102,10 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
             conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
         }
+
+        if (config.hasPath(METRIC_CONFIG_PREFIX)) {
+            conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
+        }
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index aea52c8..0b389b6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -25,6 +25,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.scheme.JsonScheme;
 import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
 import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metric.MetricConfigs;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.environment.ExecutionRuntime;
 import org.apache.eagle.app.environment.ExecutionRuntimeManager;
@@ -32,6 +33,8 @@ import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.StreamDesc;
 import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -49,28 +52,33 @@ import java.util.stream.Collectors;
  * </ul>
  */
 public class ApplicationAction implements Serializable {
-    private final Config config;
+    private final Config effectiveConfig;
     private final Application application;
     private final ExecutionRuntime runtime;
     private final ApplicationEntity metadata;
     private final IMetadataDao alertMetadataService;
+    private static final String APP_METRIC_PREFIX = "eagle.";
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationAction.class);
 
     /**
      * @param metadata    ApplicationEntity.
      * @param application Application.
      */
-    public ApplicationAction(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService) {
+    public ApplicationAction(Application application, ApplicationEntity metadata, Config serverConfig, IMetadataDao alertMetadataService) {
         Preconditions.checkNotNull(application, "Application is null");
         Preconditions.checkNotNull(metadata, "ApplicationEntity is null");
         this.application = application;
         this.metadata = metadata;
-        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), envConfig);
+        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), serverConfig);
         Map<String, Object> executionConfig = metadata.getConfiguration();
         if (executionConfig == null) {
             executionConfig = Collections.emptyMap();
         }
-
-        this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig).withFallback(ConfigFactory.parseMap(metadata.getContext()));
+        if (serverConfig.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) {
+            LOG.warn("Ignored server config {} = {}", MetricConfigs.METRIC_PREFIX_CONF, serverConfig.getString(MetricConfigs.METRIC_PREFIX_CONF));
+        }
+        Config metricPrefix = ConfigFactory.parseMap(Collections.singletonMap(MetricConfigs.METRIC_PREFIX_CONF, APP_METRIC_PREFIX)); // layered, not put(): executionConfig may be the immutable emptyMap() or the entity's own map
+        this.effectiveConfig = metricPrefix.withFallback(ConfigFactory.parseMap(executionConfig)).withFallback(serverConfig).withFallback(ConfigFactory.parseMap(metadata.getContext()));
         this.alertMetadataService = alertMetadataService;
     }
 
@@ -78,17 +86,17 @@ public class ApplicationAction implements Serializable {
      * Generate global unique streamId to install.
      * TODO refactor with streamId and siteId
      */
-    private static String generateUniqueStreamId(String siteId,String streamTypeId) {
-        return String.format("%s_%s",streamTypeId,siteId).toUpperCase();
-    }   
+    private static String generateUniqueStreamId(String siteId, String streamTypeId) {
+        return String.format("%s_%s", streamTypeId, siteId).toUpperCase();
+    }
 
     public void doInstall() {
         if (metadata.getDescriptor().getStreams() != null) {
             List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
                 StreamDefinition copied = streamDefinition.copy();
                 copied.setSiteId(metadata.getSite().getSiteId());
-                copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId()));
-                StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.config);
+                copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
+                StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.effectiveConfig);
                 StreamDesc streamDesc = new StreamDesc();
                 streamDesc.setSchema(copied);
                 streamDesc.setSink(streamSinkConfig);
@@ -138,16 +146,16 @@ public class ApplicationAction implements Serializable {
     }
 
     public void doStart() {
-        this.runtime.start(this.application, this.config);
+        this.runtime.start(this.application, this.effectiveConfig);
     }
 
     @SuppressWarnings("unchecked")
     public void doStop() {
-        this.runtime.stop(this.application, this.config);
+        this.runtime.stop(this.application, this.effectiveConfig);
     }
 
     public ApplicationEntity.Status getStatus() {
-        return this.runtime.status(this.application, this.config);
+        return this.runtime.status(this.application, this.effectiveConfig);
     }
 
     public ApplicationEntity getMetadata() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index bc66ab4..51ee2cf 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -24,6 +24,7 @@ import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
 import org.junit.Assert;
 
 import java.util.HashMap;
@@ -36,6 +37,9 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
     private final ApplicationResource applicationResource;
 
     @Inject
+    ApplicationStatusUpdateService statusUpdateService;
+
+    @Inject
     public ApplicationSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource) {
         this.config = config;
         this.siteResource = siteResource;
@@ -66,10 +70,13 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
         ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(), appType, ApplicationEntity.Mode.LOCAL);
         installOperation.setConfiguration(appConfig);
         // Install application
-        ApplicationEntity applicationEntity =
-            applicationResource.installApplication(installOperation).getData();
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
new file mode 100644
index 0000000..f2a4bce
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.WebApplicationException;
+
+@Ignore
+public class TestApplicationSimulatorImpl extends ApplicationTestBase {
+    @Inject
+    ApplicationSimulator simulator;
+
+    @Test
+    public void testSimulatorValid() {
+        simulator.start(TestStormApplication.Provider.class);
+    }
+
+    @Test(expected = WebApplicationException.class)
+    public void testSimulatorWithStaticApp() {
+        simulator.start(TestWebApplication.Provider.class);
+    }
+}