You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/10/21 02:44:52 UTC
incubator-eagle git commit: [EAGLE-656] Integrate metric system with application framework
Repository: incubator-eagle
Updated Branches:
refs/heads/master 49db82c5e -> 4472fa1f5
[EAGLE-656] Integrate metric system with application framework
Integrate metric system with application framework
https://issues.apache.org/jira/browse/EAGLE-656
Author: Hao Chen <ha...@apache.org>
Closes #544 from haoch/IntegrateMetricSystem.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4472fa1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4472fa1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4472fa1f
Branch: refs/heads/master
Commit: 4472fa1f5a5a664acc6b9f486d6fe5c46a3b01ac
Parents: 49db82c
Author: Hao Chen <ha...@apache.org>
Authored: Fri Oct 21 10:44:40 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Oct 21 10:44:40 2016 +0800
----------------------------------------------------------------------
.../eagle/alert/metric/MetricConfigs.java | 25 +++++++++++
.../apache/eagle/alert/metric/MetricSystem.java | 34 ++++++++------
.../alert/metric/sink/ElasticSearchSink.java | 40 ++++++++++++-----
.../eagle/alert/metric/sink/KafkaSink.java | 5 ++-
.../src/test/resources/application.conf | 5 ++-
.../engine/runner/StormMetricConsumer.java | 6 +--
.../runner/StormMetricTaggedConsumer.java | 47 +++++++++++++-------
.../environment/impl/StormExecutionRuntime.java | 7 +++
.../eagle/app/service/ApplicationAction.java | 34 ++++++++------
.../app/test/ApplicationSimulatorImpl.java | 11 ++++-
.../eagle/app/TestApplicationSimulatorImpl.java | 41 +++++++++++++++++
11 files changed, 196 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
new file mode 100644
index 0000000..81aa75c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricConfigs.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.metric;
+
+public class MetricConfigs {
+ public static final String METRIC_PREFIX_CONF = "metric.prefix";
+ public static final String METRIC_SINK_CONF = "metric.sink";
+ public static final String DURATION_SECONDS_CONF = "metric.durationSeconds";
+ public static final String TAGS_FIELD_NAME = "tags";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
index 555c4ec..255f1e6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
@@ -16,12 +16,12 @@
*/
package org.apache.eagle.alert.metric;
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
-import org.apache.eagle.alert.metric.source.MetricSource;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.metric.sink.MetricSink;
+import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
+import org.apache.eagle.alert.metric.source.MetricSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,15 +32,19 @@ import java.util.concurrent.TimeUnit;
public class MetricSystem implements IMetricSystem {
private final Config config;
private Map<MetricSink, Config> sinks = new HashMap<>();
- // private Map<String,MetricSource> sources = new HashMap<>();
private MetricRegistry registry = new MetricRegistry();
- private boolean running;
- private boolean initialized;
+ private volatile boolean running;
+ private volatile boolean initialized;
private static final Logger LOG = LoggerFactory.getLogger(MetricSystem.class);
private final Map<String, Object> metricTags = new HashMap<>();
+ private int scheduleDurationSeconds = 10;
public MetricSystem(Config config) {
this.config = config;
+ if (this.config.hasPath(MetricConfigs.DURATION_SECONDS_CONF)) {
+ this.scheduleDurationSeconds = this.config.getInt(MetricConfigs.DURATION_SECONDS_CONF);
+ LOG.info("Override {}: {}",MetricConfigs.DURATION_SECONDS_CONF, this.scheduleDurationSeconds);
+ }
}
public static MetricSystem load(Config config) {
@@ -49,6 +53,9 @@ public class MetricSystem implements IMetricSystem {
return instance;
}
+ /**
+ * Add additional tags.
+ */
@Override
public void tags(Map<String, Object> metricTags) {
this.metricTags.putAll(metricTags);
@@ -57,9 +64,11 @@ public class MetricSystem implements IMetricSystem {
@Override
public void start() {
if (initialized) {
- throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized");
+ throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already initialized");
}
- sinks.forEach((sink, conf) -> sink.prepare(conf.withValue("tags", ConfigFactory.parseMap(metricTags).root()), registry));
+ sinks.forEach((sink, conf) -> {
+ sink.prepare(conf.withValue(MetricConfigs.TAGS_FIELD_NAME, ConfigFactory.parseMap(metricTags).root()), registry);
+ });
initialized = true;
}
@@ -68,8 +77,7 @@ public class MetricSystem implements IMetricSystem {
if (running) {
throw new IllegalStateException("Attempting to start a MetricsSystem that is already running");
}
-
- sinks.keySet().forEach((sink) -> sink.start(5, TimeUnit.SECONDS));
+ sinks.keySet().forEach((sink) -> sink.start(this.scheduleDurationSeconds, TimeUnit.SECONDS));
running = true;
}
@@ -78,12 +86,12 @@ public class MetricSystem implements IMetricSystem {
}
private void loadSinksFromConfig() {
- Config sinkCls = config.hasPath("metric.sink") ? config.getConfig("metric.sink") : null;
+ Config sinkCls = config.hasPath(MetricConfigs.METRIC_SINK_CONF) ? config.getConfig(MetricConfigs.METRIC_SINK_CONF) : null;
if (sinkCls == null) {
// do nothing
} else {
for (String sinkType : sinkCls.root().unwrapped().keySet()) {
- register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType));
+ register(MetricSinkRepository.createSink(sinkType), config.getConfig(MetricConfigs.METRIC_SINK_CONF + "." + sinkType));
}
}
}
@@ -100,8 +108,8 @@ public class MetricSystem implements IMetricSystem {
@Override
public void register(MetricSink sink, Config config) {
- LOG.debug("Register {}", sink);
sinks.put(sink, config);
+ LOG.info("Registered {}", sink);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
index 4de98cf..3015be2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ElasticSearchSink.java
@@ -18,6 +18,7 @@ package org.apache.eagle.alert.metric.sink;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
+import org.apache.eagle.alert.metric.MetricConfigs;
import org.elasticsearch.metrics.ElasticsearchReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,41 +28,60 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
public class ElasticSearchSink implements MetricSink {
+
private ElasticsearchReporter reporter = null;
private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchSink.class);
+ private static final String INDEX_DATEFORMAT_CONF = "indexDateFormat";
+ private static final String TIMESTAMP_FIELD_CONF = "timestampField";
+ private static final String HOSTS_CONF = "hosts";
+ private static final String INDEX_CONF = "index";
+
+ private static final String DEFAULT_INDEX_DATE_FORMAT = "yyyy-MM-dd";
+ private static final String DEFAULT_TIMESTAMP_FIELD = "@timestamp";
+
@Override
public void prepare(Config config, MetricRegistry registry) {
- LOG.debug("Preparing elasticsearch-sink");
+ LOG.info("Preparing elasticsearch-sink");
try {
ElasticsearchReporter.Builder builder = ElasticsearchReporter.forRegistry(registry);
- if (config.hasPath("hosts")) {
- List<String> hosts = config.getStringList("hosts");
+
+ if (config.hasPath(HOSTS_CONF)) {
+ List<String> hosts = config.getStringList(HOSTS_CONF);
builder.hosts(hosts.toArray(new String[hosts.size()]));
}
- if (config.hasPath("index")) {
- builder.index(config.getString("index"));
+
+ if (config.hasPath(INDEX_CONF)) {
+ builder.index(config.getString(INDEX_CONF));
}
- builder.indexDateFormat("yyyy-MM-dd");
- builder.timestampFieldname(config.hasPath("timestampField") ? config.getString("timestampField") : "@timestamp");
- if (config.hasPath("tags")) {
- builder.additionalFields(config.getConfig("tags").root().unwrapped());
+ builder.indexDateFormat(config.hasPath(INDEX_DATEFORMAT_CONF)
+ ? config.getString(INDEX_DATEFORMAT_CONF) : DEFAULT_INDEX_DATE_FORMAT);
+
+ builder.timestampFieldname(config.hasPath(TIMESTAMP_FIELD_CONF)
+ ? config.getString(TIMESTAMP_FIELD_CONF) : DEFAULT_TIMESTAMP_FIELD);
+
+ if (config.hasPath(MetricConfigs.TAGS_FIELD_NAME)) {
+ builder.additionalFields(config.getConfig(MetricConfigs.TAGS_FIELD_NAME).root().unwrapped());
}
reporter = builder.build();
} catch (IOException e) {
LOG.error(e.getMessage(), e);
+ throw new IllegalStateException(e.getMessage(), e);
}
+ LOG.info("Initialized elasticsearch-sink");
}
@Override
public void start(long period, TimeUnit unit) {
- reporter.start(period, TimeUnit.SECONDS);
+ LOG.info("Starting elasticsearch-sink");
+ reporter.start(period, unit);
}
@Override
public void stop() {
+ LOG.info("Stopping elasticsearch-sink");
reporter.stop();
reporter.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
index 88b22c5..f8bc8f3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/KafkaSink.java
@@ -17,6 +17,7 @@
package org.apache.eagle.alert.metric.sink;
+import org.apache.eagle.alert.metric.MetricConfigs;
import org.apache.eagle.alert.metric.reporter.KafkaReporter;
import com.codahale.metrics.MetricRegistry;
import com.typesafe.config.Config;
@@ -35,8 +36,8 @@ public class KafkaSink implements MetricSink {
.topic(config.getString("topic"))
.config(config);
- if (config.hasPath("tags")) {
- builder.addFields(config.getConfig("tags").root().unwrapped());
+ if (config.hasPath(MetricConfigs.TAGS_FIELD_NAME)) {
+ builder.addFields(config.getConfig(MetricConfigs.TAGS_FIELD_NAME).root().unwrapped());
}
reporter = builder.build();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
index b54a5ac..71735d4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/resources/application.conf
@@ -16,6 +16,9 @@
{
metric {
sink {
+ stdout {
+ // console metric sink
+ }
kafka {
"topic": "alert_metric_test"
"bootstrap.servers": "localhost:9092"
@@ -24,7 +27,7 @@
level = "INFO"
}
elasticsearch {
- hosts = ["10.64.223.222:9200"]
+ hosts = ["localhost:9200"]
index = "alert_metric_test"
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
index caa59b3..771a667 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java
@@ -44,13 +44,13 @@ public class StormMetricConsumer implements IMetricsConsumer {
@Override
public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
- topologyName = config.getString("topology.name");
+ topologyName = config.getString("appId");
topologyId = context.getStormId();
metricSystem = MetricSystem.load(config);
metricSystem.tags(new HashMap<String, Object>() {
{
- put("topologyName", topologyName);
- put("topologyId", topologyId);
+ put("appId", topologyName);
+ put("stormId", topologyId);
}
});
metricSystem.start();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
index e060d1f..3c13ff7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.metric.MetricConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,18 +37,29 @@ import java.util.*;
*/
public class StormMetricTaggedConsumer implements IMetricsConsumer {
public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class);
- private String topologyName;
- private Map<String, MetricSystem> metricSystems;
- private String stormId;
+ private final Map<String, MetricSystem> metricSystems = new HashMap<>();
private Config config;
+ private String metricNamePrefix;
+ private Map<String, Object> baseTags = new HashMap<>();
@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) {
this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults());
- topologyName = config.getString("topology.name");
- stormId = context.getStormId();
- metricSystems = new HashMap<>();
+
+ if (config.hasPath("appId")) {
+ baseTags.put("appId", config.getString("appId"));
+ }
+
+ if (config.hasPath("siteId")) {
+ baseTags.put("siteId", config.getString("siteId"));
+ }
+
+ baseTags.put("appExecId", context.getStormId());
+
+ if (config.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) {
+ metricNamePrefix = config.getString(MetricConfigs.METRIC_PREFIX_CONF);
+ }
}
@SuppressWarnings("serial")
@@ -59,12 +71,11 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
if (metricSystem == null) {
metricSystem = MetricSystem.load(config);
metricSystems.put(uniqueTaskKey, metricSystem);
+ metricSystem.tags(baseTags);
metricSystem.tags(new HashMap<String, Object>() {
{
- put("topology", topologyName);
- put("stormId", stormId);
- put("component", taskInfo.srcComponentId);
- put("task", taskInfo.srcTaskId);
+ put("componentId", taskInfo.srcComponentId);
+ put("taskId", taskInfo.srcTaskId);
}
});
metricSystem.start();
@@ -84,7 +95,7 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
if (dataPoint.value instanceof Map) {
Map<String, Object> values = (Map<String, Object>) dataPoint.value;
for (Map.Entry<String, Object> entry : values.entrySet()) {
- String metricName = buildSimpleMetricName(taskInfo, dataPoint.name, entry.getKey());
+ String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name, entry.getKey());
metricList.add(metricName);
Gauge gauge = metricSystem.registry().getGauges().get(metricName);
if (gauge == null) {
@@ -96,7 +107,7 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
}
}
} else {
- String metricName = buildSimpleMetricName(taskInfo, dataPoint.name);
+ String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name);
metricList.add(metricName);
Gauge gauge = metricSystem.registry().getGauges().get(metricName);
if (gauge == null) {
@@ -142,12 +153,18 @@ public class StormMetricTaggedConsumer implements IMetricsConsumer {
return String.format("%s[%s]", taskInfo.srcComponentId, taskInfo.srcTaskId);
}
- private static String buildSimpleMetricName(TaskInfo taskInfo, String... name) {
- return String.join(".", StringUtils.join(name, ".").replace("/", "."));
+ private static String buildSimpleMetricName(String prefix, TaskInfo taskInfo, String... name) {
+ String metricName = String.join(".", StringUtils.join(name, ".").replace("/", ".")).replace("__", "");
+ if (prefix == null) {
+ return metricName;
+ } else {
+ return String.format("%s%s", prefix, metricName);
+ }
}
@Override
public void cleanup() {
metricSystems.values().forEach(IMetricSystem::stop);
+ metricSystems.clear();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index fb4aff9..ccfcd93 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -21,6 +21,8 @@ import backtype.storm.LocalCluster;
import backtype.storm.generated.*;
import backtype.storm.utils.NimbusClient;
import com.google.common.base.Preconditions;
+import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.alert.engine.runner.StormMetricTaggedConsumer;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
@@ -59,6 +61,7 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
}
public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
+ public static final String METRIC_CONFIG_PREFIX = "metric";
private static final String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
private static final String STORM_NIMBUS_HOST_DEFAULT = "localhost";
@@ -99,6 +102,10 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
if (config.hasPath(TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
conf.put(TOPOLOGY_MESSAGE_TIMEOUT_SECS, config.getInt(TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}
+
+ if (config.hasPath(METRIC_CONFIG_PREFIX)) {
+ conf.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1);
+ }
return conf;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index aea52c8..0b389b6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -25,6 +25,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.scheme.JsonScheme;
import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metric.MetricConfigs;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeManager;
@@ -32,6 +33,8 @@ import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.StreamDesc;
import org.apache.eagle.metadata.model.StreamSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Collections;
@@ -49,28 +52,33 @@ import java.util.stream.Collectors;
* </ul>
*/
public class ApplicationAction implements Serializable {
- private final Config config;
+ private final Config effectiveConfig;
private final Application application;
private final ExecutionRuntime runtime;
private final ApplicationEntity metadata;
private final IMetadataDao alertMetadataService;
+ private static final String APP_METRIC_PREFIX = "eagle.";
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationAction.class);
/**
* @param metadata ApplicationEntity.
* @param application Application.
*/
- public ApplicationAction(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService) {
+ public ApplicationAction(Application application, ApplicationEntity metadata, Config serverConfig, IMetadataDao alertMetadataService) {
Preconditions.checkNotNull(application, "Application is null");
Preconditions.checkNotNull(metadata, "ApplicationEntity is null");
this.application = application;
this.metadata = metadata;
- this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), envConfig);
+ this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(), serverConfig);
Map<String, Object> executionConfig = metadata.getConfiguration();
if (executionConfig == null) {
executionConfig = Collections.emptyMap();
}
-
- this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig).withFallback(ConfigFactory.parseMap(metadata.getContext()));
+ if (serverConfig.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) {
+ LOG.warn("Ignored sever config {} = {}", MetricConfigs.METRIC_PREFIX_CONF, serverConfig.getString(MetricConfigs.METRIC_PREFIX_CONF));
+ }
+ executionConfig.put(MetricConfigs.METRIC_PREFIX_CONF, APP_METRIC_PREFIX);
+ this.effectiveConfig = ConfigFactory.parseMap(executionConfig).withFallback(serverConfig).withFallback(ConfigFactory.parseMap(metadata.getContext()));
this.alertMetadataService = alertMetadataService;
}
@@ -78,17 +86,17 @@ public class ApplicationAction implements Serializable {
* Generate global unique streamId to install.
* TODO refactor with streamId and siteId
*/
- private static String generateUniqueStreamId(String siteId,String streamTypeId) {
- return String.format("%s_%s",streamTypeId,siteId).toUpperCase();
- }
+ private static String generateUniqueStreamId(String siteId, String streamTypeId) {
+ return String.format("%s_%s", streamTypeId, siteId).toUpperCase();
+ }
public void doInstall() {
if (metadata.getDescriptor().getStreams() != null) {
List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
StreamDefinition copied = streamDefinition.copy();
copied.setSiteId(metadata.getSite().getSiteId());
- copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(),copied.getStreamId()));
- StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.config);
+ copied.setStreamId(generateUniqueStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
+ StreamSinkConfig streamSinkConfig = this.runtime.environment().streamSink().getSinkConfig(copied.getStreamId(), this.effectiveConfig);
StreamDesc streamDesc = new StreamDesc();
streamDesc.setSchema(copied);
streamDesc.setSink(streamSinkConfig);
@@ -138,16 +146,16 @@ public class ApplicationAction implements Serializable {
}
public void doStart() {
- this.runtime.start(this.application, this.config);
+ this.runtime.start(this.application, this.effectiveConfig);
}
@SuppressWarnings("unchecked")
public void doStop() {
- this.runtime.stop(this.application, this.config);
+ this.runtime.stop(this.application, this.effectiveConfig);
}
public ApplicationEntity.Status getStatus() {
- return this.runtime.status(this.application, this.config);
+ return this.runtime.status(this.application, this.effectiveConfig);
}
public ApplicationEntity getMetadata() {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
index bc66ab4..51ee2cf 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java
@@ -24,6 +24,7 @@ import org.apache.eagle.app.spi.ApplicationProvider;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.SiteEntity;
import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
import org.junit.Assert;
import java.util.HashMap;
@@ -36,6 +37,9 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
private final ApplicationResource applicationResource;
@Inject
+ ApplicationStatusUpdateService statusUpdateService;
+
+ @Inject
public ApplicationSimulatorImpl(Config config, SiteResource siteResource, ApplicationResource applicationResource) {
this.config = config;
this.siteResource = siteResource;
@@ -66,10 +70,13 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator {
ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(), appType, ApplicationEntity.Mode.LOCAL);
installOperation.setConfiguration(appConfig);
// Install application
- ApplicationEntity applicationEntity =
- applicationResource.installApplication(installOperation).getData();
+ ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
// Start application
applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+ statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+ applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+ statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+ applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4472fa1f/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
new file mode 100644
index 0000000..f2a4bce
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationSimulatorImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.ws.rs.WebApplicationException;
+
+@Ignore
+public class TestApplicationSimulatorImpl extends ApplicationTestBase {
+ @Inject
+ ApplicationSimulator simulator;
+
+ @Test
+ public void testSimulatorValid() {
+ simulator.start(TestStormApplication.Provider.class);
+ }
+
+ @Test(expected = WebApplicationException.class)
+ public void testSimulatorWithStaticApp() {
+ simulator.start(TestWebApplication.Provider.class);
+ }
+}