You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/09 10:08:53 UTC

incubator-eagle git commit: [EAGLE-833] Add MetricSchemaGenerator and MetricSchemaService

Repository: incubator-eagle
Updated Branches:
  refs/heads/master aa8d3c9b9 -> 0fd2d7774


[EAGLE-833] Add MetricSchemaGenerator and MetricSchemaService

* Add MetricSchemaService to store metric schema aside GenericMetric
* Add MetricSchemaGenerator to automatically generate metric schema based on metric stream and metric definition.

Author: Hao Chen <ha...@apache.org>

Closes #725 from haoch/AddMetricSchemaService.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0fd2d777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0fd2d777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0fd2d777

Branch: refs/heads/master
Commit: 0fd2d7774acba6a33d192e3d81627ec947b626a1
Parents: aa8d3c9
Author: Hao Chen <ha...@apache.org>
Authored: Fri Dec 9 18:08:41 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Dec 9 18:08:41 2016 +0800

----------------------------------------------------------------------
 .../environment/builder/ApplicationBuilder.java |   7 +-
 .../environment/builder/MetricDefinition.java   |  27 +++--
 .../app/environment/impl/StormEnvironment.java  |   4 +
 .../app/messaging/MetricSchemaGenerator.java    | 107 +++++++++++++++++++
 .../app/messaging/MetricStreamPersist.java      |   4 +-
 .../org/apache/eagle/common/DateTimeUtil.java   |  19 ++++
 .../metadata/model/AlertEntityRepository.java   |  27 -----
 .../model/MetadataEntityRepository.java         |  27 +++++
 .../metadata/model/MetricSchemaEntity.java      |  86 +++++++++++++++
 .../eagle/metric/HadoopMetricMonitorApp.java    |   4 +-
 10 files changed, 275 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
index 83f00db..95cf491 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -19,9 +19,11 @@ package org.apache.eagle.app.environment.builder;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricSchemaGenerator;
 import org.apache.eagle.app.messaging.MetricStreamPersist;
 import org.apache.eagle.app.messaging.StormStreamSource;
 
@@ -65,7 +67,10 @@ public class ApplicationBuilder {
          * Persist source data stream as metric.
          */
         public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
-            topologyBuilder.setBolt(generateId("MetricPersist"), new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+            String metricDataID = generateId("MetricDataSink");
+            String metricSchemaID = generateId("MetricSchemaGenerator");
+            topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+            topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDefinition,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
             return this;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
index d45ad2b..639d27f 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
@@ -45,6 +45,8 @@ public class MetricDefinition implements Serializable {
      */
     private int granularity = Calendar.MINUTE;
 
+    private String metricType = "DEFAULT";
+
     /**
      * Metric value field name.
      */
@@ -90,6 +92,15 @@ public class MetricDefinition implements Serializable {
         this.granularity = granularity;
     }
 
+    public String getMetricType() {
+        return metricType;
+    }
+
+    public void setMetricType(String metricType) {
+        this.metricType = metricType;
+    }
+
+
     @FunctionalInterface
     public interface NameSelector extends Serializable {
         String getMetricName(Map event);
@@ -100,10 +111,9 @@ public class MetricDefinition implements Serializable {
         Long getTimestamp(Map event);
     }
 
-    public static MetricDefinition namedBy(NameSelector nameSelector) {
-        MetricDefinition metricDefinition = new MetricDefinition();
-        metricDefinition.setNameSelector(nameSelector);
-        return metricDefinition;
+    public MetricDefinition namedBy(NameSelector nameSelector) {
+        this.setNameSelector(nameSelector);
+        return this;
     }
 
     /**
@@ -114,9 +124,14 @@ public class MetricDefinition implements Serializable {
         return this;
     }
 
-    public static MetricDefinition namedByField(String nameField) {
+    public MetricDefinition namedByField(String nameField) {
+        this.setNameSelector(new FieldNameSelector(nameField));
+        return this;
+    }
+
+    public static MetricDefinition metricType(String metricType) {
         MetricDefinition metricDefinition = new MetricDefinition();
-        metricDefinition.setNameSelector(new FieldNameSelector(nameField));
+        metricDefinition.setMetricType(metricType);
         return metricDefinition;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index ae52cd0..59c8277 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -48,6 +48,10 @@ public class StormEnvironment extends AbstractEnvironment {
         return new MetricStreamPersist(metricDefinition, config);
     }
 
+    public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
+        return new MetricSchemaGenerator(metricDefinition, config);
+    }
+
     public TransformFunctionBolt getTransformer(TransformFunction function) {
         return new TransformFunctionBolt(function);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
new file mode 100644
index 0000000..6563cd7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.MetricSchemaEntity;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.impl.BatchSender;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class MetricSchemaGenerator extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricSchemaGenerator.class);
+    private static int MAX_CACHE_LENGTH = 1000;
+    public static final String GENERIC_METRIC_VALUE_NAME = "value";
+
+    private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH);
+    private final MetricDefinition metricDefinition;
+    private final Config config;
+
+    private OutputCollector collector;
+    private BatchSender client;
+
+    public MetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
+        this.metricDefinition = metricDefinition;
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.client = new EagleServiceClientImpl(config).batch(100);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        try {
+            String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD);
+            synchronized (metricNameCache) {
+                if (!metricNameCache.contains(metricName)) {
+                    createMetricSchemaEntity(metricName, this.metricDefinition);
+                    metricNameCache.add(metricName);
+                }
+                if (metricNameCache.size() > MAX_CACHE_LENGTH) {
+                    this.metricNameCache.clear();
+                }
+            }
+            this.collector.ack(input);
+        } catch (Throwable throwable) {
+            LOG.warn(throwable.getMessage(), throwable);
+            this.collector.reportError(throwable);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+
+    @Override
+    public void cleanup() {
+        if (this.client != null) {
+            try {
+                this.client.close();
+            } catch (IOException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
+    private void createMetricSchemaEntity(String metricName, MetricDefinition metricDefinition) throws IOException, EagleServiceClientException {
+        MetricSchemaEntity schemaEntity = new MetricSchemaEntity();
+        Map<String,String> schemaTags = new HashMap<>();
+        schemaEntity.setTags(schemaTags);
+        schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName);
+        schemaTags.put(MetricSchemaEntity.METRIC_TYPE_TAG, metricDefinition.getMetricType());
+        schemaEntity.setGranularityByField(metricDefinition.getGranularity());
+        schemaEntity.setDimensionFields(metricDefinition.getDimensionFields());
+        schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME));
+        schemaEntity.setModifiedTimestamp(System.currentTimeMillis());
+        this.client.send(Collections.singletonList(schemaEntity));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index 2a1d8a8..d656827 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -43,6 +43,7 @@ import java.util.Map;
 
 public class MetricStreamPersist extends BaseRichBolt  {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
+    public static final String METRIC_NAME_FIELD = "metricName";
 
     private final Config config;
     private final MetricMapper mapper;
@@ -82,6 +83,7 @@ public class MetricStreamPersist extends BaseRichBolt  {
                     LOG.error("Service side error: {}", response.getException());
                     collector.reportError(new IllegalStateException(response.getException()));
                 } else {
+                    collector.emit(Collections.singletonList(metricEntity.getPrefix()));
                     collector.ack(input);
                 }
             } else {
@@ -96,7 +98,7 @@ public class MetricStreamPersist extends BaseRichBolt  {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("f1"));
+        declarer.declare(new Fields(METRIC_NAME_FIELD));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index e7ef3f8..a69feda 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -142,6 +142,25 @@ public class DateTimeUtil {
         }
     }
 
+    public static String getCalendarFieldName(int field) {
+        switch (field) {
+            case Calendar.DAY_OF_MONTH:
+                return "DAY_OF_MONTH";
+            case Calendar.DAY_OF_WEEK:
+                return "DAY_OF_WEEK";
+            case Calendar.DAY_OF_YEAR:
+                return "DAY_OF_YEAR";
+            case Calendar.HOUR:
+                return "HOUR";
+            case Calendar.MINUTE:
+                return "MINUTE";
+            case Calendar.SECOND:
+                return "SECOND";
+            default:
+                throw new IllegalArgumentException("Unknown field code: " + field);
+        }
+    }
+
     public static String format(long milliseconds, String format) {
         SimpleDateFormat sdf = new SimpleDateFormat(format);
         sdf.setTimeZone(CURRENT_TIME_ZONE);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
deleted file mode 100644
index c8219b6..0000000
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.metadata.model;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class AlertEntityRepository extends EntityRepository {
-
-    public AlertEntityRepository() {
-        registerEntity(AlertEntity.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
new file mode 100644
index 0000000..f13e157
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.model;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class MetadataEntityRepository extends EntityRepository {
+    public MetadataEntityRepository() {
+        registerEntity(AlertEntity.class);
+        registerEntity(MetricSchemaEntity.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
new file mode 100644
index 0000000..b64b1f6
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.List;
+
+@Table("eagle_metric")
+@ColumnFamily("f")
+@Prefix("eagle_metric_schema")
+@Service(MetricSchemaEntity.METRIC_SCHEMA_SERVICE)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"metricName","metricType"})
+public class MetricSchemaEntity extends TaggedLogAPIEntity {
+    static final String METRIC_SCHEMA_SERVICE = "MetricSchemaService";
+    public static final String METRIC_NAME_TAG = "metricName";
+    public static final String METRIC_TYPE_TAG = "metricType";
+
+    @Column("a")
+    private List<String> dimensionFields;
+    @Column("b")
+    private List<String> metricFields;
+    @Column("c")
+    private String granularity;
+    @Column("d")
+    private Long modifiedTimestamp;
+
+    public List<String> getDimensionFields() {
+        return dimensionFields;
+    }
+
+    public void setDimensionFields(List<String> dimensionFields) {
+        this.dimensionFields = dimensionFields;
+        this.valueChanged("dimensionFields");
+    }
+
+    public List<String> getMetricFields() {
+        return metricFields;
+    }
+
+    public void setMetricFields(List<String> metricFields) {
+        this.metricFields = metricFields;
+        this.valueChanged("metricFields");
+    }
+
+    public String getGranularity() {
+        return granularity;
+    }
+
+    public void setGranularity(String granularity) {
+        this.granularity = granularity;
+        this.valueChanged("granularity");
+    }
+
+    public void setGranularityByField(int granularity) {
+        setGranularity(DateTimeUtil.getCalendarFieldName(granularity));
+    }
+
+    public Long getModifiedTimestamp() {
+        return modifiedTimestamp;
+    }
+
+    public void setModifiedTimestamp(Long modifiedTimestamp) {
+        this.modifiedTimestamp = modifiedTimestamp;
+        this.valueChanged("modifiedTimestamp");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 323e5fe..5aa27a3 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -19,9 +19,8 @@ package org.apache.eagle.metric;
 import backtype.storm.generated.StormTopology;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.builder.CEPFunction;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
 
 import java.util.Calendar;
 
@@ -31,6 +30,7 @@ public class HadoopMetricMonitorApp extends StormApplication {
         return environment.newApp(config)
             .fromStream("HADOOP_JMX_METRIC_STREAM")
             .saveAsMetric(MetricDefinition
+                .metricType("HADOOP_JMX_METRICS")
                 .namedByField("metric")
                 .eventTimeByField("timestamp")
                 .dimensionFields("host","component","site")