You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by li...@apache.org on 2015/12/20 16:47:38 UTC

incubator-eagle git commit: EAGLE-75 Integrate dropwizard metric framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 0c583e5d9 -> 628e87537


EAGLE-75 Integrate dropwizard metric framework

https://issues.apache.org/jira/browse/EAGLE-75

- Integrate dropwizard metric framework
- Delete old metric classes

Author: @sunlibin <li...@apache.org>
Reviewer: @RalphSu <su...@gmail.com>, @haoch <ha...@apache.org>

Closes #28


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/628e8753
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/628e8753
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/628e8753

Branch: refs/heads/master
Commit: 628e8753759294d3dfb5a0295493a1f2ef264631
Parents: 0c583e5
Author: sunlibin <ab...@gmail.com>
Authored: Sun Dec 20 23:19:48 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Sun Dec 20 23:43:41 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/executor/AlertExecutor.java    | 111 ++++++-------------
 .../eagle/alert/cep/TestSiddhiEvaluator.java    |   3 -
 eagle-core/eagle-metric/pom.xml                 |   5 +
 .../org/apache/eagle/metric/CountingMetric.java |  48 --------
 .../java/org/apache/eagle/metric/Metric.java    |  88 ---------------
 .../org/apache/eagle/metric/MetricOperator.java |  22 ----
 .../manager/EagleMetricReportManager.java       |  61 ----------
 .../metric/report/EagleServiceMetricReport.java |  60 ----------
 .../metric/report/MetricEntityConvert.java      |  32 ------
 .../eagle/metric/report/MetricReport.java       |  26 -----
 .../metric/reportor/EagleCounterMetric.java     |  73 ++++++++++++
 .../eagle/metric/reportor/EagleGaugeMetric.java |  45 ++++++++
 .../eagle/metric/reportor/EagleMetric.java      |  64 +++++++++++
 .../eagle/metric/reportor/EagleMetricKey.java   |  28 +++++
 .../metric/reportor/EagleMetricListener.java    |  31 ++++++
 .../EagleServiceReporterMetricListener.java     |  63 +++++++++++
 .../eagle/metric/reportor/IEagleMetric.java     |  40 +++++++
 .../metric/reportor/MetricEntityAdaptor.java    |  50 +++++++++
 .../metric/reportor/MetricKeyCodeDecoder.java   |  64 +++++++++++
 .../eagle/service/client/ServiceConfig.java     |  11 ++
 .../metric/kafka/KafkaLatestOffsetFetcher.java  |  14 +--
 .../kafka/KafkaMessageDistributionExecutor.java |  93 ++++++----------
 .../eagle/metric/kafka/KafkaOffsetSpout.java    |  33 +++---
 .../src/main/resources/application.conf         |   5 +-
 .../partition/DataDistributionDaoImpl.java      |   2 +-
 25 files changed, 571 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 7e4372c..8b928c3 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -16,38 +16,38 @@
  */
 package org.apache.eagle.executor;
 
+import com.codahale.metrics.MetricRegistry;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.alert.common.AlertConstants;
 import org.apache.eagle.alert.config.AbstractPolicyDefinition;
 import org.apache.eagle.alert.dao.AlertDefinitionDAO;
 import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
 import org.apache.eagle.alert.policy.*;
 import org.apache.eagle.alert.siddhi.EagleAlertContext;
 import org.apache.eagle.alert.siddhi.SiddhiAlertHandler;
 import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.metric.reportor.EagleCounterMetric;
+import org.apache.eagle.metric.reportor.EagleMetricListener;
+import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
+import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.management.ManagementFactory;
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
 	private static final long serialVersionUID = 1L;
@@ -68,12 +68,11 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 	private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
 	private static String EAGLE_ALERT_COUNT = "eagle.alert.count";
 	private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
-	private	 static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
-	private Map<String, Metric> metricMap; // metricMap's key = metricName[#policyId]
+	private	static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
 	private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
 	private Map<String, String> baseDimensions;
-	private Thread metricReportThread;
-	private EagleServiceMetricReport metricReport;
+	private MetricRegistry registry;
+	private EagleMetricListener listener;
 
 	public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
                          AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
@@ -88,7 +87,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 	public String getAlertExecutorId(){
 		return this.alertExecutorId;
 	}
-	
+
 	public int getNumPartitions() {
 		return this.numPartitions;
 	}
@@ -115,32 +114,25 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 	}
 	
 	public void initMetricReportor() {
-		String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-		int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
 
 		String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
 				          config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
 		String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
 				          config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
-		this.metricReport = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
 
-		metricMap = new ConcurrentHashMap<String, Metric>();
-		baseDimensions = new HashMap<String, String>();
+		//TODO: need to it replace it with batch flush listener
+		registry = new MetricRegistry();
+		listener = new EagleServiceReporterMetricListener(host, port, username, password);
+
+		baseDimensions = new HashMap<>();
 		baseDimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
 		baseDimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq));
 		baseDimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
 		baseDimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE));
 		baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
-
-		dimensionsMap = new HashMap<String, Map<String, String>>();
-		this.metricReportThread = new Thread() {
-			@Override
-			public void run() {
-				runMetricReporter();
-			}
-		};
-		this.metricReportThread.setDaemon(true);
-		metricReportThread.start();
+		dimensionsMap = new HashMap<>();
 	}
 
 	@Override
@@ -246,44 +238,17 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 		return value / granularity * granularity;
 	}
 
-	public void runMetricReporter() {
-		while(true) {
-			try {
-				long current = System.currentTimeMillis();
-				List<Metric> metricList = new ArrayList<Metric>();
-				synchronized (this.metricMap) {
-					for (Entry<String, Metric> entry : metricMap.entrySet()) {
-						String name = entry.getKey();
-						Metric metric = entry.getValue();
-						long previous = metric.getTimestamp();
-						if (current > previous + MERITE_GRANULARITY) {
-							metricList.add(metric);
-							metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), metric.getMetricName()));
-						}
-					}
-				}
-				if (metricList.size() > 0) {
-					LOG.info("Going to persist alert metrics, size: " + metricList.size());
-					metricReport.emit(metricList);
-				}
-				try {
-					Thread.sleep(MERITE_GRANULARITY / 2);
-					} catch (InterruptedException ex) { /* Do nothing */ }
-				}
-			catch (Throwable t) {
-				LOG.error("Got a throwable in metricReporter " , t);
-			}
-		}
-	}
-
 	public void updateCounter(String name, Map<String, String> dimensions, double value) {
 		long current = System.currentTimeMillis();
-		synchronized (metricMap) {
-			if (metricMap.get(name) == null) {
-				String metricName = name.split("#")[0];
-				metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), dimensions, metricName));
-			}
-			metricMap.get(name).update(value);
+		String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
+		if (registry.getMetrics().get(metricName) == null) {
+			EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
+			metric.registerListener(listener);
+			registry.register(metricName, metric);
+		} else {
+			EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName);
+			metric.update(value, current);
+			//TODO: need remove unused metric from registry
 		}
 	}
 	
@@ -299,10 +264,6 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 		}
 		return dimensionsMap.get(policyId);
 	}
-	
-	public String getMetricKey(String metricName, String policy) {
-		return metricName + "#" + policy;
-	}
 
     /**
      * within this single executor, execute all PolicyEvaluator sequentially
@@ -325,7 +286,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
                 for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){
                     String policyId = entry.getKey();
                     PolicyEvaluator evaluator = entry.getValue();
-                    updateCounter(getMetricKey(EAGLE_POLICY_EVAL_COUNT, policyId), getDimensions(policyId));
+                    updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
                     try {
                         EagleAlertContext siddhiAlertContext = new EagleAlertContext();
                         siddhiAlertContext.alertExecutor = this;
@@ -336,7 +297,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
                     }
                     catch (Exception ex) {
                         LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
-                        updateCounter(getMetricKey(EAGLE_POLICY_EVAL_FAIL_COUNT, policyId), getDimensions(policyId));
+                        updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, getDimensions(policyId));
                     }
                 }
             }
@@ -400,7 +361,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
             LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId));
 			Collector outputCollector = context.outputCollector;
 			PolicyEvaluator evaluator = context.evaluator;
-			updateCounter(getMetricKey(EAGLE_ALERT_COUNT, policyId), getDimensions(policyId), alerts.size());
+			updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
 			for (AlertAPIEntity entity : alerts) {
 				synchronized(this) {
 					outputCollector.collect(new Tuple2(policyId, entity));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
index 47381d1..c1b4185 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -107,9 +107,6 @@ public class TestSiddhiEvaluator {
 			public Map<String, String> getDimensions(String policyId) {
 				return new HashMap<String, String>();
 			}
-
-			@Override
-			public void runMetricReporter() {}
 		};
 		context.alertExecutor.prepareConfig(config);
 		context.alertExecutor.init();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/pom.xml b/eagle-core/eagle-metric/pom.xml
index d1fc1d4..c12a2b9 100644
--- a/eagle-core/eagle-metric/pom.xml
+++ b/eagle-core/eagle-metric/pom.xml
@@ -32,6 +32,11 @@
 
 	<dependencies>
 		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>3.1.2</version>
+		</dependency>
+		<dependency>
 			<groupId>eagle</groupId>
 			<artifactId>eagle-entity-base</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
deleted file mode 100644
index 4f65b8e..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- */
-public class CountingMetric extends Metric{
-
-    public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, double value) {
-        super(timestamp, dimensions, metricName, new AtomicDouble(value));
-    }
-
-    public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
-    	super(timestamp, dimensions, metricName, value);
-    }
-
-    public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName) {
-	   this(timestamp, dimensions, metricName, new AtomicDouble(0.0));
-    }
-
-    public CountingMetric(CountingMetric metric) {
-        this(metric.timestamp, new HashMap<>(metric.dimensions), metric.metricName, metric.value);
-    }
-
-    @Override
-    public double update(double delta) {
-    	return value.addAndGet(delta);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
deleted file mode 100644
index 616c82b..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- */
-public abstract class Metric implements MetricOperator{
-
-	protected final long timestamp;
-    protected final Map<String, String> dimensions;
-    protected final String metricName;	
-    protected final AtomicDouble value;
-    
-    public Metric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
-       this.timestamp = timestamp;
-       this.dimensions = new HashMap<>(dimensions);
-       this.metricName = metricName;
-	   this.value = value;
-    }
-  
-    public Metric(long timestamp, Map<String, String> dimensions, String metricName) {
-	   this(timestamp, dimensions,metricName, new AtomicDouble(0.0));
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-     }
-
-     public Map<String, String> getDimensions() {
-        return dimensions;
-     }
-   
-     public String getMetricName() {
- 	   return metricName;
-     }
-    
-    public AtomicDouble getValue() {
-       return value;
-    }
-  
-    @Override
-    public int hashCode() {
-	   int hashCode = (int) (timestamp % Integer.MAX_VALUE);
-	   for (Entry<String, String> entry : dimensions.entrySet()) {
-         String key = entry.getKey();
-	     String value = entry.getValue();
-	     hashCode ^= key.hashCode() ^ value.hashCode();
-	   }
-	   return hashCode;	 
-    }
-  
-    @Override
-    public boolean equals(Object obj) {
-	   if (obj instanceof Metric) {
-		   Metric event = (Metric) obj;
-		   if (dimensions.size() != event.dimensions.size()) return false;
-		   for (Entry<String, String> keyValue : event.dimensions.entrySet()) {
-		       boolean keyExist = dimensions.containsKey(keyValue.getKey());
-			    if ( !keyExist || !dimensions.get(keyValue.getKey()).equals(keyValue.getValue())) {				
-				    return false;
-				}
-	       }
-		   if (timestamp != event.timestamp) return false;
-		     return true;
-	   }
-	   return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
deleted file mode 100644
index 2059ea4..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricOperator.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric;
-
-public interface MetricOperator {
-
-	double update(double value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
deleted file mode 100644
index 153159c..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.manager;
-
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.MetricReport;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class EagleMetricReportManager {
-
-    private static EagleMetricReportManager manager = new EagleMetricReportManager();
-    private Map<String, MetricReport> metricReportMap = new ConcurrentHashMap<>();
-
-    private EagleMetricReportManager() {
-
-    }
-
-    public static EagleMetricReportManager getInstance () {
-        return manager;
-    }
-
-    public boolean register(String name, MetricReport report) {
-       if (metricReportMap.get(name) == null) {
-           synchronized (metricReportMap) {
-               if (metricReportMap.get(name) == null) {
-                   metricReportMap.put(name, report);
-                   return true;
-               }
-            }
-        }
-        return false;
-    }
-
-    public Map<String, MetricReport> getRegisteredReports() {
-        return metricReportMap;
-    }
-
-    public void emit(List<Metric> list) {
-        synchronized (this.metricReportMap) {
-            for (MetricReport report : metricReportMap.values()) {
-                report.emit(list);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
deleted file mode 100644
index 7ff415e..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class EagleServiceMetricReport implements MetricReport{
-		
-    private EagleServiceClientImpl client;
-	private static final Logger LOG = LoggerFactory.getLogger(EagleServiceMetricReport.class);
-
-	public EagleServiceMetricReport(String host, int port, String username, String password) {
-		client = new EagleServiceClientImpl(host, port, username, password);
-	}
-
-    public EagleServiceMetricReport(String host, int port) {
-    	client = new EagleServiceClientImpl(host, port, null, null);
-    }
-
-	public void emit(List<Metric> list) {
-		List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>();
-		for (Metric metric : list) {
-			entities.add(MetricEntityConvert.convert(metric));
-		}
-		try {
-			int total = entities.size();
-			GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
-            if(response.isSuccess()) {
-                LOG.info("Wrote " + total + " entities to service");
-            }else{
-                LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
-            }
-		}
-		catch (Exception ex) {
-            LOG.error("Got exception while writing entities: ", ex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
deleted file mode 100644
index c389fa7..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.metric.Metric;
-
-public class MetricEntityConvert {
-
-	public static GenericMetricEntity convert(Metric metric) {
-        GenericMetricEntity entity = new GenericMetricEntity();
-        entity.setPrefix(metric.getMetricName());
-        entity.setValue(new double[]{metric.getValue().get()});
-        entity.setTags(metric.getDimensions());
-        entity.setTimestamp(metric.getTimestamp());
-        return entity;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
deleted file mode 100644
index 85d423b..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import java.util.List;
-
-import org.apache.eagle.metric.Metric;
-
-public interface MetricReport {
-	// The method should be thread safe
-	void emit(List<Metric> list);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
new file mode 100644
index 0000000..0a7f70e
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleCounterMetric.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class EagleCounterMetric extends EagleMetric {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EagleCounterMetric.class);
+
+    public EagleCounterMetric(long latestUserTimeClock, String name, double value, long granularity) {
+        super(latestUserTimeClock, name, value, granularity);
+    }
+
+    public EagleCounterMetric(EagleCounterMetric metric) {
+        super(metric);
+    }
+
+    public long trim(long latestUserTimeClock) {
+        return latestUserTimeClock / granularity * granularity;
+    }
+
+    public void flush(long latestUserTimeClock) {
+        for (EagleMetricListener listener : metricListeners) {
+            EagleCounterMetric newEagleMetric = new EagleCounterMetric(this);
+            newEagleMetric.name = MetricKeyCodeDecoder.addTimestampToMetricKey(trim(latestUserTimeClock), newEagleMetric.name);
+            listener.onMetricFlushed(Arrays.asList((EagleMetric)newEagleMetric));
+        }
+    }
+
+    public boolean checkIfNeedFlush(long currentUserTimeClock) {
+        if (currentUserTimeClock - latestUserTimeClock > granularity) {
+            return true;
+        }
+        return false;
+    }
+
+    public boolean update(double d, long currentUserTimeClock) {
+        Boolean readyToFlushed = checkIfNeedFlush(currentUserTimeClock);
+        if (!readyToFlushed) {
+            if (currentUserTimeClock < latestUserTimeClock) {
+                LOG.warn("Something must be wrong, event should come in order of userTimeClock");
+            }
+            value.addAndGet(d);
+        }
+        else {
+            flush(latestUserTimeClock);
+            value.getAndSet(1);
+            latestUserTimeClock = currentUserTimeClock;
+        }
+        return readyToFlushed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
new file mode 100644
index 0000000..e6fc098
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleGaugeMetric.java
@@ -0,0 +1,45 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class EagleGaugeMetric extends EagleMetric {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EagleGaugeMetric.class);
+
+    public EagleGaugeMetric(long latestUserTimeClock, String name, double value) {
+        super(latestUserTimeClock, name, value, 0);
+    }
+
+    public EagleGaugeMetric(EagleGaugeMetric metric) {
+        super(metric);
+    }
+
+    public boolean update(double d, long currentUserTimeClock) {
+        value.getAndSet(d);
+        this.latestUserTimeClock = currentUserTimeClock;
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
new file mode 100644
index 0000000..e45a8ce
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetric.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class EagleMetric implements IEagleMetric {
+
+    public long latestUserTimeClock;
+    public AtomicDouble value;
+    public String name;
+    public long granularity;
+    public List<EagleMetricListener> metricListeners = new ArrayList<>();
+    private static final Logger LOG = LoggerFactory.getLogger(EagleCounterMetric.class);
+
+    public EagleMetric(EagleMetric metric) {
+        this.latestUserTimeClock = metric.latestUserTimeClock;
+        this.name = metric.name;
+        this.value = new AtomicDouble(metric.value.doubleValue());
+        this.granularity = metric.granularity;
+    }
+
+    public EagleMetric(long latestUserTimeClock, String name, double value, long granularity) {
+        this.latestUserTimeClock = latestUserTimeClock;
+        this.name = name;
+        this.value = new AtomicDouble(value);
+        this.granularity = granularity;
+    }
+
+    public EagleMetric(long latestUserTimeClock, String metricName, double value) {
+        this(latestUserTimeClock, metricName, value, 5 * DateUtils.MILLIS_PER_MINUTE);
+    }
+
+    public void registerListener(EagleMetricListener listener) {
+        metricListeners.add(listener);
+    }
+
+    public Double getValue() {
+        return value.doubleValue();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
new file mode 100644
index 0000000..daf6ddf
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricKey.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.Map;
+
+public class EagleMetricKey {
+    public String metricName;
+    public Map<String, String> tags;
+    public long timestamp;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
new file mode 100644
index 0000000..d8fe7b2
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleMetricListener.java
@@ -0,0 +1,31 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.List;
+
+public interface EagleMetricListener {
+
+    /**
+     * The method should be called in thread-safe mode
+     * @param metrics
+     */
+    void onMetricFlushed(List<EagleMetric> metrics);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
new file mode 100644
index 0000000..49a8cdc
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/EagleServiceReporterMetricListener.java
@@ -0,0 +1,63 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EagleServiceReporterMetricListener implements EagleMetricListener{
+
+    private EagleServiceClientImpl client;
+    private static final Logger LOG = LoggerFactory.getLogger(EagleServiceReporterMetricListener.class);
+
+    public EagleServiceReporterMetricListener(String host, int port, String username, String password) {
+        client = new EagleServiceClientImpl(host, port, username, password);
+    }
+
+    public EagleServiceReporterMetricListener(String host, int port) {
+        client = new EagleServiceClientImpl(host, port, null, null);
+    }
+
+    public void onMetricFlushed(List<EagleMetric> metrics) {
+        List<GenericMetricEntity> entities = new ArrayList<>();
+        for (EagleMetric metric : metrics) {
+            String metricName = metric.name;
+            entities.add(MetricEntityAdaptor.convert(metricName, metric));
+        }
+        try {
+            int total = entities.size();
+            GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
+            if(response.isSuccess()) {
+                LOG.info("Wrote " + total + " entities to service");
+            }else{
+                LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
+            }
+        }
+        catch (Exception ex) {
+            LOG.error("Got exception while writing entities: ", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
new file mode 100644
index 0000000..43f8092
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/IEagleMetric.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.codahale.metrics.Gauge;
+
+/**
+ * It's just a workaround to extends Gauge instead of Metric interface
+ * For MetricRegistry's notifyListenerOfRemovedMetric method will throw exception on unknown metric type
+ */
+
+public interface IEagleMetric extends Gauge<Double> {
+
+    void registerListener(EagleMetricListener listener);
+
+    /**
+     * return true if the metric need to be flushed, otherwise return false
+     * @param value
+     * @param userTimeClock
+     * @return
+     */
+    boolean update(double value, long userTimeClock);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
new file mode 100644
index 0000000..e1f8154
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricEntityAdaptor.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import com.codahale.metrics.Metric;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+
+public class MetricEntityAdaptor {
+
+    public static GenericMetricEntity convert(String name, Metric metric) {
+        //TODO: add other type metric support
+        EagleMetricKey metricName = MetricKeyCodeDecoder.decodeTSMetricKey(name);
+        if (metric instanceof EagleCounterMetric) {
+            EagleCounterMetric counter = (EagleCounterMetric)metric;
+            GenericMetricEntity entity = new GenericMetricEntity();
+            entity.setPrefix(metricName.metricName);
+            entity.setValue(new double[]{counter.getValue()});
+            entity.setTags(metricName.tags);
+            entity.setTimestamp(metricName.timestamp);
+            return entity;
+        }
+        else if (metric instanceof EagleGaugeMetric) {
+            EagleGaugeMetric gauge = (EagleGaugeMetric)metric;
+            GenericMetricEntity entity = new GenericMetricEntity();
+            entity.setPrefix(metricName.metricName);
+            entity.setValue(new double[]{gauge.getValue()});
+            entity.setTags(metricName.tags);
+            entity.setTimestamp(metricName.timestamp);
+            return entity;
+        }
+        throw new RuntimeException("Not support this metric type for now!");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
new file mode 100644
index 0000000..7f798fb
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/reportor/MetricKeyCodeDecoder.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.reportor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricKeyCodeDecoder {
+
+    public static String codeMetricKey(String metricName, Map<String, String> tags) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(metricName);
+        for (Map.Entry<String, String> entry : tags.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            sb.append(" " + key + ":" + value);
+        }
+        return sb.toString();
+    }
+
+    public static EagleMetricKey decodeMetricKey(String name) {
+        EagleMetricKey metricName = new EagleMetricKey();
+        String[] parts = name.split(" ");
+        metricName.metricName = parts[0];
+        metricName.tags = new HashMap<>();
+        for (int i = 1; i < parts.length; i++) {
+            String[] keyValue = parts[i].split(":");
+            metricName.tags.put(keyValue[0], keyValue[1]);
+        }
+        return metricName;
+    }
+
+    public static String addTimestampToMetricKey(long timestamp, String metricKey) {
+        return timestamp + " " + metricKey;
+    }
+
+    public static String codeTSMetricKey(long timestamp, String metricName, Map<String, String> tags) {
+        return addTimestampToMetricKey(timestamp, codeMetricKey(metricName, tags));
+    }
+
+    public static EagleMetricKey decodeTSMetricKey(String name) {
+        Integer index = name.indexOf(" ");
+        EagleMetricKey metricKey = decodeMetricKey(name.substring(index + 1));
+        metricKey.timestamp = Long.valueOf(name.substring(0, index));
+        return metricKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
index e68360f..1dbf75b 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -26,4 +26,15 @@ public class ServiceConfig implements Serializable{
     public Integer servicePort;
     public String username;
     public String password;
+
+    public ServiceConfig() {
+
+    }
+
+    public ServiceConfig(String serviceHost, Integer servicePort, String username, String password) {
+        this.serviceHost = serviceHost;
+        this.servicePort = servicePort;
+        this.username = username;
+        this.password = password;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
index de93ea3..90342f3 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
@@ -41,18 +41,16 @@ public class KafkaLatestOffsetFetcher {
         Map<Integer, Long> ret = new HashMap<>();
         for (int partition = 0; partition < partitionCount; partition++) {
             PartitionMetadata metadata = metadatas.get(partition);
-            if (metadata == null) {
-                throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting");
-            }
-            if (metadata.leader() == null) {
-                throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+            if (metadata == null || metadata.leader() == null) {
+                ret.put(partition, -1L);
+                //throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
             }
             String leadBroker = metadata.leader().host();
             String clientName = "Client_" + topic + "_" + partition;
             SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
-            long lastestOffset = getLatestOffset(consumer, topic, partition, clientName);
+            long latestOffset = getLatestOffset(consumer, topic, partition, clientName);
             if (consumer != null) consumer.close();
-            ret.put(partition, lastestOffset);
+            ret.put(partition, latestOffset);
         }
         return ret;
     }
@@ -88,7 +86,7 @@ public class KafkaLatestOffsetFetcher {
                 }
                 if (partitionMetadata.size() == partitionCount) break;
             } catch (Exception e) {
-                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: " + e);
+                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
             } finally {
                 if (consumer != null) consumer.close();
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index 7af5ea6..b521d65 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -19,47 +19,31 @@
 
 package org.apache.eagle.metric.kafka;
 
+import com.codahale.metrics.MetricRegistry;
 import com.typesafe.config.Config;
+import org.apache.commons.lang.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
 import org.apache.eagle.datastream.Tuple1;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.manager.EagleMetricReportManager;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.apache.eagle.metric.reportor.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
 
     private Config config;
     private Map<String, String> baseMetricDimension;
-    private Map<String, EventMetric> eventMetrics;
-    private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000;
-    private static final String metricName = "kafka.message.user.count";
+    private MetricRegistry registry;
+    private EagleMetricListener listener;
+    private long granularity;
+    private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
 
-    public static class EventMetric {
-        long latestMessageTime;
-        Metric metric;
-
-        public EventMetric(long latestMessageTime, Metric metric) {
-            this.latestMessageTime = latestMessageTime;
-            this.metric = metric;
-        }
-
-        public void update(double d) {
-            this.metric.update(d);
-        }
-    }
-
     @Override
     public void prepareConfig(Config config) {
         this.config = config;
@@ -72,54 +56,45 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
         this.baseMetricDimension = new HashMap<>();
         this.baseMetricDimension.put("site", site);
         this.baseMetricDimension.put("topic", topic);
-        String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-        int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
-        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        registry = new MetricRegistry();
 
-        EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
-        EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
-        eventMetrics = new ConcurrentHashMap<>();
-    }
+        this.granularity = DEFAULT_METRIC_GRANULARITY;
+        if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
+            this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
+        }
 
-    public long trimTimestamp(long timestamp, long granularity) {
-        return timestamp / granularity * granularity;
+        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        listener = new EagleServiceReporterMetricListener(host, port, username, password);
     }
 
-    public void putNewMetric(long currentMessageTime, String user) {
-        Map<String ,String> dimensions = new HashMap<>();
+    public String generateMetricKey(String user) {
+        Map<String, String> dimensions = new HashMap<>();
         dimensions.putAll(baseMetricDimension);
         dimensions.put("user", user);
-        long trimTimestamp = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
-        Metric metric = new CountingMetric(trimTimestamp, dimensions, metricName, 1);
-        eventMetrics.put(user, new EventMetric(currentMessageTime, metric));
-    }
-
-    public void update(long currentMessageTime, String user) {
-        if (eventMetrics.get(user) == null) {
-            LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
-            putNewMetric(currentMessageTime, user);
-        }
-        else {
-            long latestMessageTime = eventMetrics.get(user).latestMessageTime;
-            if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
-                LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
-                        + ", latestMessageTime: " + latestMessageTime);
-                EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
-                putNewMetric(currentMessageTime, user);
-            }
-            else {
-                eventMetrics.get(user).update(1);
-            }
-        }
+        String metricName = "eagle.kafka.message.count";
+        String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions);
+        return encodedMetricName;
     }
 
     @Override
     public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
         try {
             String user = (String) input.get(0);
-            Long timestamp = (Long) (input.get(1));
-            update(timestamp, user);
+            Long timestamp = (Long) input.get(1);
+            String metricKey = generateMetricKey(user);
+            if (registry.getMetrics().get(metricKey) == null) {
+                EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
+                metric.registerListener(listener);
+                registry.register(metricKey, metric);
+            }
+            else {
+                EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey);
+                metric.update(1, timestamp);
+                //TODO: if we need to remove metric from registry
+            }
         }
         catch (Exception ex) {
             LOG.error("Got an exception, ex: ", ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index aee817a..a03705f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,10 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
-import org.apache.eagle.metric.CountingMetric;
-import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.manager.EagleMetricReportManager;
-import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.apache.eagle.metric.reportor.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,12 +31,13 @@ import java.util.Map;
 
 public class KafkaOffsetSpout extends BaseRichSpout {
 	private static final long serialVersionUID = 1L;
-	private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000;
+	private static final long DEFAULT_ROUND_INTERVALS = 60 * 1000;
 	private KafkaOffsetCheckerConfig config;
 	private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
 	private KafkaLatestOffsetFetcher latestOffsetFetcher;
 	private Map<String, String> baseMetricDimension;
 	private long lastRoundTime = 0;
+	private EagleMetricListener listener;
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class);
 
@@ -57,20 +55,20 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		this.baseMetricDimension.put("site", config.kafkaConfig.site);
 		this.baseMetricDimension.put("topic", config.kafkaConfig.topic);
 		this.baseMetricDimension.put("group", config.kafkaConfig.group);
-		String eagleServiceHost = config.serviceConfig.serviceHost;
-		Integer eagleServicePort = config.serviceConfig.servicePort;
+		String host = config.serviceConfig.serviceHost;
+		Integer port = config.serviceConfig.servicePort;
 		String username = config.serviceConfig.username;
 		String password = config.serviceConfig.password;
-		EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
-		EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+		listener = new EagleServiceReporterMetricListener(host, port, username, password);
 	}
 
-	public Metric constructMetric(long timestamp, String partition, double value) {
+	public EagleMetric constructMetric(long timestamp, String partition, double value) {
 		Map<String, String> dimensions = new HashMap<>();
 		dimensions.putAll(baseMetricDimension);
 		dimensions.put("partition", partition);
 		String metricName = "eagle.kafka.message.consumer.lag";
-		Metric metric = new CountingMetric(timestamp, dimensions, metricName, value);
+		String metricKey = MetricKeyCodeDecoder.codeTSMetricKey(timestamp, metricName, dimensions);
+		EagleGaugeMetric metric = new EagleGaugeMetric(timestamp, metricKey, value);
 		return metric;
 	}
 
@@ -83,18 +81,21 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		Long currentTime = System.currentTimeMillis();
 		if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
 			try {
-				long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
+				long trimCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
 				Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
 				Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
-				List<Metric> list = new ArrayList<>();
+				List<EagleMetric> metrics = new ArrayList<>();
 				for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
 					String partition = entry.getKey();
 					Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
 					Long lag = latestOffset.get(partitionNumber) - entry.getValue();
-					list.add(constructMetric(trimedCurrentTime, partition, lag));
+					// If the partition is not available
+					if (latestOffset.get(partitionNumber) == -1) lag = -1L;
+					EagleMetric metric = constructMetric(trimCurrentTime, partition, lag);
+					metrics.add(metric);
 				}
-				lastRoundTime = trimedCurrentTime;
-				EagleMetricReportManager.getInstance().emit(list);
+				lastRoundTime = trimCurrentTime;
+				listener.onMetricFlushed(metrics);
 			} catch (Exception ex) {
 				LOG.error("Got an exception, ex: ", ex);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-metric-collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/application.conf b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
index 4b07019..9c91744 100644
--- a/eagle-security/eagle-metric-collection/src/main/resources/application.conf
+++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
@@ -12,7 +12,7 @@
     # For fetch gap
     "site" : "sandbox",
     "topic" : "sandbox_hdfs_audit_log",
-    "zkQuorum" : "localhost:2191",
+    "zkQuorum" : "localhost:2181",
     "hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
@@ -26,7 +26,8 @@
     #"transactionZKPort" : "2181",
     "transactionZKRoot" : "/consumers",
     #"transactionStateUpdateMS" : 2000,
-    "kafkaEndPoints" : "localhost:9092"
+    "kafkaEndPoints" : "localhost:9092",
+    "kafkaDistributionDataIntervalMin" : 1
   },
   "eagleProps" : {
     "eagleService": {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/628e8753/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 7d32091..24deb65 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -79,7 +79,7 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
                     .endTime(endTime)
                     .pageSize(Integer.MAX_VALUE)
                     .query(query)
-                    .metricName("kafka.message.user.count")
+                    .metricName("eagle.kafka.message.count")
                     .send();
             if (!response.isSuccess()) {
                 LOG.error(response.getException());