You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/02/28 06:34:05 UTC

eagle git commit: [EAGLE-928] Refine system metric schema design and fix system metric collector

Repository: eagle
Updated Branches:
  refs/heads/master 97ae1da52 -> 6e0fc410d


[EAGLE-928] Refine system metric schema design and fix system metric collector

https://issues.apache.org/jira/browse/EAGLE-928

* Support new stream `SYSTEM_METRIC_STREAM`:

        <stream>
                <streamId>SYSTEM_METRIC_STREAM</streamId>
                <description>System Metrics Stream including CPU, Network, Disk, etc.</description>
                <columns>
                    <column>
                        <name>host</name>
                        <type>string</type>
                    </column>
                    <column>
                        <name>timestamp</name>
                        <type>long</type>
                    </column>
                    <column>
                        <name>metric</name>
                        <type>string</type>
                    </column>
                    <column>
                        <name>group</name>
                        <type>string</type>
                    </column>
                    <column>
                        <name>site</name>
                        <type>string</type>
                    </column>
                    <column>
                        <name>device</name>
                        <type>string</type>
                    </column>
                    <column>
                        <name>value</name>
                        <type>double</type>
                        <defaultValue>0.0</defaultValue>
                    </column>
                </columns>
            </stream>

* Sample Metric Event

        {
            'timestamp': 1487918913569,
            'metric': 'system.nic.transmitdrop',
            'site': 'sandbox',
            'value': 7724.0,
            'host': 'sandbox.hortonworks.com',
            'device': 'eth0'
        }

* Add `system_metric_collector.py`
* Support persisting metrics from the system metric collector so they can be queried
* Refactor MetricSchemaEntity and MetricDescriptor

Author: Hao Chen <ha...@apache.org>

Closes #842 from haoch/FixSystemMetricCollector.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/6e0fc410
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/6e0fc410
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/6e0fc410

Branch: refs/heads/master
Commit: 6e0fc410d4d148eba238c86d3bb7b2be507f6d82
Parents: 97ae1da
Author: Hao Chen <ha...@apache.org>
Authored: Tue Feb 28 14:33:55 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Feb 28 14:33:55 2017 +0800

----------------------------------------------------------------------
 .../environment/builder/ApplicationBuilder.java | 256 ++++++++--------
 .../environment/builder/MetricDefinition.java   | 201 -------------
 .../environment/builder/MetricDescriptor.java   | 297 +++++++++++++++++++
 .../app/environment/impl/StormEnvironment.java  |  11 +-
 .../app/messaging/MetricSchemaGenerator.java    |  19 +-
 .../app/messaging/MetricStreamPersist.java      |  38 +--
 .../apache/eagle/app/utils/AppConfigUtils.java  |  30 ++
 .../metadata/model/MetricSchemaEntity.java      |   7 +-
 .../hadoop_jmx_collector/metric_collector.py    |  15 +
 .../system_metric_collector.py                  |  24 +-
 .../system_metric_config-sample.json            |  11 +-
 .../eagle/metric/HadoopMetricMonitorApp.java    |  38 ++-
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |  42 ++-
 .../metric/HadoopMetricMonitorAppDebug.java     |   3 +
 .../src/test/resources/application.conf         |  18 +-
 .../app/dev/public/js/ctrls/metricCtrl.js       |   2 +-
 16 files changed, 611 insertions(+), 401 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
index 95cf491..88f0886 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -1,127 +1,131 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.messaging.MetricSchemaGenerator;
-import org.apache.eagle.app.messaging.MetricStreamPersist;
-import org.apache.eagle.app.messaging.StormStreamSource;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Storm Application Builder DSL.
- */
-public class ApplicationBuilder {
-    private final StormEnvironment environment;
-    private final Config appConfig;
-    private final TopologyBuilder topologyBuilder;
-    private final AtomicInteger identifier;
-
-    public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
-        this.appConfig = appConfig;
-        this.environment = environment;
-        this.identifier = new AtomicInteger(0);
-        this.topologyBuilder = new TopologyBuilder();
-    }
-
-    public class BuilderContext {
-        public StormTopology toTopology() {
-            return topologyBuilder.createTopology();
-        }
-    }
-
-    public abstract class InitializedStream extends BuilderContext {
-        private String id;
-
-        InitializedStream(String id) {
-            Preconditions.checkNotNull(id);
-            this.id = id;
-        }
-
-        String getId() {
-            return this.id;
-        }
-
-        /**
-         * Persist source data stream as metric.
-         */
-        public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
-            String metricDataID = generateId("MetricDataSink");
-            String metricSchemaID = generateId("MetricSchemaGenerator");
-            topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
-            topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDefinition,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
-            return this;
-        }
-
-        public TransformedStream transformBy(TransformFunction function) {
-            String componentId = generateId(function.getName());
-            topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId());
-            return new TransformedStream(componentId);
-        }
-    }
-
-    public class SourcedStream extends InitializedStream {
-        private final Config appConfig;
-        private final StormStreamSource streamSource;
-
-        private SourcedStream(SourcedStream withSourcedStream) {
-            this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource);
-        }
-
-        private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) {
-            super(componentId);
-            this.appConfig = appConfig;
-            this.streamSource = streamSource;
-            topologyBuilder.setSpout(componentId, streamSource);
-        }
-    }
-
-    public class TransformedStream extends InitializedStream {
-        public TransformedStream(String id) {
-            super(id);
-            throw new IllegalStateException("TODO: Not implemented yet");
-        }
-    }
-
-    public TopologyBuilder getTopologyBuilder() {
-        return this.topologyBuilder;
-    }
-
-    public StormTopology createTopology() {
-        return topologyBuilder.createTopology();
-    }
-
-
-    public SourcedStream fromStream(String streamId) {
-        return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig));
-    }
-
-    public SourcedStream fromStream(SourcedStream sourcedStream) {
-        return new SourcedStream(sourcedStream);
-    }
-
-    private String generateId(String prefix) {
-        return String.format("%s_%s", prefix, this.identifier.getAndIncrement());
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricSchemaGenerator;
+import org.apache.eagle.app.messaging.MetricStreamPersist;
+import org.apache.eagle.app.messaging.StormStreamSource;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Storm Application Builder DSL.
+ */
+public class ApplicationBuilder {
+    private final StormEnvironment environment;
+    private final Config appConfig;
+    private final TopologyBuilder topologyBuilder;
+    private final AtomicInteger identifier;
+
+    public ApplicationBuilder(Config appConfig, StormEnvironment environment) {
+        this.appConfig = appConfig;
+        this.environment = environment;
+        this.identifier = new AtomicInteger(0);
+        this.topologyBuilder = new TopologyBuilder();
+    }
+
+    public class BuilderContext {
+        public StormTopology toTopology() {
+            return topologyBuilder.createTopology();
+        }
+
+        public SourcedStream fromStream(String streamId) {
+            return ApplicationBuilder.this.fromStream(streamId);
+        }
+    }
+
+    public abstract class InitializedStream extends BuilderContext {
+        private String id;
+
+        InitializedStream(String id) {
+            Preconditions.checkNotNull(id);
+            this.id = id;
+        }
+
+        String getId() {
+            return this.id;
+        }
+
+        /**
+         * Persist source data stream as metric.
+         */
+        public BuilderContext saveAsMetric(MetricDescriptor metricDescriptor) {
+            String metricDataID = generateId("MetricDataSink");
+            String metricSchemaID = generateId("MetricSchemaGenerator");
+            topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDescriptor, appConfig)).shuffleGrouping(getId());
+            topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDescriptor,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
+            return this;
+        }
+
+        public TransformedStream transformBy(TransformFunction function) {
+            String componentId = generateId(function.getName());
+            topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId());
+            return new TransformedStream(componentId);
+        }
+    }
+
+    public class SourcedStream extends InitializedStream {
+        private final Config appConfig;
+        private final StormStreamSource streamSource;
+
+        private SourcedStream(SourcedStream withSourcedStream) {
+            this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource);
+        }
+
+        private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) {
+            super(componentId);
+            this.appConfig = appConfig;
+            this.streamSource = streamSource;
+            topologyBuilder.setSpout(componentId, streamSource);
+        }
+    }
+
+    public class TransformedStream extends InitializedStream {
+        public TransformedStream(String id) {
+            super(id);
+            throw new IllegalStateException("TODO: Not implemented yet");
+        }
+    }
+
+    public TopologyBuilder getTopologyBuilder() {
+        return this.topologyBuilder;
+    }
+
+    public StormTopology createTopology() {
+        return topologyBuilder.createTopology();
+    }
+
+
+    public SourcedStream fromStream(String streamId) {
+        return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig));
+    }
+
+    public SourcedStream fromStream(SourcedStream sourcedStream) {
+        return new SourcedStream(sourcedStream);
+    }
+
+    private String generateId(String prefix) {
+        return String.format("%s_%s", prefix, this.identifier.getAndIncrement());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
deleted file mode 100644
index 639d27f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.builder;
-
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
-public class MetricDefinition implements Serializable {
-
-    /**
-     * Support simple and complex name format, by default using "metric" field.
-     */
-    private NameSelector nameSelector = new FieldNameSelector("metric");
-
-    /**
-     * Support event/system time, by default using system time.
-     */
-    private TimestampSelector timestampSelector = new SystemTimestampSelector();
-
-    /**
-     * Metric dimension field name.
-     */
-    private List<String> dimensionFields;
-
-    /**
-     * Metric granularity.
-     */
-    private int granularity = Calendar.MINUTE;
-
-    private String metricType = "DEFAULT";
-
-    /**
-     * Metric value field name.
-     */
-    private String valueField = "value";
-
-    public NameSelector getNameSelector() {
-        return nameSelector;
-    }
-
-    public void setNameSelector(NameSelector nameSelector) {
-        this.nameSelector = nameSelector;
-    }
-
-    public String getValueField() {
-        return valueField;
-    }
-
-    public void setValueField(String valueField) {
-        this.valueField = valueField;
-    }
-
-    public List<String> getDimensionFields() {
-        return dimensionFields;
-    }
-
-    public void setDimensionFields(List<String> dimensionFields) {
-        this.dimensionFields = dimensionFields;
-    }
-
-    public TimestampSelector getTimestampSelector() {
-        return timestampSelector;
-    }
-
-    public void setTimestampSelector(TimestampSelector timestampSelector) {
-        this.timestampSelector = timestampSelector;
-    }
-
-    public int getGranularity() {
-        return granularity;
-    }
-
-    public void setGranularity(int granularity) {
-        this.granularity = granularity;
-    }
-
-    public String getMetricType() {
-        return metricType;
-    }
-
-    public void setMetricType(String metricType) {
-        this.metricType = metricType;
-    }
-
-
-    @FunctionalInterface
-    public interface NameSelector extends Serializable {
-        String getMetricName(Map event);
-    }
-
-    @FunctionalInterface
-    public interface TimestampSelector extends Serializable {
-        Long getTimestamp(Map event);
-    }
-
-    public MetricDefinition namedBy(NameSelector nameSelector) {
-        this.setNameSelector(nameSelector);
-        return this;
-    }
-
-    /**
-     * @see java.util.Calendar
-     */
-    public MetricDefinition granularity(int granularity) {
-        this.setGranularity(granularity);
-        return this;
-    }
-
-    public MetricDefinition namedByField(String nameField) {
-        this.setNameSelector(new FieldNameSelector(nameField));
-        return this;
-    }
-
-    public static MetricDefinition metricType(String metricType) {
-        MetricDefinition metricDefinition = new MetricDefinition();
-        metricDefinition.setMetricType(metricType);
-        return metricDefinition;
-    }
-
-    public MetricDefinition eventTimeByField(String timestampField) {
-        this.setTimestampSelector(new EventTimestampSelector(timestampField));
-        return this;
-    }
-
-    public MetricDefinition dimensionFields(String... dimensionFields) {
-        this.setDimensionFields(Arrays.asList(dimensionFields));
-        return this;
-    }
-
-    public MetricDefinition valueField(String valueField) {
-        this.setValueField(valueField);
-        return this;
-    }
-
-    public class EventTimestampSelector implements TimestampSelector {
-        private final String timestampField;
-
-        EventTimestampSelector(String timestampField) {
-            this.timestampField = timestampField;
-        }
-
-        @Override
-        public Long getTimestamp(Map event) {
-            if (event.containsKey(timestampField)) {
-                Object timestampValue = event.get(timestampField);
-                if (timestampValue instanceof Integer) {
-                    return Long.valueOf((Integer) timestampValue);
-                }
-                if (timestampValue instanceof String) {
-                    return Long.valueOf((String) timestampValue);
-                } else {
-                    return (Long) timestampValue;
-                }
-            } else {
-                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
-            }
-        }
-    }
-
-    public static class SystemTimestampSelector implements TimestampSelector {
-        @Override
-        public Long getTimestamp(Map event) {
-            return System.currentTimeMillis();
-        }
-    }
-
-    public static class FieldNameSelector implements NameSelector {
-        private final String fieldName;
-
-        FieldNameSelector(String fieldName) {
-            this.fieldName = fieldName;
-        }
-
-        @Override
-        public String getMetricName(Map event) {
-            if (event.containsKey(fieldName)) {
-                return (String) event.get(fieldName);
-            } else {
-                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
new file mode 100644
index 0000000..e79e4d7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.builder;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+public class MetricDescriptor implements Serializable {
+
+    /**
+     * Support simple and complex name format, by default using "metric" field.
+     */
+    private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
+
+    public MetricGroupSelector getMetricGroupSelector() {
+        return metricGroupSelector;
+    }
+
+    public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) {
+        this.metricGroupSelector = metricGroupSelector;
+    }
+
+
+    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
+
+    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
+    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
+
+    /**
+     * Support event/system time, by default using system time.
+     */
+    private TimestampSelector timestampSelector = new SystemTimestampSelector();
+
+    /**
+     * Metric dimension field name.
+     */
+    private List<String> dimensionFields;
+
+    /**
+     * Metric granularity.
+     */
+    private int granularity = Calendar.MINUTE;
+
+    /**
+     * Metric value field name.
+     */
+    private String valueField = "value";
+
+    public MetricNameSelector getMetricNameSelector() {
+        return metricNameSelector;
+    }
+
+    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
+        this.metricNameSelector = metricNameSelector;
+    }
+
+    public String getValueField() {
+        return valueField;
+    }
+
+    public void setValueField(String valueField) {
+        this.valueField = valueField;
+    }
+
+    public List<String> getDimensionFields() {
+        return dimensionFields;
+    }
+
+    public void setDimensionFields(List<String> dimensionFields) {
+        this.dimensionFields = dimensionFields;
+    }
+
+    public TimestampSelector getTimestampSelector() {
+        return timestampSelector;
+    }
+
+    public void setTimestampSelector(TimestampSelector timestampSelector) {
+        this.timestampSelector = timestampSelector;
+    }
+
+    public int getGranularity() {
+        return granularity;
+    }
+
+    public void setGranularity(int granularity) {
+        this.granularity = granularity;
+    }
+
+    public SiteIdSelector getSiteIdSelector() {
+        return siteIdSelector;
+    }
+
+    public void setSiteIdSelector(SiteIdSelector siteIdSelector) {
+        this.siteIdSelector = siteIdSelector;
+    }
+
+
+    @FunctionalInterface
+    public interface MetricNameSelector extends Serializable {
+        String getMetricName(Map event);
+    }
+
+    @FunctionalInterface
+    public interface MetricGroupSelector extends Serializable {
+        String getMetricGroup(Map event);
+    }
+
+    public static class FixedMetricGroupSelector implements MetricGroupSelector {
+        private final String groupName;
+
+        private FixedMetricGroupSelector(String groupName) {
+            this.groupName = groupName;
+        }
+
+        @Override
+        public String getMetricGroup(Map event) {
+            return groupName;
+        }
+    }
+
+    @FunctionalInterface
+    public interface TimestampSelector extends Serializable {
+        Long getTimestamp(Map event);
+    }
+
+    @FunctionalInterface
+    public interface SiteIdSelector extends Serializable {
+        String getSiteId(Map event);
+    }
+
+    public class FixedSiteIdSelector implements SiteIdSelector {
+        private final String siteId;
+
+        private FixedSiteIdSelector(String siteId) {
+            this.siteId = siteId;
+        }
+
+        @Override
+        public String getSiteId(Map event) {
+            return this.siteId;
+        }
+    }
+
+    private class FieldSiteIdSelector implements SiteIdSelector {
+        private final String siteIdFieldName;
+
+        public FieldSiteIdSelector(String siteIdFieldName) {
+            this.siteIdFieldName = siteIdFieldName;
+        }
+
+        @Override
+        public String getSiteId(Map event) {
+            return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN");
+        }
+    }
+
+    public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) {
+        this.setMetricNameSelector(metricNameSelector);
+        return this;
+    }
+
+    public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) {
+        this.setSiteIdSelector(siteIdSelector);
+        return this;
+    }
+
+    public MetricDescriptor siteAs(String siteId) {
+        this.setSiteIdSelector(new FixedSiteIdSelector(siteId));
+        return this;
+    }
+
+    public MetricDescriptor siteByField(String fieldName) {
+        this.setSiteIdSelector(new FieldSiteIdSelector(fieldName)); // site id is read from this event field ("UNKNOWN" if absent); metric name selector is left unchanged
+        return this;
+    }
+
+    /**
+     * @see java.util.Calendar
+     */
+    public MetricDescriptor granularity(int granularity) {
+        this.setGranularity(granularity);
+        return this;
+    }
+
+    public MetricDescriptor namedByField(String nameField) {
+        this.setMetricNameSelector(new FieldMetricNameSelector(nameField));
+        return this;
+    }
+
+    public static MetricDescriptor metricGroupAs(String metricGroupName) {
+        return metricGroupAs(new FixedMetricGroupSelector(metricGroupName));
+    }
+
+    public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) {
+        MetricDescriptor metricDescriptor = new MetricDescriptor();
+        metricDescriptor.setMetricGroupSelector(groupSelector);
+        return metricDescriptor;
+    }
+
+    public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) {
+        MetricDescriptor metricDescriptor = new MetricDescriptor();
+        metricDescriptor.setMetricGroupSelector((MetricGroupSelector) event -> {
+            if (event.containsKey(fieldName)) {
+                return (String) event.get(fieldName);
+            } else {
+                return defaultGroupName;
+            }
+        });
+        return metricDescriptor;
+    }
+
+    public static MetricDescriptor metricGroupByField(String fieldName) {
+        return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME);
+    }
+
+    public MetricDescriptor eventTimeByField(String timestampField) {
+        this.setTimestampSelector(new EventTimestampSelector(timestampField));
+        return this;
+    }
+
+    public MetricDescriptor dimensionFields(String... dimensionFields) {
+        this.setDimensionFields(Arrays.asList(dimensionFields));
+        return this;
+    }
+
+    public MetricDescriptor valueField(String valueField) {
+        this.setValueField(valueField);
+        return this;
+    }
+
+    public class EventTimestampSelector implements TimestampSelector {
+        private final String timestampField;
+
+        EventTimestampSelector(String timestampField) {
+            this.timestampField = timestampField;
+        }
+
+        @Override
+        public Long getTimestamp(Map event) {
+            if (event.containsKey(timestampField)) {
+                Object timestampValue = event.get(timestampField);
+                if (timestampValue instanceof Integer) {
+                    return Long.valueOf((Integer) timestampValue);
+                }
+                if (timestampValue instanceof String) {
+                    return Long.valueOf((String) timestampValue);
+                } else {
+                    return (Long) timestampValue;
+                }
+            } else {
+                throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists");
+            }
+        }
+    }
+
+    public static class SystemTimestampSelector implements TimestampSelector {
+        @Override
+        public Long getTimestamp(Map event) {
+            return System.currentTimeMillis();
+        }
+    }
+
+    public static class FieldMetricNameSelector implements MetricNameSelector {
+        private final String fieldName;
+
+        FieldMetricNameSelector(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        @Override
+        public String getMetricName(Map event) {
+            if (event.containsKey(fieldName)) {
+                return (String) event.get(fieldName);
+            } else {
+                throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 942a0ac..6827eef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -18,12 +18,11 @@ package org.apache.eagle.app.environment.impl;
 
 import org.apache.eagle.app.environment.AbstractEnvironment;
 import org.apache.eagle.app.environment.builder.ApplicationBuilder;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
 import org.apache.eagle.app.environment.builder.TransformFunction;
 import org.apache.eagle.app.environment.builder.TransformFunctionBolt;
 import org.apache.eagle.app.messaging.*;
 import com.typesafe.config.Config;
-import org.apache.eagle.metadata.model.StreamSourceConfig;
 
 /**
  * Storm Execution Environment Context.
@@ -44,16 +43,16 @@ public class StormEnvironment extends AbstractEnvironment {
         return (StormStreamSource) stream().getSource(streamId,config);
     }
 
-    public MetricStreamPersist getMetricPersist(MetricDefinition metricDefinition, Config config) {
-        return new MetricStreamPersist(metricDefinition, config);
+    public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) {
+        return new MetricStreamPersist(metricDescriptor, config);
     }
 
     public EntityStreamPersist getEntityPersist(Config config) {
         return new EntityStreamPersist(config);
     }
 
-    public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
-        return new MetricSchemaGenerator(metricDefinition, config);
+    public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
+        return new MetricSchemaGenerator(metricDescriptor, config);
     }
 
     public TransformFunctionBolt getTransformer(TransformFunction function) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
index bb29cea..90e6481 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Tuple;
 import com.typesafe.config.Config;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.metadata.model.MetricSchemaEntity;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
 import org.apache.eagle.service.client.EagleServiceClientException;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -40,14 +40,14 @@ public class MetricSchemaGenerator extends BaseRichBolt {
     public static final String GENERIC_METRIC_VALUE_NAME = "value";
 
     private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH);
-    private final MetricDefinition metricDefinition;
+    private final MetricDescriptor metricDescriptor;
     private final Config config;
 
     private OutputCollector collector;
     private IEagleServiceClient client;
 
-    public MetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
-        this.metricDefinition = metricDefinition;
+    public MetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) {
+        this.metricDescriptor = metricDescriptor;
         this.config = config;
     }
 
@@ -63,7 +63,7 @@ public class MetricSchemaGenerator extends BaseRichBolt {
             String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD);
             synchronized (metricNameCache) {
                 if (!metricNameCache.contains(metricName)) {
-                    createMetricSchemaEntity(metricName, this.metricDefinition);
+                    createMetricSchemaEntity(metricName, (Map) input.getValueByField(MetricStreamPersist.METRIC_EVENT_FIELD),this.metricDescriptor);
                     metricNameCache.add(metricName);
                 }
                 if (metricNameCache.size() > MAX_CACHE_LENGTH) {
@@ -93,14 +93,15 @@ public class MetricSchemaGenerator extends BaseRichBolt {
         }
     }
 
-    private void createMetricSchemaEntity(String metricName, MetricDefinition metricDefinition) throws IOException, EagleServiceClientException {
+    private void createMetricSchemaEntity(String metricName, Map event, MetricDescriptor metricDescriptor) throws IOException, EagleServiceClientException {
         MetricSchemaEntity schemaEntity = new MetricSchemaEntity();
         Map<String, String> schemaTags = new HashMap<>();
         schemaEntity.setTags(schemaTags);
+        schemaTags.put(MetricSchemaEntity.METRIC_SITE_TAG, metricDescriptor.getSiteIdSelector().getSiteId(event));
         schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName);
-        schemaTags.put(MetricSchemaEntity.METRIC_TYPE_TAG, metricDefinition.getMetricType());
-        schemaEntity.setGranularityByField(metricDefinition.getGranularity());
-        schemaEntity.setDimensionFields(metricDefinition.getDimensionFields());
+        schemaTags.put(MetricSchemaEntity.METRIC_GROUP_TAG, metricDescriptor.getMetricGroupSelector().getMetricGroup(event));
+        schemaEntity.setGranularityByField(metricDescriptor.getGranularity());
+        schemaEntity.setDimensionFields(metricDescriptor.getDimensionFields());
         schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME));
         schemaEntity.setModifiedTimestamp(System.currentTimeMillis());
         GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(schemaEntity));

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index ba99911..c9b43e5 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
 import org.apache.eagle.app.utils.StreamConvertHelper;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.log.entity.GenericMetricEntity;
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,6 +45,7 @@ import java.util.Map;
 public class MetricStreamPersist extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
     public static final String METRIC_NAME_FIELD = "metricName";
+    public static final String METRIC_EVENT_FIELD = "metricEvent";
 
     private final Config config;
     private final MetricMapper mapper;
@@ -52,9 +54,9 @@ public class MetricStreamPersist extends BaseRichBolt {
     private OutputCollector collector;
     private BatchSender batchSender;
 
-    public MetricStreamPersist(MetricDefinition metricDefinition, Config config) {
+    public MetricStreamPersist(MetricDescriptor metricDescriptor, Config config) {
         this.config = config;
-        this.mapper = new StructuredMetricMapper(metricDefinition);
+        this.mapper = new StructuredMetricMapper(metricDescriptor);
         this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
     }
 
@@ -76,8 +78,10 @@ public class MetricStreamPersist extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         GenericMetricEntity metricEntity = null;
+        Map event = null;
         try {
-            metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+            event = StreamConvertHelper.tupleToEvent(input).f1();
+            metricEntity = this.mapper.map(event);
             if (batchSize <= 1) {
                 GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
                 if (!response.isSuccess()) {
@@ -91,8 +95,8 @@ public class MetricStreamPersist extends BaseRichBolt {
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
         } finally {
-            if (metricEntity != null) {
-                collector.emit(Collections.singletonList(metricEntity.getPrefix()));
+            if (metricEntity != null && event != null) {
+                collector.emit(Arrays.asList(metricEntity.getPrefix(), event));
             }
             collector.ack(input);
         }
@@ -100,7 +104,7 @@ public class MetricStreamPersist extends BaseRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(METRIC_NAME_FIELD));
+        declarer.declare(new Fields(METRIC_NAME_FIELD, METRIC_EVENT_FIELD));
     }
 
     @Override
@@ -120,35 +124,35 @@ public class MetricStreamPersist extends BaseRichBolt {
     }
 
     public class StructuredMetricMapper implements MetricMapper {
-        private final MetricDefinition metricDefinition;
+        private final MetricDescriptor metricDescriptor;
 
-        private StructuredMetricMapper(MetricDefinition metricDefinition) {
-            this.metricDefinition = metricDefinition;
+        private StructuredMetricMapper(MetricDescriptor metricDescriptor) {
+            this.metricDescriptor = metricDescriptor;
         }
 
         @Override
         public GenericMetricEntity map(Map event) {
-            String metricName = metricDefinition.getNameSelector().getMetricName(event);
+            String metricName = metricDescriptor.getMetricNameSelector().getMetricName(event);
             Preconditions.checkNotNull(metricName, "Metric name is null");
-            Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
+            Long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event);
             Preconditions.checkNotNull(timestamp, "Timestamp is null");
             Map<String, String> tags = new HashMap<>();
-            for (String dimensionField : metricDefinition.getDimensionFields()) {
+            for (String dimensionField : metricDescriptor.getDimensionFields()) {
                 Preconditions.checkNotNull(dimensionField, "Dimension field name is null");
                 tags.put(dimensionField, (String) event.get(dimensionField));
             }
 
             double[] values;
-            if (event.containsKey(metricDefinition.getValueField())) {
-                values = new double[] {(double) event.get(metricDefinition.getValueField())};
+            if (event.containsKey(metricDescriptor.getValueField())) {
+                values = new double[] {(double) event.get(metricDescriptor.getValueField())};
             } else {
-                LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event);
+                LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event);
                 values = new double[] {0};
             }
 
             GenericMetricEntity entity = new GenericMetricEntity();
             entity.setPrefix(metricName);
-            entity.setTimestamp(DateTimeUtil.roundDown(metricDefinition.getGranularity(), timestamp));
+            entity.setTimestamp(DateTimeUtil.roundDown(metricDescriptor.getGranularity(), timestamp));
             entity.setTags(tags);
             entity.setValue(values);
             return entity;

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
new file mode 100644
index 0000000..7bad0bd
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.utils;
+
+import com.typesafe.config.Config;
+
+public class AppConfigUtils {
+    public static String getSiteId(Config config) {
+        return config.getString("siteId");
+    }
+
+    public static String getAppId(Config config) {
+        return config.getString("appId");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
index f18932c..3bd5825 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
@@ -30,11 +30,12 @@ import java.util.List;
 @Service(MetricSchemaEntity.METRIC_SCHEMA_SERVICE)
 @JsonIgnoreProperties(ignoreUnknown = true)
 @TimeSeries(false)
-@Tags({"metricName","metricType"})
+@Tags({"site","site","group"})
 public class MetricSchemaEntity extends TaggedLogAPIEntity {
     static final String METRIC_SCHEMA_SERVICE = "MetricSchemaService";
-    public static final String METRIC_NAME_TAG = "metricName";
-    public static final String METRIC_TYPE_TAG = "metricType";
+    public static final String METRIC_NAME_TAG = "name";
+    public static final String METRIC_SITE_TAG = "site";
+    public static final String METRIC_GROUP_TAG = "group";
 
     @Column("a")
     private List<String> dimensionFields;

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index bf1d4df..c3fdb43 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -210,6 +210,7 @@ class KafkaMetricSender(MetricSender):
         self.default_topic = None
         if kafka_config.has_key("default_topic"):
             self.default_topic = kafka_config["default_topic"].encode('utf-8')
+            logging.info("Using default topic: %s" % self.default_topic)
         self.component_topic_mapping = {}
         if kafka_config.has_key("component_topic_mapping"):
             self.component_topic_mapping = kafka_config["component_topic_mapping"]
@@ -266,12 +267,18 @@ class MetricCollector(threading.Thread):
     filters = []
     config = None
     closed = False
+    collected_event_count = 0
+    ignored_event_count = 0
+    emit_event_count = 0
 
     def __init__(self, config=None):
         threading.Thread.__init__(self)
         self.config = None
         self.sender = None
         self.fqdn = socket.getfqdn()
+        self.ignored_event_count = 0
+        self.collected_event_count = 0
+        self.emit_event_count = 0
 
     def init(self, config):
         self.config = config
@@ -296,6 +303,7 @@ class MetricCollector(threading.Thread):
 
     def collect(self, msg, type='float'):
         try:
+            self.collected_event_count = self.collected_event_count + 1
             if not msg.has_key("timestamp"):
                 msg["timestamp"] = int(round(time.time() * 1000))
             if msg.has_key("value") and type == 'float':
@@ -304,21 +312,28 @@ class MetricCollector(threading.Thread):
                 msg["value"] = str(msg["value"])
             if not msg.has_key("host") or len(msg["host"]) == 0:
                 raise Exception("host is null: " + str(msg))
+
             if not msg.has_key("site"):
                 msg["site"] = self.config["env"]["site"]
+
             if len(self.filters) == 0:
+                self.emit_event_count = self.emit_event_count + 1
                 self.sender.send(msg)
                 return
             else:
                 for filter in self.filters:
                     if filter.filter_metric(msg):
+                        self.emit_event_count = self.emit_event_count + 1
                         self.sender.send(msg)
                         return
+                self.ignored_event_count = self.ignored_event_count + 1
         except Exception as e:
             logging.error("Failed to emit metric: %s" % msg)
             logging.exception(e)
 
     def close(self):
+        logging.info("Collected %s events (emitted: %s, ignored: %s)"
+                     % (self.collected_event_count, self.emit_event_count, self.ignored_event_count))
         self.sender.close()
         self.closed = True
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
index e0ffecc..4c95a6c 100644
--- a/eagle-external/hadoop_jmx_collector/system_metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
@@ -17,8 +17,7 @@
 #
 
 from metric_collector import MetricCollector, Runner
-import logging, socket, string, os, re, time
-
+import logging, socket, string, os, re, time, json
 
 class SystemMetricCollector(MetricCollector):
     METRIC_PREFIX = "system"
@@ -86,7 +85,6 @@ class SystemMetricCollector(MetricCollector):
 
         """
 
-        cpu_metric = self.new_metric()
         cpu_info = os.popen('cat /proc/stat').readlines()
         dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"]
 
@@ -109,6 +107,7 @@ class SystemMetricCollector(MetricCollector):
             metric_event = dict()
             for i in range(1, demens):
                 metric_event[dimensions[i]] = int(items[i])
+                cpu_metric = self.new_metric("system.cpu")
                 cpu_metric['timestamp'] = int(round(time.time() * 1000))
                 cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i]
                 cpu_metric['device'] = items[0]
@@ -123,6 +122,7 @@ class SystemMetricCollector(MetricCollector):
             total_cpu_usage += per_cpu_usage
 
             # system.cpu.usage
+            cpu_metric = self.new_metric("system.cpu")
             cpu_metric['timestamp'] = int(round(time.time() * 1000))
             cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage"
             cpu_metric['device'] = items[0]
@@ -141,6 +141,7 @@ class SystemMetricCollector(MetricCollector):
             result = re.split("\s+", cpu_stat_pre.rstrip())
             pre_total_cpu_usage = int(result[0])
             pre_total_cpu = int(result[1])
+
         cpu_metric['timestamp'] = int(round(time.time() * 1000))
         cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "totalusage"
         cpu_metric['device'] = "cpu"
@@ -153,7 +154,7 @@ class SystemMetricCollector(MetricCollector):
     # ====================================
 
     def collect_uptime_metric(self):
-        metric = self.new_metric()
+        metric = self.new_metric("system.os")
         demension = ["uptime.day", "idletime.day"]
         output = os.popen('cat /proc/uptime').readlines()
 
@@ -170,7 +171,7 @@ class SystemMetricCollector(MetricCollector):
     # ====================================
 
     def collect_memory_metric(self):
-        event = self.new_metric()
+        event = self.new_metric("system.memory")
         event["host"] = self.fqdn
         output = os.popen('cat /proc/meminfo').readlines()
         mem_info = dict()
@@ -208,7 +209,7 @@ class SystemMetricCollector(MetricCollector):
             items = re.split("\s+", item.rstrip())
             demens = min(len(demension), len(items))
             for i in range(demens):
-                event = self.new_metric()
+                event = self.new_metric("system.cpu")
                 event["timestamp"] = int(round(time.time() * 1000))
                 event["metric"] = self.METRIC_PREFIX + "." + demension[i]
                 event["value"] = items[i]
@@ -223,7 +224,7 @@ class SystemMetricCollector(MetricCollector):
         output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
         for item in output:
             items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
-            event = self.new_metric()
+            event = self.new_metric("System.CPU")
             event["timestamp"] = int(round(time.time() * 1000))
             event["metric"] = DATA_TYPE + "." + 'cpu.temp'
             event["value"] = items[2]
@@ -247,7 +248,7 @@ class SystemMetricCollector(MetricCollector):
             filtered_items = items[1:5] + items[9:13]
 
             for i in range(len(demension)):
-                kafka_dict = self.new_metric()
+                kafka_dict = self.new_metric("system.network")
                 kafka_dict["timestamp"] = int(round(time.time() * 1000))
                 kafka_dict['metric'] = self.METRIC_PREFIX + "." + 'nic.' + demension[i]
                 kafka_dict["value"] = filtered_items[i]
@@ -270,7 +271,7 @@ class SystemMetricCollector(MetricCollector):
                     continue
                 lineitems = re.split("\s+", line)
                 metric = 'smartdisk.' + lineitems[1]
-                kafka_dict = self.new_metric()
+                kafka_dict = self.new_metric("system.disk")
                 kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
                 kafka_dict["timestamp"] = int(round(time.time() * 1000))
                 kafka_dict["value"] = lineitems[-1]
@@ -308,7 +309,7 @@ class SystemMetricCollector(MetricCollector):
         for key, metrics in iostat_dict.iteritems():
             for i in range(len(metrics)):
                 metric = 'disk.' + demension[i]
-                kafka_dict = self.new_metric()
+                kafka_dict = self.new_metric("system.disk")
                 kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
                 kafka_dict["timestamp"] = int(round(time.time() * 1000))
                 kafka_dict["value"] = metrics[i]
@@ -326,9 +327,10 @@ class SystemMetricCollector(MetricCollector):
         event["device"] = device
         self.collect(event)
 
-    def new_metric(self):
+    def new_metric(self, group):
         metric = dict()
         metric["host"] = self.fqdn
+        metric["group"] = group
         return metric
 
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
index 6fcd43b..27ad5bb 100644
--- a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
@@ -1,20 +1,21 @@
 {
   "env": {
     "site": "sandbox",
-    "log_file": "/tmp/hadoop-jmx-collector.log",
     "cpu_stat_file": "/tmp/eagle_cpu_usage_state"
   },
-  "input": [
-  ],
   "filter": {
+    "bean_group_filter": ["hadoop","java.lang","java.nio"],
+    "metric_name_filter": [
+      "system.*"
+    ]
   },
   "output": {
     "kafka": {
-      "debug": false,
+      "debug": true,
       "default_topic": "system_metric_sandbox",
       "broker_list": [
         "sandbox.hortonworks.com:6667"
       ]
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 304e500..0c49d82 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -19,8 +19,10 @@ package org.apache.eagle.metric;
 import backtype.storm.generated.StormTopology;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
+import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.AppConfigUtils;
 
 import java.util.Calendar;
 
@@ -28,14 +30,30 @@ public class HadoopMetricMonitorApp extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         return environment.newApp(config)
-            .fromStream("HADOOP_JMX_METRIC_STREAM")
-            .saveAsMetric(MetricDefinition
-                .metricType("HADOOP_JMX_METRICS")
-                .namedByField("metric")
-                .eventTimeByField("timestamp")
-                .dimensionFields("host","component","site")
-                .granularity(Calendar.MINUTE)
-                .valueField("value"))
-            .toTopology();
+                .fromStream("HADOOP_JMX_METRIC_STREAM")
+                .saveAsMetric(
+                        MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> {
+                            if (event.containsKey("component")) {
+                                return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase());
+                            } else {
+                                return "hadoop.metrics";
+                            }
+                        })
+                        .siteAs(AppConfigUtils.getSiteId(config))
+                        .namedByField("metric")
+                        .eventTimeByField("timestamp")
+                        .dimensionFields("host", "component", "site")
+                        .granularity(Calendar.MINUTE)
+                        .valueField("value"))
+                .fromStream("SYSTEM_METRIC_STREAM")
+                .saveAsMetric(MetricDescriptor.metricGroupByField("group")
+                        .siteAs(AppConfigUtils.getSiteId(config))
+                        .namedByField("metric")
+                        .eventTimeByField("timestamp")
+                        .dimensionFields("host", "group", "site", "device")
+                        .granularity(Calendar.MINUTE)
+                        .valueField("value")
+                )
+                .toTopology();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 073c900..51a9257 100644
--- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -20,7 +20,6 @@
     <type>HADOOP_METRIC_MONITOR</type>
     <name>Hadoop Metrics Monitor</name>
     <configuration>
-        <!-- data fromStream configurations -->
         <property>
             <name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name>
             <displayName>JMX Metric Kafka Topic</displayName>
@@ -29,6 +28,12 @@
             <required>true</required>
         </property>
         <property>
+            <name>dataSinkConfig.SYSTEM_METRIC_STREAM.topic</name>
+            <displayName>System Metric Kafka Topic</displayName>
+            <value>system_metric_${siteId}</value>
+            <description>System metric Kafka topic name for stream: SYSTEM_METRIC_STREAM</description>
+        </property>
+        <property>
             <name>dataSinkConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name>
             <displayName>JMX Resource Kafka Topic</displayName>
             <value>hadoop_jmx_resource_${siteId}</value>
@@ -134,6 +139,41 @@
             </columns>
         </stream>
         <stream>
+            <streamId>SYSTEM_METRIC_STREAM</streamId>
+            <description>System Metrics Stream including CPU, Network, Disk, etc.</description>
+            <columns>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>group</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>device</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
+        <stream>
             <streamId>HADOOP_JMX_RESOURCE_STREAM</streamId>
             <description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description>
             <columns>

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
index e4589b7..e6f1463 100644
--- a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
+++ b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.metric;
 
+import org.junit.Ignore;
+
+@Ignore
 public class HadoopMetricMonitorAppDebug {
     public static void main(String[] args) {
         new HadoopMetricMonitorApp().run(args);

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
index 4d74666..c864bf4 100644
--- a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
+++ b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf
@@ -32,18 +32,14 @@
   "mode" : "LOCAL",
   "siteId" : "testsite",
   "dataSourceConfig": {
-    "topic" : "hadoop_jmx_metric",
+    "HADOOP_JMX_METRIC_STREAM": {
+      "topic": "hadoop_jmx_metric_sandbox",
+    }
+    "SYSTEM_METRIC_STREAM": {
+      "topic": "system_metric_sandbox",
+    }
+    // "topic" : "hadoop_jmx_metric_sandbox",
     "zkConnection" : "localhost:2181",
     "txZkServers" : "localhost:2181"
   }
-  "dataSinkConfig": {
-    "topic" : "hadoop_jmx_metric",
-    "brokerList" : "localhost:6667",
-    "serializerClass" : "kafka.serializer.StringEncoder",
-    "keySerializerClass" : "kafka.serializer.StringEncoder"
-    "producerType" : "async",
-    "numBatchMessages" : "4096",
-    "maxQueueBufferMs" : "5000",
-    "requestRequiredAcks" : "0"
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
index e0f0f15..b310738 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js
@@ -40,7 +40,7 @@
 		$scope.metricList = [$scope.metricName];
 		CompatibleEntity.groups({
 			query: 'MetricSchemaService',
-			groups: 'metricName',
+			groups: 'name',
 			fields: 'count',
 			limit: 9999,
 		})._promise.then(function (res) {