You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/09 10:08:53 UTC
incubator-eagle git commit: [EAGLE-833] Add MetricSchemaGenerator and
MetricSchemaService
Repository: incubator-eagle
Updated Branches:
refs/heads/master aa8d3c9b9 -> 0fd2d7774
[EAGLE-833] Add MetricSchemaGenerator and MetricSchemaService
* Add MetricSchemaService to store metric schemas alongside GenericMetric
* Add MetricSchemaGenerator to automatically generate metric schema based on metric stream and metric definition.
Author: Hao Chen <ha...@apache.org>
Closes #725 from haoch/AddMetricSchemaService.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/0fd2d777
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/0fd2d777
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/0fd2d777
Branch: refs/heads/master
Commit: 0fd2d7774acba6a33d192e3d81627ec947b626a1
Parents: aa8d3c9
Author: Hao Chen <ha...@apache.org>
Authored: Fri Dec 9 18:08:41 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Fri Dec 9 18:08:41 2016 +0800
----------------------------------------------------------------------
.../environment/builder/ApplicationBuilder.java | 7 +-
.../environment/builder/MetricDefinition.java | 27 +++--
.../app/environment/impl/StormEnvironment.java | 4 +
.../app/messaging/MetricSchemaGenerator.java | 107 +++++++++++++++++++
.../app/messaging/MetricStreamPersist.java | 4 +-
.../org/apache/eagle/common/DateTimeUtil.java | 19 ++++
.../metadata/model/AlertEntityRepository.java | 27 -----
.../model/MetadataEntityRepository.java | 27 +++++
.../metadata/model/MetricSchemaEntity.java | 86 +++++++++++++++
.../eagle/metric/HadoopMetricMonitorApp.java | 4 +-
10 files changed, 275 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
index 83f00db..95cf491 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java
@@ -19,9 +19,11 @@ package org.apache.eagle.app.environment.builder;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.MetricSchemaGenerator;
import org.apache.eagle.app.messaging.MetricStreamPersist;
import org.apache.eagle.app.messaging.StormStreamSource;
@@ -65,7 +67,10 @@ public class ApplicationBuilder {
* Persist source data stream as metric.
*/
public BuilderContext saveAsMetric(MetricDefinition metricDefinition) {
- topologyBuilder.setBolt(generateId("MetricPersist"), new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+ String metricDataID = generateId("MetricDataSink");
+ String metricSchemaID = generateId("MetricSchemaGenerator");
+ topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId());
+ topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDefinition,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD));
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
index d45ad2b..639d27f 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java
@@ -45,6 +45,8 @@ public class MetricDefinition implements Serializable {
*/
private int granularity = Calendar.MINUTE;
+ private String metricType = "DEFAULT";
+
/**
* Metric value field name.
*/
@@ -90,6 +92,15 @@ public class MetricDefinition implements Serializable {
this.granularity = granularity;
}
+ public String getMetricType() { // Returns the metric type label; the field is initialised to "DEFAULT".
+ return metricType;
+ }
+
+ public void setMetricType(String metricType) { // Replaces the metric type label; no validation is applied here.
+ this.metricType = metricType;
+ }
+
+
@FunctionalInterface
public interface NameSelector extends Serializable {
String getMetricName(Map event);
@@ -100,10 +111,9 @@ public class MetricDefinition implements Serializable {
Long getTimestamp(Map event);
}
- public static MetricDefinition namedBy(NameSelector nameSelector) {
- MetricDefinition metricDefinition = new MetricDefinition();
- metricDefinition.setNameSelector(nameSelector);
- return metricDefinition;
+ public MetricDefinition namedBy(NameSelector nameSelector) {
+ this.setNameSelector(nameSelector);
+ return this;
}
/**
@@ -114,9 +124,14 @@ public class MetricDefinition implements Serializable {
return this;
}
- public static MetricDefinition namedByField(String nameField) {
+ public MetricDefinition namedByField(String nameField) {
+ this.setNameSelector(new FieldNameSelector(nameField));
+ return this;
+ }
+
+ public static MetricDefinition metricType(String metricType) {
MetricDefinition metricDefinition = new MetricDefinition();
- metricDefinition.setNameSelector(new FieldNameSelector(nameField));
+ metricDefinition.setMetricType(metricType);
return metricDefinition;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index ae52cd0..59c8277 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -48,6 +48,10 @@ public class StormEnvironment extends AbstractEnvironment {
return new MetricStreamPersist(metricDefinition, config);
}
+ public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) { // Factory: builds a new MetricSchemaGenerator bolt on every call.
+ return new MetricSchemaGenerator(metricDefinition, config);
+ }
+
public TransformFunctionBolt getTransformer(TransformFunction function) {
return new TransformFunctionBolt(function);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
new file mode 100644
index 0000000..6563cd7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.MetricSchemaEntity;
+import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.impl.BatchSender;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class MetricSchemaGenerator extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricSchemaGenerator.class);
+ private static int MAX_CACHE_LENGTH = 1000;
+ public static final String GENERIC_METRIC_VALUE_NAME = "value";
+
+ private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH);
+ private final MetricDefinition metricDefinition;
+ private final Config config;
+
+ private OutputCollector collector;
+ private BatchSender client;
+
+ public MetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
+ this.metricDefinition = metricDefinition;
+ this.config = config;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.client = new EagleServiceClientImpl(config).batch(100);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ try {
+ String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD);
+ synchronized (metricNameCache) {
+ if (!metricNameCache.contains(metricName)) {
+ createMetricSchemaEntity(metricName, this.metricDefinition);
+ metricNameCache.add(metricName);
+ }
+ if (metricNameCache.size() > MAX_CACHE_LENGTH) {
+ this.metricNameCache.clear();
+ }
+ }
+ this.collector.ack(input);
+ } catch (Throwable throwable) {
+ LOG.warn(throwable.getMessage(), throwable);
+ this.collector.reportError(throwable);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void cleanup() {
+ if (this.client != null) {
+ try {
+ this.client.close();
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ private void createMetricSchemaEntity(String metricName, MetricDefinition metricDefinition) throws IOException, EagleServiceClientException {
+ MetricSchemaEntity schemaEntity = new MetricSchemaEntity();
+ Map<String,String> schemaTags = new HashMap<>();
+ schemaEntity.setTags(schemaTags);
+ schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName);
+ schemaTags.put(MetricSchemaEntity.METRIC_TYPE_TAG, metricDefinition.getMetricType());
+ schemaEntity.setGranularityByField(metricDefinition.getGranularity());
+ schemaEntity.setDimensionFields(metricDefinition.getDimensionFields());
+ schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME));
+ schemaEntity.setModifiedTimestamp(System.currentTimeMillis());
+ this.client.send(Collections.singletonList(schemaEntity));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index 2a1d8a8..d656827 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -43,6 +43,7 @@ import java.util.Map;
public class MetricStreamPersist extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
+ public static final String METRIC_NAME_FIELD = "metricName";
private final Config config;
private final MetricMapper mapper;
@@ -82,6 +83,7 @@ public class MetricStreamPersist extends BaseRichBolt {
LOG.error("Service side error: {}", response.getException());
collector.reportError(new IllegalStateException(response.getException()));
} else {
+ collector.emit(Collections.singletonList(metricEntity.getPrefix()));
collector.ack(input);
}
} else {
@@ -96,7 +98,7 @@ public class MetricStreamPersist extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("f1"));
+ declarer.declare(new Fields(METRIC_NAME_FIELD));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index e7ef3f8..a69feda 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -142,6 +142,25 @@ public class DateTimeUtil {
}
}
+    /**
+     * Returns the symbolic name of a {@link Calendar} time-unit field constant.
+     *
+     * <p>Besides the day, hour, minute and second fields this also maps
+     * {@code YEAR}, {@code MONTH}, {@code WEEK_OF_YEAR}, {@code WEEK_OF_MONTH},
+     * {@code HOUR_OF_DAY} and {@code MILLISECOND}, so that those granularities
+     * no longer fail with an exception.
+     *
+     * @param field a {@link Calendar} field code such as {@link Calendar#MINUTE}
+     * @return the constant's name, e.g. {@code "MINUTE"}
+     * @throws IllegalArgumentException if the code is not one of the mapped fields
+     */
+    public static String getCalendarFieldName(int field) {
+        switch (field) {
+            case Calendar.YEAR:
+                return "YEAR";
+            case Calendar.MONTH:
+                return "MONTH";
+            case Calendar.WEEK_OF_YEAR:
+                return "WEEK_OF_YEAR";
+            case Calendar.WEEK_OF_MONTH:
+                return "WEEK_OF_MONTH";
+            case Calendar.DAY_OF_MONTH:
+                return "DAY_OF_MONTH";
+            case Calendar.DAY_OF_WEEK:
+                return "DAY_OF_WEEK";
+            case Calendar.DAY_OF_YEAR:
+                return "DAY_OF_YEAR";
+            case Calendar.HOUR:
+                return "HOUR";
+            case Calendar.HOUR_OF_DAY:
+                return "HOUR_OF_DAY";
+            case Calendar.MINUTE:
+                return "MINUTE";
+            case Calendar.SECOND:
+                return "SECOND";
+            case Calendar.MILLISECOND:
+                return "MILLISECOND";
+            default:
+                throw new IllegalArgumentException("Unknown field code: " + field);
+        }
+    }
+
public static String format(long milliseconds, String format) {
SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(CURRENT_TIME_ZONE);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
deleted file mode 100644
index c8219b6..0000000
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntityRepository.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.metadata.model;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class AlertEntityRepository extends EntityRepository {
-
- public AlertEntityRepository() {
- registerEntity(AlertEntity.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
new file mode 100644
index 0000000..f13e157
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetadataEntityRepository.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.metadata.model;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+public class MetadataEntityRepository extends EntityRepository { // Entity repository for the metadata model package.
+ public MetadataEntityRepository() { // Registers both entity classes at construction time.
+ registerEntity(AlertEntity.class); // Same registration the removed AlertEntityRepository performed.
+ registerEntity(MetricSchemaEntity.class); // Added with this change so metric schemas are registered too.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
new file mode 100644
index 0000000..b64b1f6
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.List;
+
+@Table("eagle_metric")
+@ColumnFamily("f")
+@Prefix("eagle_metric_schema")
+@Service(MetricSchemaEntity.METRIC_SCHEMA_SERVICE)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"metricName","metricType"}) // Same literals as METRIC_NAME_TAG / METRIC_TYPE_TAG below; keep them in sync.
+public class MetricSchemaEntity extends TaggedLogAPIEntity { // Schema record for one metric: dimension fields, metric fields, granularity, modification time.
+ static final String METRIC_SCHEMA_SERVICE = "MetricSchemaService"; // Service name used by @Service above; package-private.
+ public static final String METRIC_NAME_TAG = "metricName"; // Tag key holding the metric name.
+ public static final String METRIC_TYPE_TAG = "metricType"; // Tag key holding the metric type.
+
+ @Column("a")
+ private List<String> dimensionFields; // Names of the metric's dimension fields.
+ @Column("b")
+ private List<String> metricFields; // Names of the metric's value fields.
+ @Column("c")
+ private String granularity; // Free-form string; setGranularityByField stores a Calendar field name here.
+ @Column("d")
+ private Long modifiedTimestamp; // MetricSchemaGenerator stores System.currentTimeMillis() here.
+
+ public List<String> getDimensionFields() {
+ return dimensionFields;
+ }
+
+ public void setDimensionFields(List<String> dimensionFields) {
+ this.dimensionFields = dimensionFields;
+ this.valueChanged("dimensionFields"); // NOTE(review): inherited call; presumably flags the field as modified - confirm in TaggedLogAPIEntity.
+ }
+
+ public List<String> getMetricFields() {
+ return metricFields;
+ }
+
+ public void setMetricFields(List<String> metricFields) {
+ this.metricFields = metricFields;
+ this.valueChanged("metricFields"); // Argument is the Java field name, as in the other setters.
+ }
+
+ public String getGranularity() {
+ return granularity;
+ }
+
+ public void setGranularity(String granularity) {
+ this.granularity = granularity;
+ this.valueChanged("granularity"); // Argument is the Java field name, as in the other setters.
+ }
+
+ public void setGranularityByField(int granularity) { // Takes a java.util.Calendar field code and stores its name.
+ setGranularity(DateTimeUtil.getCalendarFieldName(granularity)); // The helper throws IllegalArgumentException for codes it does not map.
+ }
+
+ public Long getModifiedTimestamp() {
+ return modifiedTimestamp;
+ }
+
+ public void setModifiedTimestamp(Long modifiedTimestamp) {
+ this.modifiedTimestamp = modifiedTimestamp;
+ this.valueChanged("modifiedTimestamp"); // Argument is the Java field name, as in the other setters.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0fd2d777/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
index 323e5fe..5aa27a3 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java
@@ -19,9 +19,8 @@ package org.apache.eagle.metric;
import backtype.storm.generated.StormTopology;
import com.typesafe.config.Config;
import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.builder.CEPFunction;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
import org.apache.eagle.app.environment.builder.MetricDefinition;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
import java.util.Calendar;
@@ -31,6 +30,7 @@ public class HadoopMetricMonitorApp extends StormApplication {
return environment.newApp(config)
.fromStream("HADOOP_JMX_METRIC_STREAM")
.saveAsMetric(MetricDefinition
+ .metricType("HADOOP_JMX_METRICS")
.namedByField("metric")
.eventTimeByField("timestamp")
.dimensionFields("host","component","site")