You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/02/28 06:34:05 UTC
eagle git commit: [EAGLE-928] Refine system metric schema design and
fix system metric collector
Repository: eagle
Updated Branches:
refs/heads/master 97ae1da52 -> 6e0fc410d
[EAGLE-928] Refine system metric schema design and fix system metric collector
https://issues.apache.org/jira/browse/EAGLE-928
* Support new stream `SYSTEM_METRIC_STREAM`:
<stream>
<streamId>SYSTEM_METRIC_STREAM</streamId>
<description>System Metrics Stream including CPU, Network, Disk, etc.</description>
<columns>
<column>
<name>host</name>
<type>string</type>
</column>
<column>
<name>timestamp</name>
<type>long</type>
</column>
<column>
<name>metric</name>
<type>string</type>
</column>
<column>
<name>group</name>
<type>string</type>
</column>
<column>
<name>site</name>
<type>string</type>
</column>
<column>
<name>device</name>
<type>string</type>
</column>
<column>
<name>value</name>
<type>double</type>
<defaultValue>0.0</defaultValue>
</column>
</columns>
</stream>
* Sample Metric Event
{
'timestamp': 1487918913569,
'metric': 'system.nic.transmitdrop',
'site': 'sandbox',
'value': 7724.0,
'host': 'sandbox.hortonworks.com',
'device': 'eth0'
}
* Add `system_metric_collector.py`
* Support persisting metrics from the system metric collector so that they can be queried
* Refactor MetricSchemaEntity and MetricDescriptor
Author: Hao Chen <ha...@apache.org>
Closes #842 from haoch/FixSystemMetricCollector.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/6e0fc410
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/6e0fc410
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/6e0fc410
Branch: refs/heads/master
Commit: 6e0fc410d4d148eba238c86d3bb7b2be507f6d82
Parents: 97ae1da
Author: Hao Chen <ha...@apache.org>
Authored: Tue Feb 28 14:33:55 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Feb 28 14:33:55 2017 +0800
----------------------------------------------------------------------
.../environment/builder/ApplicationBuilder.java | 256 ++++++++--------
.../environment/builder/MetricDefinition.java | 201 -------------
.../environment/builder/MetricDescriptor.java | 297 +++++++++++++++++++
.../app/environment/impl/StormEnvironment.java | 11 +-
.../app/messaging/MetricSchemaGenerator.java | 19 +-
.../app/messaging/MetricStreamPersist.java | 38 +--
.../apache/eagle/app/utils/AppConfigUtils.java | 30 ++
.../metadata/model/MetricSchemaEntity.java | 7 +-
.../hadoop_jmx_collector/metric_collector.py | 15 +
.../system_metric_collector.py | 24 +-
.../system_metric_config-sample.json | 11 +-
.../eagle/metric/HadoopMetricMonitorApp.java | 38 ++-
...le.metric.HadoopMetricMonitorAppProdiver.xml | 42 ++-
.../metric/HadoopMetricMonitorAppDebug.java | 3 +
.../src/test/resources/application.conf | 18 +-
.../app/dev/public/js/ctrls/metricCtrl.js | 2 +-
16 files changed, 611 insertions(+), 401 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
index 95cf491..88f0886 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -1,127 +1,131 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.messaging.MetricSchemaGenerator;
-import org.apache.eagle.app.messaging.MetricStreamPersist;
-import org.apache.eagle.app.messaging.StormStreamSource;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Storm Application Builder DSL.
- */
-public class ApplicationBuilder {
- private final StormEnvironment environment;
- private final Config appConfig;
- private final TopologyBuilder topologyBuilder;
- private final AtomicInteger identifier;
-
- public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
- this.appConfig = appConfig;
- this.environment = environment;
- this.identifier = new AtomicInteger(0);
- this.topologyBuilder = new TopologyBuilder();
- }
-
- public class BuilderContext {
- public StormTopology toTopology() {
- return topologyBuilder.createTopology();
- }
- }
-
- public abstract class InitializedStream extends BuilderContext {
- private String id;
-
- InitializedStream(String id) {
- Preconditions.checkNotNull(id);
- this.id = id;
- }
-
- String getId() {
- return this.id;
- }
-
- /**
- * Persist source data stream as metric.
- */
- public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
- String metricDataID = generateId("MetricDataSink");
- String metricSchemaID = generateId("MetricSchemaGenerator");
- topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
- topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDefinition,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
- return this;
- }
-
- public TransformedStream transformBy(TransformFunction function) {
- String componentId = generateId(function.getName());
- topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId());
- return new TransformedStream(componentId);
- }
- }
-
- public class SourcedStream extends InitializedStream {
- private final Config appConfig;
- private final StormStreamSource streamSource;
-
- private SourcedStream(SourcedStream withSourcedStream) {
- this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource);
- }
-
- private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) {
- super(componentId);
- this.appConfig = appConfig;
- this.streamSource = streamSource;
- topologyBuilder.setSpout(componentId, streamSource);
- }
- }
-
- public class TransformedStream extends InitializedStream {
- public TransformedStream(String id) {
- super(id);
- throw new IllegalStateException("TODO: Not implemented yet");
- }
- }
-
- public TopologyBuilder getTopologyBuilder() {
- return this.topologyBuilder;
- }
-
- public StormTopology createTopology() {
- return topologyBuilder.createTopology();
- }
-
-
- public SourcedStream fromStream(String streamId) {
- return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig));
- }
-
- public SourcedStream fromStream(SourcedStream sourcedStream) {
- return new SourcedStream(sourcedStream);
- }
-
- private String generateId(String prefix) {
- return String.format("%s_%s", prefix, this.identifier.getAndIncrement());
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricSchemaGenerator;
+import org.apache.eagle.app.messaging.MetricStreamPersist;
+import org.apache.eagle.app.messaging.StormStreamSource;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Storm Application Builder DSL.
+ */
+public class ApplicationBuilder {
+ private final StormEnvironment environment;
+ private final Config appConfig;
+ private final TopologyBuilder topologyBuilder;
+ private final AtomicInteger identifier;
+
+ public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
+ this.appConfig = appConfig;
+ this.environment = environment;
+ this.identifier = new AtomicInteger(0);
+ this.topologyBuilder = new TopologyBuilder();
+ }
+
+ public class BuilderContext {
+ public StormTopology toTopology() {
+ return topologyBuilder.createTopology();
+ }
+
+ public SourcedStream fromStream(String streamId) {
+ return ApplicationBuilder.this.fromStream(streamId);
+ }
+ }
+
+ public abstract class InitializedStream extends BuilderContext {
+ private String id;
+
+ InitializedStream(String id) {
+ Preconditions.checkNotNull(id);
+ this.id = id;
+ }
+
+ String getId() {
+ return this.id;
+ }
+
+ /**
+ * Persist source data stream as metric.
+ */
+ public BuilderContext saveAsMetric(MetricDescriptor metricDescriptor) {
+ String metricDataID = generateId("MetricDataSink");
+ String metricSchemaID = generateId("MetricSchemaGenerator");
+ topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDescriptor, appConfig)).shuffleGrouping(getId());
+ topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDescriptor,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
+ return this;
+ }
+
+ public TransformedStream transformBy(TransformFunction function) {
+ String componentId = generateId(function.getName());
+ topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId());
+ return new TransformedStream(componentId);
+ }
+ }
+
+ public class SourcedStream extends InitializedStream {
+ private final Config appConfig;
+ private final StormStreamSource streamSource;
+
+ private SourcedStream(SourcedStream withSourcedStream) {
+ this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource);
+ }
+
+ private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) {
+ super(componentId);
+ this.appConfig = appConfig;
+ this.streamSource = streamSource;
+ topologyBuilder.setSpout(componentId, streamSource);
+ }
+ }
+
+ public class TransformedStream extends InitializedStream {
+ public TransformedStream(String id) {
+ super(id);
+ throw new IllegalStateException("TODO: Not implemented yet");
+ }
+ }
+
+ public TopologyBuilder getTopologyBuilder() {
+ return this.topologyBuilder;
+ }
+
+ public StormTopology createTopology() {
+ return topologyBuilder.createTopology();
+ }
+
+
+ public SourcedStream fromStream(String streamId) {
+ return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig));
+ }
+
+ public SourcedStream fromStream(SourcedStream sourcedStream) {
+ return new SourcedStream(sourcedStream);
+ }
+
+ private String generateId(String prefix) {
+ return String.format("%s_%s", prefix, this.identifier.getAndIncrement());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
deleted file mode 100644
index 639d27f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
-public class MetricDefinition implements Serializable {
-
- /**
- * Support simple and complex name format, by default using "metric" field.
- */
- private NameSelector nameSelector = new FieldNameSelector("metric");
-
- /**
- * Support event/system time, by default using system time.
- */
- private TimestampSelector timestampSelector = new SystemTimestampSelector();
-
- /**
- * Metric dimension field name.
- */
- private List<String> dimensionFields;
-
- /**
- * Metric granularity.
- */
- private int granularity = Calendar.MINUTE;
-
- private String metricType = "DEFAULT";
-
- /**
- * Metric value field name.
- */
- private String valueField = "value";
-
- public NameSelector getNameSelector() {
- return nameSelector;
- }
-
- public void setNameSelector(NameSelector nameSelector) {
- this.nameSelector = nameSelector;
- }
-
- public String getValueField() {
- return valueField;
- }
-
- public void setValueField(String valueField) {
- this.valueField = valueField;
- }
-
- public List<String> getDimensionFields() {
- return dimensionFields;
- }
-
- public void setDimensionFields(List<String> dimensionFields) {
- this.dimensionFields = dimensionFields;
- }
-
- public TimestampSelector getTimestampSelector() {
- return timestampSelector;
- }
-
- public void setTimestampSelector(TimestampSelector timestampSelector) {
- this.timestampSelector = timestampSelector;
- }
-
- public int getGranularity() {
- return granularity;
- }
-
- public void setGranularity(int granularity) {
- this.granularity = granularity;
- }
-
- public String getMetricType() {
- return metricType;
- }
-
- public void setMetricType(String metricType) {
- this.metricType = metricType;
- }
-
-
- @FunctionalInterface
- public interface NameSelector extends Serializable {
- String getMetricName(Map event);
- }
-
- @FunctionalInterface
- public interface TimestampSelector extends Serializable {
- Long getTimestamp(Map event);
- }
-
- public MetricDefinition namedBy(NameSelector nameSelector) {
- this.setNameSelector(nameSelector);
- return this;
- }
-
- /**
- * @see java.util.Calendar
- */
- public MetricDefinition granularity(int granularity) {
- this.setGranularity(granularity);
- return this;
- }
-
- public MetricDefinition namedByField(String nameField) {
- this.setNameSelector(new FieldNameSelector(nameField));
- return this;
- }
-
- public static MetricDefinition metricType(String metricType) {
- MetricDefinition metricDefinition = new MetricDefinition();
- metricDefinition.setMetricType(metricType);
- return metricDefinition;
- }
-
- public MetricDefinition eventTimeByField(String timestampField) {
- this.setTimestampSelector(new EventTimestampSelector(timestampField));
- return this;
- }
-
- public MetricDefinition dimensionFields(String... dimensionFields) {
- this.setDimensionFields(Arrays.asList(dimensionFields));
- return this;
- }
-
- public MetricDefinition valueField(String valueField) {
- this.setValueField(valueField);
- return this;
- }
-
- public class EventTimestampSelector implements TimestampSelector {
- private final String timestampField;
-
- EventTimestampSelector(String timestampField) {
- this.timestampField = timestampField;
- }
-
- @Override
- public Long getTimestamp(Map event) {
- if (event.containsKey(timestampField)) {
- Object timestampValue = event.get(timestampField);
- if (timestampValue instanceof Integer) {
- return Long.valueOf((Integer) timestampValue);
- }
- if (timestampValue instanceof String) {
- return Long.valueOf((String) timestampValue);
- } else {
- return (Long) timestampValue;
- }
- } else {
- throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
- }
- }
- }
-
- public static class SystemTimestampSelector implements TimestampSelector {
- @Override
- public Long getTimestamp(Map event) {
- return System.currentTimeMillis();
- }
- }
-
- public static class FieldNameSelector implements NameSelector {
- private final String fieldName;
-
- FieldNameSelector(String fieldName) {
- this.fieldName = fieldName;
- }
-
- @Override
- public String getMetricName(Map event) {
- if (event.containsKey(fieldName)) {
- return (String) event.get(fieldName);
- } else {
- throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
new file mode 100644
index 0000000..e79e4d7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+public class MetricDescriptor implements Serializable {
+
+ /**
+ * Support simple and complex name format, by default using "metric" field.
+ */
+ private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
+
+ public MetricGroupSelector getMetricGroupSelector() {
+ return metricGroupSelector;
+ }
+
+ public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) {
+ this.metricGroupSelector = metricGroupSelector;
+ }
+
+
+ private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
+
+ private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
+ private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
+
+ /**
+ * Support event/system time, by default using system time.
+ */
+ private TimestampSelector timestampSelector = new SystemTimestampSelector();
+
+ /**
+ * Metric dimension field name.
+ */
+ private List<String> dimensionFields;
+
+ /**
+ * Metric granularity.
+ */
+ private int granularity = Calendar.MINUTE;
+
+ /**
+ * Metric value field name.
+ */
+ private String valueField = "value";
+
+ public MetricNameSelector getMetricNameSelector() {
+ return metricNameSelector;
+ }
+
+ public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
+ this.metricNameSelector = metricNameSelector;
+ }
+
+ public String getValueField() {
+ return valueField;
+ }
+
+ public void setValueField(String valueField) {
+ this.valueField = valueField;
+ }
+
+ public List<String> getDimensionFields() {
+ return dimensionFields;
+ }
+
+ public void setDimensionFields(List<String> dimensionFields) {
+ this.dimensionFields = dimensionFields;
+ }
+
+ public TimestampSelector getTimestampSelector() {
+ return timestampSelector;
+ }
+
+ public void setTimestampSelector(TimestampSelector timestampSelector) {
+ this.timestampSelector = timestampSelector;
+ }
+
+ public int getGranularity() {
+ return granularity;
+ }
+
+ public void setGranularity(int granularity) {
+ this.granularity = granularity;
+ }
+
+ public SiteIdSelector getSiteIdSelector() {
+ return siteIdSelector;
+ }
+
+ public void setSiteIdSelector(SiteIdSelector siteIdSelector) {
+ this.siteIdSelector = siteIdSelector;
+ }
+
+
+ @FunctionalInterface
+ public interface MetricNameSelector extends Serializable {
+ String getMetricName(Map event);
+ }
+
+ @FunctionalInterface
+ public interface MetricGroupSelector extends Serializable {
+ String getMetricGroup(Map event);
+ }
+
+ public static class FixedMetricGroupSelector implements MetricGroupSelector {
+ private final String groupName;
+
+ private FixedMetricGroupSelector(String groupName) {
+ this.groupName = groupName;
+ }
+
+ @Override
+ public String getMetricGroup(Map event) {
+ return groupName;
+ }
+ }
+
+ @FunctionalInterface
+ public interface TimestampSelector extends Serializable {
+ Long getTimestamp(Map event);
+ }
+
+ @FunctionalInterface
+ public interface SiteIdSelector extends Serializable {
+ String getSiteId(Map event);
+ }
+
+ public class FixedSiteIdSelector implements SiteIdSelector {
+ private final String siteId;
+
+ private FixedSiteIdSelector(String siteId) {
+ this.siteId = siteId;
+ }
+
+ @Override
+ public String getSiteId(Map event) {
+ return this.siteId;
+ }
+ }
+
+ private class FieldSiteIdSelector implements SiteIdSelector {
+ private final String siteIdFieldName;
+
+ public FieldSiteIdSelector(String siteIdFieldName) {
+ this.siteIdFieldName = siteIdFieldName;
+ }
+
+ @Override
+ public String getSiteId(Map event) {
+ return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN");
+ }
+ }
+
+ public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) {
+ this.setMetricNameSelector(metricNameSelector);
+ return this;
+ }
+
+ public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) {
+ this.setSiteIdSelector(siteIdSelector);
+ return this;
+ }
+
+ public MetricDescriptor siteAs(String siteId) {
+ this.setSiteIdSelector(new FixedSiteIdSelector(siteId));
+ return this;
+ }
+
+    /**
+     * Selects the site id from the given event field instead of the default "site" field.
+     *
+     * <p>Events that do not carry the field are reported under the site id "UNKNOWN".
+     *
+     * @param fieldName name of the event field holding the site id
+     * @return this descriptor, for call chaining
+     */
+    public MetricDescriptor siteByField(String fieldName) {
+        // Configure the site selector only; the metric name selector must stay untouched here.
+        this.setSiteIdSelector(new FieldSiteIdSelector(fieldName));
+        return this;
+    }
+
+ /**
+ * @see java.util.Calendar
+ */
+ public MetricDescriptor granularity(int granularity) {
+ this.setGranularity(granularity);
+ return this;
+ }
+
+ public MetricDescriptor namedByField(String nameField) {
+ this.setMetricNameSelector(new FieldMetricNameSelector(nameField));
+ return this;
+ }
+
+ public static MetricDescriptor metricGroupAs(String metricGroupName) {
+ return metricGroupAs(new FixedMetricGroupSelector(metricGroupName));
+ }
+
+ public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) {
+ MetricDescriptor metricDescriptor = new MetricDescriptor();
+ metricDescriptor.setMetricGroupSelector(groupSelector);
+ return metricDescriptor;
+ }
+
+    /**
+     * Creates a descriptor whose metric group is read from a field of each event.
+     *
+     * @param fieldName event field holding the metric group name
+     * @param defaultGroupName group name used when the event does not contain the field
+     * @return a new descriptor configured with the field-based group selector
+     */
+    public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) {
+        MetricGroupSelector groupFromField = event ->
+            event.containsKey(fieldName) ? (String) event.get(fieldName) : defaultGroupName;
+        MetricDescriptor descriptor = new MetricDescriptor();
+        descriptor.setMetricGroupSelector(groupFromField);
+        return descriptor;
+    }
+
+ public static MetricDescriptor metricGroupByField(String fieldName) {
+ return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME);
+ }
+
+ public MetricDescriptor eventTimeByField(String timestampField) {
+ this.setTimestampSelector(new EventTimestampSelector(timestampField));
+ return this;
+ }
+
+ public MetricDescriptor dimensionFields(String... dimensionFields) {
+ this.setDimensionFields(Arrays.asList(dimensionFields));
+ return this;
+ }
+
+ public MetricDescriptor valueField(String valueField) {
+ this.setValueField(valueField);
+ return this;
+ }
+
+    /**
+     * Reads the metric timestamp from a named field of the event.
+     */
+    public class EventTimestampSelector implements TimestampSelector {
+        private final String timestampField;
+
+        EventTimestampSelector(String timestampField) {
+            this.timestampField = timestampField;
+        }
+
+        /**
+         * Extracts the timestamp from the configured event field.
+         *
+         * @param event the metric event
+         * @return the field value as a {@code Long}; {@code null} if the field is present but null
+         * @throws IllegalArgumentException if the field is missing, or its value is neither a
+         *     number nor a string parsable as a long
+         */
+        @Override
+        public Long getTimestamp(Map event) {
+            if (!event.containsKey(timestampField)) {
+                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
+            }
+            Object timestampValue = event.get(timestampField);
+            if (timestampValue == null) {
+                return null;
+            }
+            // Accept every numeric type (Integer, Long, Double, ...) rather than only Integer and Long.
+            if (timestampValue instanceof Number) {
+                return ((Number) timestampValue).longValue();
+            }
+            if (timestampValue instanceof String) {
+                return Long.valueOf((String) timestampValue);
+            }
+            // Fail with a message naming the field instead of an opaque ClassCastException.
+            throw new IllegalArgumentException("Timestamp field '" + timestampField
+                + "' has unsupported type: " + timestampValue.getClass().getName());
+        }
+    }
+
+ public static class SystemTimestampSelector implements TimestampSelector {
+ @Override
+ public Long getTimestamp(Map event) {
+ return System.currentTimeMillis();
+ }
+ }
+
+ public static class FieldMetricNameSelector implements MetricNameSelector {
+ private final String fieldName;
+
+ FieldMetricNameSelector(String fieldName) {
+ this.fieldName = fieldName;
+ }
+
+ @Override
+ public String getMetricName(Map event) {
+ if (event.containsKey(fieldName)) {
+ return (String) event.get(fieldName);
+ } else {
+ throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 942a0ac..6827eef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -18,12 +18,11 @@ package org.apache.eagle.app.environment.impl;
import org.apache.eagle.app.environment.AbstractEnvironment;
import org.apache.eagle.app.environment.builder.ApplicationBuilder;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
import org.apache.eagle.app.environment.builder.TransformFunction;
import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
import org.apache.eagle.app.messaging.*;
import com.typesafe.config.Config;
-import org.apache.eagle.metadata.model.StreamSourceConfig;
/**
* Storm Execution Environment Context.
@@ -44,16 +43,16 @@ public class StormEnvironment extends AbstractEnvironment {
return (StormStreamSource) stream().getSource(streamId,config);
}
- public MetricStreamPersist getMetricPersist(MetricDefinition metricDefinition, Config config) {
- return new MetricStreamPersist(metricDefinition, config);
+ public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) {
+ return new MetricStreamPersist(metricDescriptor, config);
}
public EntityStreamPersist getEntityPersist(Config config) {
return new EntityStreamPersist(config);
}
- public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
- return new MetricSchemaGenerator(metricDefinition, config);
+ public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
+ return new MetricSchemaGenerator(metricDescriptor, config);
}
public TransformFunctionBolt getTransformer(TransformFunction function) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
index bb29cea..90e6481 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Tuple;
import com.typesafe.config.Config;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
import org.apache.eagle.metadata.model.MetricSchemaEntity;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
import org.apache.eagle.service.client.EagleServiceClientException;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -40,14 +40,14 @@ public class MetricSchemaGenerator extends BaseRichBolt {
public static final String GENERIC_METRIC_VALUE_NAME = "value";
private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH);
- private final MetricDefinition metricDefinition;
+ private final MetricDescriptor metricDescriptor;
private final Config config;
private OutputCollector collector;
private IEagleServiceClient client;
- public MetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
- this.metricDefinition = metricDefinition;
+ public MetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
+ this.metricDescriptor = metricDescriptor;
this.config = config;
}
@@ -63,7 +63,7 @@ public class MetricSchemaGenerator extends BaseRichBolt {
String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD);
synchronized (metricNameCache) {
if (!metricNameCache.contains(metricName)) {
- createMetricSchemaEntity(metricName, this.metricDefinition);
+ createMetricSchemaEntity(metricName, (Map) input.getValueByField(MetricStreamPersist.METRIC_EVENT_FIELD),this.metricDescriptor);
metricNameCache.add(metricName);
}
if (metricNameCache.size() > MAX_CACHE_LENGTH) {
@@ -93,14 +93,15 @@ public class MetricSchemaGenerator extends BaseRichBolt {
}
}
- private void createMetricSchemaEntity(String metricName, MetricDefinition metricDefinition) throws IOException, EagleServiceClientException {
+ private void createMetricSchemaEntity(String metricName, Map event, MetricDescriptor metricDescriptor) throws IOException, EagleServiceClientException {
MetricSchemaEntity schemaEntity = new MetricSchemaEntity();
Map<String, String> schemaTags = new HashMap<>();
schemaEntity.setTags(schemaTags);
+ schemaTags.put(MetricSchemaEntity.METRIC_SITE_TAG, metricDescriptor.getSiteIdSelector().getSiteId(event));
schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName);
- schemaTags.put(MetricSchemaEntity.METRIC_TYPE_TAG, metricDefinition.getMetricType());
- schemaEntity.setGranularityByField(metricDefinition.getGranularity());
- schemaEntity.setDimensionFields(metricDefinition.getDimensionFields());
+ schemaTags.put(MetricSchemaEntity.METRIC_GROUP_TAG, metricDescriptor.getMetricGroupSelector().getMetricGroup(event));
+ schemaEntity.setGranularityByField(metricDescriptor.getGranularity());
+ schemaEntity.setDimensionFields(metricDescriptor.getDimensionFields());
schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME));
schemaEntity.setModifiedTimestamp(System.currentTimeMillis());
GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(schemaEntity));
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index ba99911..c9b43e5 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
import org.apache.eagle.app.utils.StreamConvertHelper;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.log.entity.GenericMetricEntity;
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -44,6 +45,7 @@ import java.util.Map;
public class MetricStreamPersist extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
public static final String METRIC_NAME_FIELD = "metricName";
+ public static final String METRIC_EVENT_FIELD = "metricEvent";
private final Config config;
private final MetricMapper mapper;
@@ -52,9 +54,9 @@ public class MetricStreamPersist extends BaseRichBolt {
private OutputCollector collector;
private BatchSender batchSender;
- public MetricStreamPersist(MetricDefinition metricDefinition, Config config) {
+ public MetricStreamPersist(MetricDescriptor metricDescriptor, Config config) {
this.config = config;
- this.mapper = new StructuredMetricMapper(metricDefinition);
+ this.mapper = new StructuredMetricMapper(metricDescriptor);
this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
}
@@ -76,8 +78,10 @@ public class MetricStreamPersist extends BaseRichBolt {
@Override
public void execute(Tuple input) {
GenericMetricEntity metricEntity = null;
+ Map event = null;
try {
- metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+ event = StreamConvertHelper.tupleToEvent(input).f1();
+ metricEntity = this.mapper.map(event);
if (batchSize <= 1) {
GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
if (!response.isSuccess()) {
@@ -91,8 +95,8 @@ public class MetricStreamPersist extends BaseRichBolt {
LOG.error(ex.getMessage(), ex);
collector.reportError(ex);
} finally {
- if (metricEntity != null) {
- collector.emit(Collections.singletonList(metricEntity.getPrefix()));
+ if (metricEntity != null && event != null) {
+ collector.emit(Arrays.asList(metricEntity.getPrefix(), event));
}
collector.ack(input);
}
@@ -100,7 +104,7 @@ public class MetricStreamPersist extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(METRIC_NAME_FIELD));
+ declarer.declare(new Fields(METRIC_NAME_FIELD, METRIC_EVENT_FIELD));
}
@Override
@@ -120,35 +124,35 @@ public class MetricStreamPersist extends BaseRichBolt {
}
public class StructuredMetricMapper implements MetricMapper {
- private final MetricDefinition metricDefinition;
+ private final MetricDescriptor metricDescriptor;
- private StructuredMetricMapper(MetricDefinition metricDefinition) {
- this.metricDefinition = metricDefinition;
+ private StructuredMetricMapper(MetricDescriptor metricDescriptor) {
+ this.metricDescriptor = metricDescriptor;
}
@Override
public GenericMetricEntity map(Map event) {
- String metricName = metricDefinition.getNameSelector().getMetricName(event);
+ String metricName = metricDescriptor.getMetricNameSelector().getMetricName(event);
Preconditions.checkNotNull(metricName, "Metric name is null");
- Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
+ Long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
Preconditions.checkNotNull(timestamp, "Timestamp is null");
Map<String, String> tags = new HashMap<>();
- for (String dimensionField : metricDefinition.getDimensionFields()) {
+ for (String dimensionField : metricDescriptor.getDimensionFields()) {
Preconditions.checkNotNull(dimensionField, "Dimension field name is null");
tags.put(dimensionField, (String) event.get(dimensionField));
}
double[] values;
- if (event.containsKey(metricDefinition.getValueField())) {
- values = new double[] {(double) event.get(metricDefinition.getValueField())};
+ if (event.containsKey(metricDescriptor.getValueField())) {
+ values = new double[] {(double) event.get(metricDescriptor.getValueField())};
} else {
- LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event);
+ LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
values = new double[] {0};
}
GenericMetricEntity entity = new GenericMetricEntity();
entity.setPrefix(metricName);
- entity.setTimestamp(DateTimeUtil.roundDown(metricDefinition.getGranularity(), timestamp));
+ entity.setTimestamp(DateTimeUtil.roundDown(metricDescriptor.getGranularity(), timestamp));
entity.setTags(tags);
entity.setValue(values);
return entity;
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
new file mode 100644
index 0000000..7bad0bd
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.utils;
+
+import com.typesafe.config.Config;
+
+public class AppConfigUtils {
+ public static String getSiteId(Config config) {
+ return config.getString("siteId");
+ }
+
+ public static String getAppId(Config config) {
+ return config.getString("appId");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
index f18932c..3bd5825 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
@@ -30,11 +30,12 @@ import java.util.List;
@Service(MetricSchemaEntity.METRIC_SCHEMA_SERVICE)
@JsonIgnoreProperties(ignoreUnknown = true)
@TimeSeries(false)
-@Tags({"metricName","metricType"})
+@Tags({"site","site","group"})
public class MetricSchemaEntity extends TaggedLogAPIEntity {
static final String METRIC_SCHEMA_SERVICE = "MetricSchemaService";
- public static final String METRIC_NAME_TAG = "metricName";
- public static final String METRIC_TYPE_TAG = "metricType";
+ public static final String METRIC_NAME_TAG = "name";
+ public static final String METRIC_SITE_TAG = "site";
+ public static final String METRIC_GROUP_TAG = "group";
@Column("a")
private List<String> dimensionFields;
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index bf1d4df..c3fdb43 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -210,6 +210,7 @@ class KafkaMetricSender(MetricSender):
self.default_topic = None
if kafka_config.has_key("default_topic"):
self.default_topic = kafka_config["default_topic"].encode('utf-8')
+ logging.info("Using default topic: %s" % self.default_topic)
self.component_topic_mapping = {}
if kafka_config.has_key("component_topic_mapping"):
self.component_topic_mapping = kafka_config["component_topic_mapping"]
@@ -266,12 +267,18 @@ class MetricCollector(threading.Thread):
filters = []
config = None
closed = False
+ collected_event_count = 0
+ ignored_event_count = 0
+ emit_event_count = 0
def __init__(self, config=None):
threading.Thread.__init__(self)
self.config = None
self.sender = None
self.fqdn = socket.getfqdn()
+ self.ignored_event_count = 0
+ self.collected_event_count = 0
+ self.emit_event_count = 0
def init(self, config):
self.config = config
@@ -296,6 +303,7 @@ class MetricCollector(threading.Thread):
def collect(self, msg, type='float'):
try:
+ self.collected_event_count = self.collected_event_count + 1
if not msg.has_key("timestamp"):
msg["timestamp"] = int(round(time.time() * 1000))
if msg.has_key("value") and type == 'float':
@@ -304,21 +312,28 @@ class MetricCollector(threading.Thread):
msg["value"] = str(msg["value"])
if not msg.has_key("host") or len(msg["host"]) == 0:
raise Exception("host is null: " + str(msg))
+
if not msg.has_key("site"):
msg["site"] = self.config["env"]["site"]
+
if len(self.filters) == 0:
+ self.emit_event_count = self.emit_event_count + 1
self.sender.send(msg)
return
else:
for filter in self.filters:
if filter.filter_metric(msg):
+ self.emit_event_count = self.emit_event_count + 1
self.sender.send(msg)
return
+ self.ignored_event_count = self.ignored_event_count + 1
except Exception as e:
logging.error("Failed to emit metric: %s" % msg)
logging.exception(e)
def close(self):
+ logging.info("Collected %s events (emitted: %s, ignored: %s)"
+ % (self.collected_event_count, self.emit_event_count, self.ignored_event_count))
self.sender.close()
self.closed = True
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
index e0ffecc..4c95a6c 100644
--- a/eagle-external/hadoop_jmx_collector/system_metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
@@ -17,8 +17,7 @@
#
from metric_collector import MetricCollector, Runner
-import logging, socket, string, os, re, time
-
+import logging, socket, string, os, re, time, json
class SystemMetricCollector(MetricCollector):
METRIC_PREFIX = "system"
@@ -86,7 +85,6 @@ class SystemMetricCollector(MetricCollector):
"""
- cpu_metric = self.new_metric()
cpu_info = os.popen('cat /proc/stat').readlines()
dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"]
@@ -109,6 +107,7 @@ class SystemMetricCollector(MetricCollector):
metric_event = dict()
for i in range(1, demens):
metric_event[dimensions[i]] = int(items[i])
+ cpu_metric = self.new_metric("system.cpu")
cpu_metric['timestamp'] = int(round(time.time() * 1000))
cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i]
cpu_metric['device'] = items[0]
@@ -123,6 +122,7 @@ class SystemMetricCollector(MetricCollector):
total_cpu_usage += per_cpu_usage
# system.cpu.usage
+ cpu_metric = self.new_metric("system.cpu")
cpu_metric['timestamp'] = int(round(time.time() * 1000))
cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage"
cpu_metric['device'] = items[0]
@@ -141,6 +141,7 @@ class SystemMetricCollector(MetricCollector):
result = re.split("\s+", cpu_stat_pre.rstrip())
pre_total_cpu_usage = int(result[0])
pre_total_cpu = int(result[1])
+
cpu_metric['timestamp'] = int(round(time.time() * 1000))
cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "totalusage"
cpu_metric['device'] = "cpu"
@@ -153,7 +154,7 @@ class SystemMetricCollector(MetricCollector):
# ====================================
def collect_uptime_metric(self):
- metric = self.new_metric()
+ metric = self.new_metric("system.os")
demension = ["uptime.day", "idletime.day"]
output = os.popen('cat /proc/uptime').readlines()
@@ -170,7 +171,7 @@ class SystemMetricCollector(MetricCollector):
# ====================================
def collect_memory_metric(self):
- event = self.new_metric()
+ event = self.new_metric("system.memory")
event["host"] = self.fqdn
output = os.popen('cat /proc/meminfo').readlines()
mem_info = dict()
@@ -208,7 +209,7 @@ class SystemMetricCollector(MetricCollector):
items = re.split("\s+", item.rstrip())
demens = min(len(demension), len(items))
for i in range(demens):
- event = self.new_metric()
+ event = self.new_metric("system.cpu")
event["timestamp"] = int(round(time.time() * 1000))
event["metric"] = self.METRIC_PREFIX + "." + demension[i]
event["value"] = items[i]
@@ -223,7 +224,7 @@ class SystemMetricCollector(MetricCollector):
output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
for item in output:
items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
- event = self.new_metric()
+ event = self.new_metric("System.CPU")
event["timestamp"] = int(round(time.time() * 1000))
event["metric"] = DATA_TYPE + "." + 'cpu.temp'
event["value"] = items[2]
@@ -247,7 +248,7 @@ class SystemMetricCollector(MetricCollector):
filtered_items = items[1:5] + items[9:13]
for i in range(len(demension)):
- kafka_dict = self.new_metric()
+ kafka_dict = self.new_metric("system.network")
kafka_dict["timestamp"] = int(round(time.time() * 1000))
kafka_dict['metric'] = self.METRIC_PREFIX + "." + 'nic.' + demension[i]
kafka_dict["value"] = filtered_items[i]
@@ -270,7 +271,7 @@ class SystemMetricCollector(MetricCollector):
continue
lineitems = re.split("\s+", line)
metric = 'smartdisk.' + lineitems[1]
- kafka_dict = self.new_metric()
+ kafka_dict = self.new_metric("system.disk")
kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
kafka_dict["timestamp"] = int(round(time.time() * 1000))
kafka_dict["value"] = lineitems[-1]
@@ -308,7 +309,7 @@ class SystemMetricCollector(MetricCollector):
for key, metrics in iostat_dict.iteritems():
for i in range(len(metrics)):
metric = 'disk.' + demension[i]
- kafka_dict = self.new_metric()
+ kafka_dict = self.new_metric("system.disk")
kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
kafka_dict["timestamp"] = int(round(time.time() * 1000))
kafka_dict["value"] = metrics[i]
@@ -326,9 +327,10 @@ class SystemMetricCollector(MetricCollector):
event["device"] = device
self.collect(event)
- def new_metric(self):
+ def new_metric(self, group):
metric = dict()
metric["host"] = self.fqdn
+ metric["group"] = group
return metric
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
index 6fcd43b..27ad5bb 100644
--- a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
@@ -1,20 +1,21 @@
{
"env": {
"site": "sandbox",
- "log_file": "/tmp/hadoop-jmx-collector.log",
"cpu_stat_file": "/tmp/eagle_cpu_usage_state"
},
- "input": [
- ],
"filter": {
+ "bean_group_filter": ["hadoop","java.lang","java.nio"],
+ "metric_name_filter": [
+ "system.*"
+ ]
},
"output": {
"kafka": {
- "debug": false,
+ "debug": true,
"default_topic": "system_metric_sandbox",
"broker_list": [
"sandbox.hortonworks.com:6667"
]
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 304e500..0c49d82 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -19,8 +19,10 @@ package org.apache.eagle.metric;
import backtype.storm.generated.StormTopology;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
+import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.AppConfigUtils;
import java.util.Calendar;
@@ -28,14 +30,30 @@ public class HadoopMetricMonitorApp extends StormApplication {
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
return environment.newApp(config)
- .fromStream("HADOOP_JMX_METRIC_STREAM")
- .saveAsMetric(MetricDefinition
- .metricType("HADOOP_JMX_METRICS")
- .namedByField("metric")
- .eventTimeByField("timestamp")
- .dimensionFields("host","component","site")
- .granularity(Calendar.MINUTE)
- .valueField("value"))
- .toTopology();
+ .fromStream("HADOOP_JMX_METRIC_STREAM")
+ .saveAsMetric(
+ MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> {
+ if (event.containsKey("component")) {
+ return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase());
+ } else {
+ return "hadoop.metrics";
+ }
+ })
+ .siteAs(AppConfigUtils.getSiteId(config))
+ .namedByField("metric")
+ .eventTimeByField("timestamp")
+ .dimensionFields("host", "component", "site")
+ .granularity(Calendar.MINUTE)
+ .valueField("value"))
+ .fromStream("SYSTEM_METRIC_STREAM")
+ .saveAsMetric(MetricDescriptor.metricGroupByField("group")
+ .siteAs(AppConfigUtils.getSiteId(config))
+ .namedByField("metric")
+ .eventTimeByField("timestamp")
+ .dimensionFields("host", "group", "site", "device")
+ .granularity(Calendar.MINUTE)
+ .valueField("value")
+ )
+ .toTopology();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 073c900..51a9257 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -20,7 +20,6 @@
<type>HADOOP_METRIC_MONITOR</type>
<name>Hadoop Metrics Monitor</name>
<configuration>
- <!-- data fromStream configurations -->
<property>
<name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name>
<displayName>JMX Metric Kafka Topic</displayName>
@@ -29,6 +28,12 @@
<required>true</required>
</property>
<property>
+ <name>dataSinkConfig.SYSTEM_METRIC_STREAM.topic</name>
+ <displayName>System Metric Kafka Topic</displayName>
+ <value>system_metric_${siteId}</value>
+ <description>System metric Kafka topic name for stream: SYSTEM_METRIC_STREAM</description>
+ </property>
+ <property>
<name>dataSinkConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name>
<displayName>JMX Resource Kafka Topic</displayName>
<value>hadoop_jmx_resource_${siteId}</value>
@@ -134,6 +139,41 @@
</columns>
</stream>
<stream>
+ <streamId>SYSTEM_METRIC_STREAM</streamId>
+ <description>System Metrics Stream including CPU, Network, Disk, etc.</description>
+ <columns>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ <column>
+ <name>metric</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>group</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>site</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>device</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>value</name>
+ <type>double</type>
+ <defaultValue>0.0</defaultValue>
+ </column>
+ </columns>
+ </stream>
+ <stream>
<streamId>HADOOP_JMX_RESOURCE_STREAM</streamId>
<description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description>
<columns>
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
index e4589b7..e6f1463 100644
--- a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
+++ b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
@@ -16,6 +16,9 @@
*/
package org.apache.eagle.metric;
+import org.junit.Ignore;
+
+@Ignore
public class HadoopMetricMonitorAppDebug {
public static void main(String[] args) {
new HadoopMetricMonitorApp().run(args);
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
index 4d74666..c864bf4 100644
--- a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
+++ b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
@@ -32,18 +32,14 @@
"mode" : "LOCAL",
"siteId" : "testsite",
"dataSourceConfig": {
- "topic" : "hadoop_jmx_metric",
+ "HADOOP_JMX_METRIC_STREAM": {
+ "topic": "hadoop_jmx_metric_sandbox",
+ }
+ "SYSTEM_METRIC_STREAM": {
+ "topic": "system_metric_sandbox",
+ }
+ // "topic" : "hadoop_jmx_metric_sandbox",
"zkConnection" : "localhost:2181",
"txZkServers" : "localhost:2181"
}
- "dataSinkConfig": {
- "topic" : "hadoop_jmx_metric",
- "brokerList" : "localhost:6667",
- "serializerClass" : "kafka.serializer.StringEncoder",
- "keySerializerClass" : "kafka.serializer.StringEncoder"
- "producerType" : "async",
- "numBatchMessages" : "4096",
- "maxQueueBufferMs" : "5000",
- "requestRequiredAcks" : "0"
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
index e0f0f15..b310738 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
@@ -40,7 +40,7 @@
$scope.metricList = [$scope.metricName];
CompatibleEntity.groups({
query: 'MetricSchemaService',
- groups: 'metricName',
+ groups: 'name',
fields: 'count',
limit: 9999,
})._promise.then(function (res) {