You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/03 04:28:59 UTC

[1/5] incubator-eagle git commit: add metric topology for offline metric collection

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 91aa216a8 -> 3ee73e8d5


add metric topology for offline metric collection


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/34fa6d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/34fa6d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/34fa6d90

Branch: refs/heads/master
Commit: 34fa6d90a8cfe21a8ad85ce852b648a4a05cc363
Parents: 6f29955
Author: sunlibin <ab...@gmail.com>
Authored: Sat Nov 21 18:04:54 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 11:48:27 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/executor/AlertExecutor.java    |  10 +-
 .../config/RunningJobCrawlConfig.java           |  14 +-
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |   4 -
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |   1 -
 .../impl/storm/zookeeper/ZKStateConfig.java     |  28 ++++
 .../eagle/datastream/StreamProducer.scala       |   4 +
 .../org/apache/eagle/metric/CountingMetric.java |   8 +-
 .../java/org/apache/eagle/metric/Metric.java    |   5 +-
 .../manager/EagleMetricReportManager.java       |  45 +++++++
 .../metric/report/EagleSerivceMetricReport.java |  61 ---------
 .../metric/report/EagleServiceMetricReport.java |  60 +++++++++
 .../metric/report/MetricEntityConvert.java      |   2 +-
 .../eagle/metric/report/MetricReport.java       |   4 +-
 eagle-security/eagle-metric-collection/pom.xml  |  95 ++++++++++++++
 .../metric/kafka/EagleMetricCollectorMain.java  | 127 ++++++++++++++++++
 .../eagle/metric/kafka/KafkaConsumerOffset.java |  27 ++++
 .../kafka/KafkaConsumerOffsetFetcher.java       |  70 ++++++++++
 .../metric/kafka/KafkaLatestOffsetFetcher.java  |  98 ++++++++++++++
 .../kafka/KafkaMessageDistributionExecutor.java | 126 ++++++++++++++++++
 .../metric/kafka/KafkaOffsetCheckerConfig.java  |  50 +++++++
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |  53 ++++++++
 .../eagle/metric/kafka/KafkaOffsetSpout.java    | 131 +++++++++++++++++++
 .../src/main/resources/application.conf         |  39 ++++++
 .../src/main/resources/log4j.properties         |  39 ++++++
 .../src/test/java/TestKafkaOffset.java          |   2 +
 .../auditlog/HdfsAuditLogProcessorMain.java     |   1 -
 ...HiveJobRunningSourcedStormSpoutProvider.java |   2 +-
 eagle-security/pom.xml                          |   1 +
 eagle-topology-assembly/pom.xml                 |   5 +
 29 files changed, 1021 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 4f0f4b3..d86a846 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -28,7 +28,7 @@ import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.metric.CountingMetric;
 import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.EagleSerivceMetricReport;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
 import com.sun.jersey.client.impl.CopyOnWriteHashMap;
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.policy.*;
@@ -73,7 +73,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 	private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
 	private Map<String, String> baseDimensions;
 	private Thread metricReportThread;
-	private EagleSerivceMetricReport metricReport;
+	private EagleServiceMetricReport metricReport;
 
 	public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
                          AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
@@ -122,7 +122,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 				          config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
 		String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
 				          config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
-		this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+		this.metricReport = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
 
 		metricMap = new ConcurrentHashMap<String, Metric>();
 		baseDimensions = new HashMap<String, String>();
@@ -172,7 +172,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 		}
 		
 		policyEvaluators = new CopyOnWriteHashMap<>();
-		// for efficency, we don't put single policy evaluator 
+		// for efficiency, we don't put single policy evaluator
 		policyEvaluators.putAll(tmpPolicyEvaluators);
 		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
 		
@@ -258,7 +258,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
 						long previous = metric.getTimestamp();
 						if (current > previous + MERITE_GRANULARITY) {
 							metricList.add(metric);
-							metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName()));
+							metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), metric.getMetricName()));
 						}
 					}
 				}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
index b17a41d..79a8928 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
@@ -16,10 +16,11 @@
  */
 package org.apache.eagle.jobrunning.config;
 
-import java.io.Serializable;
-
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.job.JobPartitioner;
 
+import java.io.Serializable;
+
 public class RunningJobCrawlConfig implements Serializable{
 	private static final long serialVersionUID = 1L;
 	public RunningJobEndpointConfig endPointConfig;
@@ -49,13 +50,4 @@ public class RunningJobCrawlConfig implements Serializable{
         public Class<? extends JobPartitioner> partitionerCls;
         public int numTotalPartitions = 1;
     }
-    
-	public static class ZKStateConfig implements Serializable{
-		private static final long serialVersionUID = 1L;
-		public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 06d37ef..373b3ca 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -51,10 +51,6 @@ public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
 		String zkConnString = context.getString("dataSourceConfig.zkConnection");
 		// transaction zkRoot
 		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-		// Site
-		String site = context.getString("eagleProps.site");
-
-        //String realTopic = (site ==null)? topic : String.format("%s_%s",site,topic);
 
         LOG.info(String.format("Use topic id: %s",topic));
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index 8bdbcb5..8b65c1f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -51,7 +51,6 @@ public class KafkaSourcedSpoutScheme implements Scheme {
 	@Override
 	public List<Object> deserialize(byte[] ser) {
 		Object tmp = deserializer.deserialize(ser);
-		Map<String, Object> map = (Map<String, Object>)tmp;
 		if(tmp == null)
 			return null;
 		// the following tasks are executed within the same process of kafka spout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
new file mode 100644
index 0000000..f9515f5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.zookeeper;
+
+import java.io.Serializable;
+
+public class ZKStateConfig implements Serializable { // plain serializable holder of ZooKeeper state settings; every field is public and mutable
+    private static final long serialVersionUID = 1L;
+    public String zkQuorum;
+    public String zkRoot;
+    public int zkSessionTimeoutMs; // presumably milliseconds, going by the name - confirm against the code that reads it
+    public int zkRetryTimes;
+    public int zkRetryInterval; // NOTE(review): the unit is not stated anywhere in this class - confirm
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 9fb3e22..40d4904 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -103,6 +103,10 @@ trait StreamProducer{
     ret
   }
 
+  def streamUnion(others : util.List[StreamProducer]) : StreamProducer = { // Java-friendly overload: unions this stream with a java.util.List of producers
+    streamUnion(scala.collection.JavaConverters.asScalaBufferConverter(others).asScala.toSeq) // convert explicitly so the Seq overload is selected; passing the util.List unchanged re-enters this method and never terminates
+  }
+
   def streamUnion(others : Seq[StreamProducer]) : StreamProducer = {
     val ret = StreamUnionProducer(incrementAndGetId(), others)
     hookupDAG(graph, this, ret)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
index f4d5cd5..4f65b8e 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
@@ -25,16 +25,20 @@ import com.google.common.util.concurrent.AtomicDouble;
  */
 public class CountingMetric extends Metric{
 
+    public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, double value) {
+        super(timestamp, dimensions, metricName, new AtomicDouble(value));
+    }
+
     public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
     	super(timestamp, dimensions, metricName, value);
     }
-  
+
     public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName) {
 	   this(timestamp, dimensions, metricName, new AtomicDouble(0.0));
     }
 
     public CountingMetric(CountingMetric metric) {
-        this(metric.timestamp, new HashMap<String, String>(metric.dimensions), metric.metricName, metric.value);
+        this(metric.timestamp, new HashMap<>(metric.dimensions), metric.metricName, metric.value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
index 993906e..616c82b 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.metric;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -32,7 +33,7 @@ public abstract class Metric implements MetricOperator{
     
     public Metric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
        this.timestamp = timestamp;
-       this.dimensions = dimensions;
+       this.dimensions = new HashMap<>(dimensions);
        this.metricName = metricName;
 	   this.value = value;
     }
@@ -45,7 +46,7 @@ public abstract class Metric implements MetricOperator{
         return timestamp;
      }
 
-     public Map<String, String> getDemensions() {
+     public Map<String, String> getDimensions() {
         return dimensions;
      }
    

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
new file mode 100644
index 0000000..b63944d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -0,0 +1,45 @@
+package org.apache.eagle.metric.manager;
+
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.report.MetricReport;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class EagleMetricReportManager {
+    // Holds named MetricReport instances and fans metric batches out to all of them; the only instance is the static one below.
+    private static EagleMetricReportManager manager = new EagleMetricReportManager();
+    private Map<String, MetricReport> metricReportMap = new ConcurrentHashMap<>();
+
+    private EagleMetricReportManager() {
+        // private so callers must go through getInstance()
+    }
+
+    public static EagleMetricReportManager getInstance () {
+        return manager;
+    }
+    // Adds report under name only when nothing is registered under that name yet; returns true if it was added, false otherwise.
+    public boolean register(String name, MetricReport report) {
+       if (metricReportMap.get(name) == null) {
+           synchronized (metricReportMap) {
+               if (metricReportMap.get(name) == null) {
+                   metricReportMap.put(name, report);
+                   return true;
+               }
+            }
+        }
+        return false;
+    }
+    // Returns the internal map itself (not a copy), so changes made by the caller affect this manager.
+    public Map<String, MetricReport> getRegisteredReports() {
+        return metricReportMap;
+    }
+    // Passes the same list to emit() of every registered report while holding the map's monitor, which register() also takes before inserting.
+    public void emit(List<Metric> list) {
+        synchronized (this.metricReportMap) {
+            for (MetricReport report : metricReportMap.values()) {
+                report.emit(list);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
deleted file mode 100644
index 31056f2..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.metric.Metric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
-public class EagleSerivceMetricReport implements MetricReport{
-		
-    private EagleServiceClientImpl client;
-	private static final Logger LOG = LoggerFactory.getLogger(EagleSerivceMetricReport.class);
-
-	public EagleSerivceMetricReport(String host, int port, String username, String password) {
-		client = new EagleServiceClientImpl(host, port, username, password);
-	}
-
-    public EagleSerivceMetricReport(String host, int port) {
-    	 client = new EagleServiceClientImpl(host, port, null, null);
-    }
-    	 
-	public void emit(List<Metric> list) {
-		List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>();
-		for (Metric metric : list) {
-			entities.add(MetricEntityConvert.convert(metric));
-		}
-		try {
-			int total = entities.size();
-			GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
-            if(response.isSuccess()) {
-                LOG.info("Wrote " + total + " entities to service");
-            }else{
-                LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
-            }
-		}
-		catch (Exception ex) {
-            LOG.error("Got exception while writing entities: ", ex);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
new file mode 100644
index 0000000..7ff415e
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.report;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EagleServiceMetricReport implements MetricReport{
+	// MetricReport implementation that sends metrics through an EagleServiceClientImpl.
+    private EagleServiceClientImpl client;
+	private static final Logger LOG = LoggerFactory.getLogger(EagleServiceMetricReport.class);
+	// Builds the client with the given host, port and credentials.
+	public EagleServiceMetricReport(String host, int port, String username, String password) {
+		client = new EagleServiceClientImpl(host, port, username, password);
+	}
+    // Builds the client with the given host and port, passing null for both username and password.
+    public EagleServiceMetricReport(String host, int port) {
+    	client = new EagleServiceClientImpl(host, port, null, null);
+    }
+	// Converts every metric with MetricEntityConvert.convert and submits the batch in one client.create call; the outcome is only logged, and any exception is caught and logged rather than rethrown.
+	public void emit(List<Metric> list) {
+		List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>();
+		for (Metric metric : list) {
+			entities.add(MetricEntityConvert.convert(metric));
+		}
+		try {
+			int total = entities.size();
+			GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
+            if(response.isSuccess()) {
+                LOG.info("Wrote " + total + " entities to service");
+            }else{
+                LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
+            }
+		}
+		catch (Exception ex) {
+            LOG.error("Got exception while writing entities: ", ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
index 10f05ca..c389fa7 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
@@ -25,7 +25,7 @@ public class MetricEntityConvert {
         GenericMetricEntity entity = new GenericMetricEntity();
         entity.setPrefix(metric.getMetricName());
         entity.setValue(new double[]{metric.getValue().get()});
-        entity.setTags(metric.getDemensions());
+        entity.setTags(metric.getDimensions());
         entity.setTimestamp(metric.getTimestamp());
         return entity;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
index c03a89f..85d423b 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
@@ -21,6 +21,6 @@ import java.util.List;
 import org.apache.eagle.metric.Metric;
 
 public interface MetricReport {
-	 
-	public void emit(List<Metric> list);
+	// The method should be thread safe
+	void emit(List<Metric> list);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
new file mode 100644
index 0000000..f2e78a6
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>eagle</groupId>
+    <artifactId>eagle-security-parent</artifactId>
+    <version>0.3.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-metric-collection</artifactId>
+  <packaging>jar</packaging>
+  <name>eagle-metric-collection</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-security-hdfs-auditlog</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-core</artifactId>
+        <exclusions>
+            <exclusion>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>log4j</groupId>
+                <artifactId>log4j</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>log4j-over-slf4j</artifactId>
+            </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>storm-kafka</artifactId>
+          <exclusions>
+              <exclusion>
+                  <groupId>ch.qos.logback</groupId>
+                  <artifactId>logback-classic</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>log4j</groupId>
+                  <artifactId>log4j</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>log4j-over-slf4j</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-alert-process</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-stream-process-api</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>com.hierynomus</groupId>
+          <artifactId>sshj</artifactId>
+          <version>0.13.0</version>
+      </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
new file mode 100644
index 0000000..65fe68a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
+import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry point of the Kafka metric collection topology.
+ *
+ * <p>Builds two sources on a Storm execution environment: the spout returned by
+ * {@link KafkaOffsetSourceSpoutProvider} (named "kafkaLogLagChecker") and a Kafka message spout
+ * (named "kafkaMessageDistributionCheck") whose (user, timestamp) tuples are grouped by their
+ * first field and passed to {@link KafkaMessageDistributionExecutor}.
+ */
+public class EagleMetricCollectorMain {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
+
+    /**
+     * Loads the configuration, wires both sources and executes the topology.
+     *
+     * @param args command line options handed to {@link ConfigOptionParser}
+     * @throws Exception propagated from option parsing, topology construction or execution
+     */
+    public static void main(String[] args) throws Exception {
+        new ConfigOptionParser().load(args);
+        //System.setProperty("config.resource", "/application.local.conf");
+
+        Config config = ConfigFactory.load();
+
+        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+
+        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+        // Reduce every deserialized message to the two fields consumed downstream: user and timestamp.
+        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+            @Override
+            @SuppressWarnings("unchecked")
+            public List<Object> deserialize(byte[] ser) {
+                Object tmp = deserializer.deserialize(ser);
+                // Check for "nothing deserialized" before treating the result as a map.
+                if (tmp == null) {
+                    return null;
+                }
+                Map<String, Object> map = (Map<String, Object>) tmp;
+                return Arrays.asList(map.get("user"), map.get("timestamp"));
+            }
+        };
+
+        KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
+            @Override
+            public BaseRichSpout getSpout(Config context) {
+                // Kafka topic
+                String topic = context.getString("dataSourceConfig.topic");
+                // Kafka consumer group id
+                String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+                // Kafka fetch size
+                int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+
+                // Kafka broker zk connection
+                String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+                // transaction zkRoot
+                String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+                LOG.info("Use topic id: {}", topic);
+
+                // The broker path is optional; without it ZkHosts is built from the quorum alone.
+                BrokerHosts hosts;
+                if (context.hasPath("dataSourceConfig.brokerZkPath")) {
+                    hosts = new ZkHosts(zkConnString, context.getString("dataSourceConfig.brokerZkPath"));
+                } else {
+                    hosts = new ZkHosts(zkConnString);
+                }
+
+                SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                        topic,
+                        zkRoot + "/" + topic,
+                        groupId);
+
+                // transaction zkServers: the host part of every "host:port" entry of the quorum
+                String[] zkConnections = zkConnString.split(",");
+                List<String> zkHosts = new ArrayList<>();
+                for (String zkConnection : zkConnections) {
+                    zkHosts.add(zkConnection.split(":")[0]);
+                }
+                // transaction zkPort: the port of the first entry is applied to all servers
+                Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+                spoutConfig.zkServers = zkHosts;
+                spoutConfig.zkPort = zkPort;
+                // transaction update interval
+                spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+                // Kafka fetch size
+                spoutConfig.fetchSizeBytes = fetchSize;
+
+                spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+                return new KafkaSpout(spoutConfig);
+            }
+        };
+
+        env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
+        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0))
+                .flatMap(new KafkaMessageDistributionExecutor());
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
new file mode 100644
index 0000000..5cf0e11
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import java.util.Map;
+
+public class KafkaConsumerOffset { // plain holder that Jackson fills from the JSON stored in a partition's ZooKeeper node (see KafkaConsumerOffsetFetcher.fetch)
+    public Map<String, String> topology; // NOTE(review): presumably identifies the consuming topology - confirm against the writer of this JSON
+    public Long offset; // the only field read by KafkaConsumerOffsetFetcher.fetch; reported as the partition's consumer offset
+    public Long partition;
+    public Map<String, String> broker; // NOTE(review): presumably the broker's connection details - confirm against the writer of this JSON
+    public String topic;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
new file mode 100644
index 0000000..f34f195
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaConsumerOffsetFetcher {
+
+    public CuratorFramework curator;
+    public String zkRoot;
+    public ObjectMapper mapper;
+    public String topic;
+    public String group;
+
+    public KafkaConsumerOffsetFetcher(ZKStateConfig config, String topic, String group) {
+        try {
+            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
+                    new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
+            curator.start();
+            this.zkRoot = config.zkRoot;
+            mapper = new ObjectMapper();
+            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
+            mapper.registerModule(module);
+            this.topic = topic;
+            this.group = group;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Map<String, Long> fetch() throws Exception {
+        Map<String, Long> map = new HashMap<String, Long>();
+        String path = zkRoot + "/" + topic + "/" + group;
+        if (curator.checkExists().forPath(path) != null) {
+            List<String> partitions = curator.getChildren().forPath(path);
+            for (String partition : partitions) {
+                String partitionPath = path + "/" + partition;
+                String data = new String(curator.getData().forPath(partitionPath));
+                KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
+                map.put(partition, offset.offset);
+            }
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
new file mode 100644
index 0000000..de93ea3
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import java.util.*;
+
+/**
+ * Looks up the latest offset of every partition of a topic by querying each partition's leader
+ * broker through {@link SimpleConsumer}.
+ */
+public class KafkaLatestOffsetFetcher {
+
+    private List<String> brokerList;
+    private int port;
+
+    /**
+     * @param connectString comma separated {@code host:port} entries; the hosts become the broker
+     *                      list and the port of the first entry is used for every broker
+     */
+    public KafkaLatestOffsetFetcher(String connectString) {
+        brokerList = new ArrayList<>();
+        String[] brokers = connectString.split(",");
+        for (String broker : brokers) {
+            brokerList.add(broker.split(":")[0]);
+        }
+        this.port = Integer.valueOf(brokers[0].split(":")[1]);
+    }
+
+    /**
+     * Fetches the latest offset of partitions {@code 0 .. partitionCount - 1}.
+     *
+     * @param topic          topic to inspect
+     * @param partitionCount number of partitions to query
+     * @return partition id mapped to its latest offset
+     * @throws RuntimeException if metadata or a leader is missing for a partition, or a broker
+     *                          request fails
+     */
+    public Map<Integer, Long> fetch(String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
+        Map<Integer, Long> ret = new HashMap<>();
+        for (int partition = 0; partition < partitionCount; partition++) {
+            PartitionMetadata metadata = metadatas.get(partition);
+            if (metadata == null) {
+                throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting");
+            }
+            if (metadata.leader() == null) {
+                throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+            }
+            String leadBroker = metadata.leader().host();
+            String clientName = "Client_" + topic + "_" + partition;
+            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+            try {
+                ret.put(partition, getLatestOffset(consumer, topic, partition, clientName));
+            } finally {
+                // Close the consumer even when the offset request throws, so it is not leaked.
+                consumer.close();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Asks the broker behind {@code consumer} for the latest offset of one partition.
+     *
+     * @param consumer   consumer connected to the partition's leader; not closed by this method
+     * @param topic      topic name
+     * @param partition  partition id
+     * @param clientName client id sent with the request
+     * @return the first offset of the broker's response
+     * @throws RuntimeException if the response reports an error or contains no offset
+     */
+    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+        // Request a single offset at "latest time".
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+        if (response.hasError()) {
+            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+        }
+        long[] offsets = response.offsets(topic, partition);
+        if (offsets.length == 0) {
+            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
+            throw new RuntimeException("No offset returned by the broker for topic [" + topic + "] partition [" + partition + "]");
+        }
+        return offsets[0];
+    }
+
+    /**
+     * Collects partition metadata of {@code topic}, asking the brokers one after another until
+     * metadata for {@code partitionCount} partitions has been gathered.
+     *
+     * @return partition id mapped to its metadata; may hold fewer entries than requested
+     * @throws RuntimeException if communicating with a broker fails; the cause is preserved
+     */
+    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
+        for (String broker : brokerList) {
+            SimpleConsumer consumer = null;
+            try {
+                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+                List<TopicMetadata> metaData = resp.topicsMetadata();
+                for (TopicMetadata item : metaData) {
+                    for (PartitionMetadata part : item.partitionsMetadata()) {
+                        partitionMetadata.put(part.partitionId(), part);
+                    }
+                }
+                if (partitionMetadata.size() == partitionCount) break;
+            } catch (Exception e) {
+                // Keep the original exception as the cause so its stack trace is not lost.
+                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: " + e, e);
+            } finally {
+                if (consumer != null) consumer.close();
+            }
+        }
+        return partitionMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
new file mode 100644
index 0000000..be6d0f7
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.datastream.Tuple1;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Counts Kafka messages per user and reports the counts as "kafka.message.user.count" metrics.
+ *
+ * <p>One {@link CountingMetric} is kept per user. When a message of that user arrives whose
+ * timestamp is more than {@code DEFAULT_METRIC_GRANULARITY} after the timestamp of the message
+ * that opened the current metric, the metric is emitted through {@link EagleMetricReportManager}
+ * and a new one is started. Nothing is written to the collector.
+ */
+public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
+
+    private Config config;
+    private Map<String, String> baseMetricDimension;
+    private Map<String, EventMetric> eventMetrics;
+    private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000;
+    private static final String metricName = "kafka.message.user.count";
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
+
+    /** A user's current metric together with the timestamp of the message that opened it. */
+    public static class EventMetric {
+        // Set once on creation; later messages of the same window do not advance it.
+        long latestMessageTime;
+        Metric metric;
+
+        public EventMetric(long latestMessageTime, Metric metric) {
+            this.latestMessageTime = latestMessageTime;
+            this.metric = metric;
+        }
+
+        /** Forwards {@code d} to the wrapped metric's {@code update}. */
+        public void update(double d) {
+            this.metric.update(d);
+        }
+    }
+
+    @Override
+    public void prepareConfig(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Builds the site/topic base dimensions and registers an {@link EagleServiceMetricReport}
+     * under the name "metricCollectServiceReport".
+     */
+    @Override
+    public void init() {
+        String site = config.getString("dataSourceConfig.site");
+        String topic = config.getString("dataSourceConfig.topic");
+        this.baseMetricDimension = new HashMap<>();
+        this.baseMetricDimension.put("site", site);
+        this.baseMetricDimension.put("topic", topic);
+        String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+
+        EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+        EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+        eventMetrics = new ConcurrentHashMap<>();
+    }
+
+    /** Rounds {@code timestamp} down to a multiple of {@code granularity} using integer division. */
+    public long trimTimestamp(long timestamp, long granularity) {
+        return timestamp / granularity * granularity;
+    }
+
+    /**
+     * Starts a new metric for {@code user}, replacing any existing entry. The metric carries the
+     * base dimensions plus "user" and the message time trimmed to the metric granularity.
+     */
+    public void putNewMetric(long currentMessageTime, String user) {
+        Map<String ,String> dimensions = new HashMap<>();
+        dimensions.putAll(baseMetricDimension);
+        dimensions.put("user", user);
+        long trimTimestamp = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
+        Metric metric = new CountingMetric(trimTimestamp, dimensions, metricName, 1);
+        eventMetrics.put(user, new EventMetric(currentMessageTime, metric));
+    }
+
+    /**
+     * Accounts for one message of {@code user}: opens a metric for an unseen user, rolls the
+     * metric over when the window has elapsed, and otherwise adds 1 to the current metric.
+     */
+    public void update(long currentMessageTime, String user) {
+        // Look the entry up once and work on that reference.
+        EventMetric current = eventMetrics.get(user);
+        if (current == null) {
+            LOG.info("Got metrics for new user: " + user);
+            putNewMetric(currentMessageTime, user);
+            return;
+        }
+        if (currentMessageTime > current.latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
+            // Window elapsed: report the finished metric, then start over with this message.
+            eventMetrics.remove(user);
+            EagleMetricReportManager.getInstance().emit(Arrays.asList(current.metric));
+            putNewMetric(currentMessageTime, user);
+        } else {
+            current.update(1);
+        }
+    }
+
+    /**
+     * Consumes one (user, timestamp) tuple. Failures are logged and swallowed so that a single
+     * bad tuple does not fail the executor.
+     *
+     * @param input     tuple values: index 0 is the user, index 1 the numeric message timestamp
+     * @param collector unused; this executor emits nothing downstream
+     */
+    @Override
+    public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
+        try {
+            String user = (String) input.get(0);
+            // Accept any numeric type (Integer, Long, ...) rather than failing the cast for non-Long values.
+            long timestamp = ((Number) input.get(1)).longValue();
+            update(timestamp, user);
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
new file mode 100644
index 0000000..5a06c82
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.io.Serializable;
+
+public class KafkaOffsetCheckerConfig implements Serializable { // bundles the ZooKeeper, Kafka and service settings handed to KafkaOffsetSpout
+    public static class KafkaConfig implements Serializable{ // Kafka-side settings; filled from "dataSourceConfig.*" by KafkaOffsetSourceSpoutProvider
+        public String kafkaEndPoints; // value of dataSourceConfig.kafkaEndPoints
+        public String topic; // value of dataSourceConfig.topic
+        public String site; // value of dataSourceConfig.site
+        public String group; // value of dataSourceConfig.hdfsTopologyConsumerGroupId
+    }
+
+    public static class ServiceConfig implements Serializable{ // Eagle service endpoint and credentials; filled by KafkaOffsetSourceSpoutProvider
+        public String serviceHost;
+        public Integer servicePort;
+        public String username;
+        public String password;
+    }
+
+    public ZKStateConfig zkConfig; // NOTE(review): this class is Serializable - confirm ZKStateConfig is serializable as well
+    public KafkaConfig kafkaConfig;
+    public ServiceConfig serviceConfig;
+
+    public KafkaOffsetCheckerConfig (ServiceConfig serviceConfig, ZKStateConfig zkConfig, KafkaConfig kafkaConfig) { // stores the three parts as given, without copying or validation
+        this.serviceConfig = serviceConfig;
+        this.zkConfig = zkConfig;
+        this.kafkaConfig = kafkaConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
new file mode 100644
index 0000000..c794632
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link KafkaOffsetSpout} from the ZooKeeper, Eagle service and Kafka settings found in
+ * the application configuration.
+ */
+public class KafkaOffsetSourceSpoutProvider {
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class);
+
+	/**
+	 * Reads the "dataSourceConfig.*" and Eagle service entries of {@code config} and creates the spout.
+	 *
+	 * @param config application configuration
+	 * @return a new {@link KafkaOffsetSpout} configured from {@code config}
+	 */
+	public BaseRichSpout getSpout(Config config){
+		// Common key prefix of all Eagle service settings.
+		final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+
+		// ZooKeeper state settings
+		ZKStateConfig zk = new ZKStateConfig();
+		zk.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+		zk.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+		zk.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+		zk.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+		zk.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+		// Eagle service endpoint and credentials
+		KafkaOffsetCheckerConfig.ServiceConfig service = new KafkaOffsetCheckerConfig.ServiceConfig();
+		service.serviceHost = config.getString(servicePrefix + EagleConfigConstants.HOST);
+		service.servicePort = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+		service.username = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+		service.password = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+
+		// Kafka settings
+		KafkaOffsetCheckerConfig.KafkaConfig kafka = new KafkaOffsetCheckerConfig.KafkaConfig();
+		kafka.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints");
+		kafka.site = config.getString("dataSourceConfig.site");
+		kafka.topic = config.getString("dataSourceConfig.topic");
+		kafka.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+
+		return new KafkaOffsetSpout(new KafkaOffsetCheckerConfig(service, zk, kafka));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
new file mode 100644
index 0000000..d6f7298
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaOffsetSpout extends BaseRichSpout {
+	private static final long serialVersionUID = 1L;
+	private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000;
+	private KafkaOffsetCheckerConfig config;
+	private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
+	private KafkaLatestOffsetFetcher latestOffsetFetcher;
+	private Map<String, String> baseMetricDimension;
+	private long lastRoundTime = 0;
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class);
+
+	public KafkaOffsetSpout(KafkaOffsetCheckerConfig config) {//Config config, ZKStateConfig zkStateConfig, String kafkaEndPoints){
+		this.config = config;
+	}
+	
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(config.zkConfig, config.kafkaConfig.topic, config.kafkaConfig.group);
+		latestOffsetFetcher = new KafkaLatestOffsetFetcher(config.kafkaConfig.kafkaEndPoints);
+
+		this.baseMetricDimension = new HashMap<>();
+		this.baseMetricDimension.put("site", config.kafkaConfig.site);
+		this.baseMetricDimension.put("topic", config.kafkaConfig.topic);
+		this.baseMetricDimension.put("group", config.kafkaConfig.group);
+		String eagleServiceHost = config.serviceConfig.serviceHost;
+		Integer eagleServicePort = config.serviceConfig.servicePort;
+		String username = config.serviceConfig.serviceHost;
+		String password = config.serviceConfig.serviceHost;
+		EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+		EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+	}
+
+	public Metric constructMetric(long timestamp, String partition, double value) {
+		Map<String, String> dimensions = new HashMap<>();
+		dimensions.putAll(baseMetricDimension);
+		dimensions.put("partition", partition);
+		String metricName = "eagle.kafka.message.consumer.lag";
+		Metric metric = new CountingMetric(timestamp, dimensions, metricName, value);
+		return metric;
+	}
+
+	@Override
+	public void nextTuple() {
+		Long currentTime = System.currentTimeMillis();
+		if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
+			try {
+				Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+				Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
+				List<Metric> list = new ArrayList<>();
+				for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+					String partition = entry.getKey();
+					Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
+					Long lag = latestOffset.get(partitionNumber) - entry.getValue();
+					list.add(constructMetric(currentTime, partition, lag));
+				}
+				EagleMetricReportManager.getInstance().emit(list);
+			} catch (Exception ex) {
+				LOG.error("Got an exception, ex: ", ex);
+			}
+		}
+        try{
+            Thread.sleep(10 * 1000);
+        }catch(Throwable t){
+			//Do nothing
+        }
+    }
+	
+	/**
+	 * empty because framework will take care of output fields declaration
+	 */
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		
+	}
+
+	@Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+    }
+   
+    @Override
+    public void deactivate() {
+    	
+    }
+   
+    @Override
+    public void close() {
+    	
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/application.conf b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
new file mode 100644
index 0000000..4b07019
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
@@ -0,0 +1,39 @@
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "metricCollectionTopology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1
+    }
+  },
+  "dataSourceConfig": {
+    # For fetch gap
+    "site" : "sandbox",
+    "topic" : "sandbox_hdfs_audit_log",
+    "zkQuorum" : "localhost:2191",
+    "hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 2000,
+    "zkConnectionTimeoutMS" : 15000,
+    #"fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "metricCollectionConsumerId" : "eagle.metric.collection.consumer",
+    # For kafka spout
+    #"transactionZKServers" : "localhost",
+    #"transactionZKPort" : "2181",
+    "transactionZKRoot" : "/consumers",
+    #"transactionStateUpdateMS" : 2000,
+    "kafkaEndPoints" : "localhost:9092"
+  },
+  "eagleProps" : {
+    "eagleService": {
+      "host": "localhost",
+      "port": 38080,
+      "username": "admin",
+      "password": "secret"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8a0919a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+
+#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+#log4j.logger.eagle.executor.AlertExecutor=DEBUG
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
new file mode 100644
index 0000000..bfba783
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -0,0 +1,2 @@
+public class TestKafkaOffset { // Placeholder class: it declares no members and no test cases.
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index fa712d4..327cb8d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -41,7 +41,6 @@ public class HdfsAuditLogProcessorMain {
         Config config = new ConfigOptionParser().load(args);
 
         LOG.info("Config class: " + config.getClass().getCanonicalName());
-
         if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
 
         StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 16f77c3..9ddf0b2 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -21,7 +21,7 @@ import org.apache.eagle.job.JobPartitioner;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ZKStateConfig;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.jobrunning.storm.JobRunningSpout;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/pom.xml b/eagle-security/pom.xml
index 0f45c49..599cff6 100644
--- a/eagle-security/pom.xml
+++ b/eagle-security/pom.xml
@@ -41,5 +41,6 @@
     <module>eagle-security-hdfs-securitylog</module>
     <module>eagle-security-hbase-securitylog</module>
     <module>eagle-security-hbase-web</module>
+	<module>eagle-metric-collection</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 838c2f8..2132bb3 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -54,6 +54,11 @@
             <artifactId>eagle-security-hbase-securitylog</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-metric-collection</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>


[2/5] incubator-eagle git commit: balance events partition based on greedy partition algorithm

Posted by ha...@apache.org.
balance events partition based on greedy partition algorithm


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3de5cce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3de5cce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3de5cce6

Branch: refs/heads/master
Commit: 3de5cce698e2da3db223c468d28388ae5189a2e2
Parents: 34fa6d9
Author: sunlibin <ab...@gmail.com>
Authored: Thu Nov 26 18:45:46 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:38:14 2015 +0800

----------------------------------------------------------------------
 .../storm/partition/EagleCustomGrouping.java    |  51 +++++++++
 .../datastream/StormTopologyCompiler.scala      |  14 ++-
 .../eagle/datastream/StreamAlertExpansion.scala |   9 +-
 .../eagle/datastream/StreamConnector.scala      |   8 ++
 .../datastream/StreamGroupbyExpansion.scala     |   8 +-
 .../eagle/datastream/StreamProducer.scala       |  28 ++++-
 .../eagle/partition/DataDistributionDao.java    |  28 +++++
 .../eagle/partition/PartitionAlgorithm.java     |  29 +++++
 .../eagle/partition/PartitionStrategy.java      |  27 +++++
 .../eagle/partition/PartitionStrategyImpl.java  |  80 ++++++++++++++
 .../java/org/apache/eagle/partition/Weight.java |  30 ++++++
 .../apache/eagle/metric/MetricConstants.java    |  24 +++++
 .../manager/EagleMetricReportManager.java       |  16 +++
 .../eagle/service/client/ServiceConfig.java     |  29 +++++
 eagle-security/eagle-metric-collection/pom.xml  |   5 +
 .../kafka/KafkaMessageDistributionExecutor.java |   4 +-
 .../metric/kafka/KafkaOffsetCheckerConfig.java  |   8 +-
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |   3 +-
 .../eagle/metric/kafka/KafkaOffsetSpout.java    |  15 +--
 .../test/java/TestDataDistributionDaoImpl.java  |  40 +++++++
 .../src/test/java/TestGreedyPartition.java      |  44 ++++++++
 .../src/test/java/TestKafkaOffset.java          |  68 ++++++++++++
 .../apache/eagle/security/partition/Bucket.java |  30 ++++++
 .../security/partition/BucketComparator.java    |  36 +++++++
 .../partition/DataDistributionDaoImpl.java      | 106 +++++++++++++++++++
 .../partition/GreedyPartitionAlgorithm.java     |  65 ++++++++++++
 .../auditlog/HdfsAuditLogProcessorMain.java     |  85 +++++++++++----
 .../src/main/resources/application.conf         |   1 +
 28 files changed, 845 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
new file mode 100644
index 0000000..96e42b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import org.apache.eagle.partition.PartitionStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Storm custom stream grouping that lets a {@link PartitionStrategy} pick the single target
+ * task for each tuple, based on the tuple's first value.
+ */
+public class EagleCustomGrouping implements CustomStreamGrouping {
+
+    public List<Integer> targetTasks;
+    public PartitionStrategy strategy;
+
+    /**
+     * @param strategy strategy consulted by {@link #chooseTasks(int, List)}
+     */
+    public EagleCustomGrouping(PartitionStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    /** Stores a copy of the supplied target task ids; the other arguments are unused. */
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = new ArrayList<>(targetTasks);
+    }
+
+    /**
+     * Casts the first tuple value to a String, asks the strategy for an index into the stored
+     * target tasks, and returns a one-element list holding the task at that index.
+     */
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        final int taskCount = targetTasks.size();
+        final String key = (String) values.get(0);
+        final Integer chosenTask = targetTasks.get(strategy.balance(key, taskCount));
+        return Arrays.asList(chosenTask);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
index dbe69d2..4f9fccc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseRichBolt
 import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
 import backtype.storm.tuple.Fields
 import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.partition.EagleCustomGrouping
 import org.slf4j.LoggerFactory
 
 case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
@@ -63,10 +64,19 @@ case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGr
           }
           case Some(bt) => boltDeclarer = bt
         }
-        sc.groupByFields match{
+        if (sc.groupByFields != Nil) {
+          boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
+        }
+        else if (sc.customGroupBy != null) {
+          boltDeclarer.customGrouping(fromName, new EagleCustomGrouping(sc.customGroupBy));
+        }
+        else {
+          boltDeclarer.shuffleGrouping(fromName);
+        }
+/*        sc.groupByFields match{
           case Nil => boltDeclarer.shuffleGrouping(fromName)
           case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
-        }
+        }*/
         LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
       })
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
index fa83e6d..802107d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -70,7 +70,7 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
   def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
                dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
     child match {
-      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer, strategy) => {
         /**
          * step 1: wrapper previous StreamProducer with one more field "streamName"
          * for AlertStreamSink, we check previous StreamProducer and replace that
@@ -108,7 +108,12 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
           t.setConfig(config)
           t.setGraph(dag)
           alertProducers += t
-          newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+          if (strategy == null) {
+             newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+          }
+          else {
+            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).customGroupBy(strategy))
+          }
         })
 
         // remove AlertStreamSink

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
index 0cece47..083a5af 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -18,11 +18,19 @@
  */
 package org.apache.eagle.datastream
 
+import org.apache.eagle.partition.PartitionStrategy
+
 case class StreamConnector(from: StreamProducer, to: StreamProducer) {
   var groupByFields : Seq[Int] = Nil
+  var customGroupBy : PartitionStrategy = null
 
   def groupBy(fields : Seq[Int]) : StreamConnector = {
     groupByFields = fields
     this
   }
+
+  def customGroupBy(custom : PartitionStrategy) : StreamConnector = {
+    customGroupBy = custom
+    this
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
index 42bc9a8..caf71e3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -43,7 +43,13 @@ class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
         child match {
           case p : GroupByProducer => {
             dag.outgoingEdgesOf(p).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+              if (p.fields != Nil) {
+                toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+              }
+              else if (p.partitionStrategy != null) {
+                toBeAddedEdges += StreamConnector(current, c2.to).customGroupBy(p.partitionStrategy)
+              }
+              else toBeAddedEdges += StreamConnector(current, c2.to);
             })
             toBeRemovedVertex += p
           }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 40d4904..ae00d03 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
+import org.apache.eagle.partition.PartitionStrategy
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -103,6 +104,12 @@ trait StreamProducer{
     ret
   }
 
+  def customGroupBy(strategy : PartitionStrategy) : StreamProducer = {
+    val ret = GroupByProducer(incrementAndGetId(), strategy)
+    hookupDAG(graph, this, ret)
+    ret
+  }
+
   def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
     streamUnion(others);
   }
@@ -124,15 +131,23 @@ trait StreamProducer{
     alert(upStreamNames, alertExecutorId, false)
   }
 
-  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) = {
-    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer)
+  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy : PartitionStrategy=null ) = {
+    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer, strategy)
     hookupDAG(graph, this, ret)
   }
 
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, true, strategy)
+  }
+
   def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
     alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
   }
 
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, false, strategy)
+  }
+
   def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
     alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
   }
@@ -170,7 +185,12 @@ case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends
 
 case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
 
-case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+case class GroupByProducer(id: Int, fields : Seq[Int], partitionStrategy: PartitionStrategy) extends StreamProducer
+
+object GroupByProducer {
+  def apply(id: Int, fields: Seq[Int]) = new GroupByProducer(id, fields, null)
+  def apply(id: Int, partitionStrategy : PartitionStrategy) = new GroupByProducer(id, Nil, partitionStrategy)
+}
 
 case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
 
@@ -187,7 +207,7 @@ case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamPr
   }
 }
 
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer
 
 object UniqueId{
   val id : AtomicInteger = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
new file mode 100644
index 0000000..5c78f96
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface DataDistributionDao extends Serializable {
+    /** Fetches the data distribution as a list of {@link Weight} entries (consumed by PartitionStrategyImpl); may throw any Exception. */
+    List<Weight> fetchDataDistribution() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
new file mode 100644
index 0000000..0614388
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface PartitionAlgorithm extends Serializable { // builds the key -> Integer table that PartitionStrategyImpl stores as its routing table
+    Map<String, Integer> partition(List<Weight> weights, int k); // PartitionStrategyImpl passes its bucket count as k
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
new file mode 100644
index 0000000..e431f28
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+
+public interface PartitionStrategy extends Serializable {
+    /** Returns the bucket chosen for key; buckNum is the bucket count (PartitionStrategyImpl uses it as the hash modulus). */
+    int balance(String key, int buckNum);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
new file mode 100644
index 0000000..46696a6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -0,0 +1,80 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStrategyImpl implements PartitionStrategy {
+
+    public DataDistributionDao dao;
+    public PartitionAlgorithm algorithm;
+    public Map<String, Integer> routingTable;
+    public long lastRefreshTime;
+    public long refreshInterval;
+    public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+    private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+        this.dao = dao;
+        this.algorithm = algorithm;
+        this.refreshInterval = refreshInterval;
+    }
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
+        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+    }
+
+    public boolean needRefresh() {
+        if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) {
+            lastRefreshTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    public Map<String, Integer> generateRoutingTable(int buckNum) {
+        try {
+            List<Weight> weights = dao.fetchDataDistribution();
+            routingTable = algorithm.partition(weights, buckNum);
+            return routingTable;
+        }
+        catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public int balance(String key, int buckNum) {
+        if (needRefresh()) {
+            LOG.info("Going to refresh routing table");
+            routingTable = generateRoutingTable(buckNum);
+            LOG.info("Finish refresh routing table");
+        }
+        if (routingTable.containsKey(key)) {
+            return routingTable.get(key);
+        }
+        else {
+            return Math.abs(key.hashCode()) % buckNum;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
new file mode 100644
index 0000000..14d005d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+public class Weight {
+    public String key; // identifier being weighted (presumably the routing key later mapped to a bucket; confirm)
+    public Double value; // weight attached to key; its unit/scale is not visible in this file
+    /** Creates a weight pairing key with value. */
+    public Weight(String key, Double value) {
+        this.key = key;
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
new file mode 100644
index 0000000..54c069d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
@@ -0,0 +1,24 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric;
+
+public class MetricConstants { // constants for the org.apache.eagle.metric package
+    public static final String GENERIC_METRIC_ENTITY_ENDPOINT = "GenericMetricService"; // presumably the service name for generic metric entities; confirm against callers
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
index b63944d..153159c 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.eagle.metric.manager;
 
 import org.apache.eagle.metric.Metric;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
new file mode 100644
index 0000000..e68360f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.client;
+
+import java.io.Serializable;
+
+public class ServiceConfig implements Serializable{ // service connection settings; held as a field of the Serializable KafkaOffsetCheckerConfig
+    public String serviceHost; // filled from the EagleConfigConstants.HOST setting by KafkaOffsetSourceSpoutProvider
+    public Integer servicePort; // filled from the EagleConfigConstants.PORT setting
+    public String username; // filled from the EagleConfigConstants.USERNAME setting
+    public String password; // credential paired with username
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
index f2e78a6..31fc6a4 100644
--- a/eagle-security/eagle-metric-collection/pom.xml
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -32,6 +32,11 @@
   <dependencies>
       <dependency>
           <groupId>eagle</groupId>
+          <artifactId>eagle-metric</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
           <artifactId>eagle-security-hdfs-auditlog</artifactId>
           <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index be6d0f7..7af5ea6 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -97,12 +97,14 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
 
     public void update(long currentMessageTime, String user) {
         if (eventMetrics.get(user) == null) {
-            LOG.info("Got metrics for new user: " + user);
+            LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
             putNewMetric(currentMessageTime, user);
         }
         else {
             long latestMessageTime = eventMetrics.get(user).latestMessageTime;
             if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
+                LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
+                        + ", latestMessageTime: " + latestMessageTime);
                 EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
                 putNewMetric(currentMessageTime, user);
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
index 5a06c82..040d08f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -20,6 +20,7 @@
 package org.apache.eagle.metric.kafka;
 
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
 
 import java.io.Serializable;
 
@@ -31,13 +32,6 @@ public class KafkaOffsetCheckerConfig implements Serializable {
         public String group;
     }
 
-    public static class ServiceConfig implements Serializable{
-        public String serviceHost;
-        public Integer servicePort;
-        public String username;
-        public String password;
-    }
-
     public ZKStateConfig zkConfig;
     public KafkaConfig kafkaConfig;
     public ServiceConfig serviceConfig;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index c794632..5fd02fd 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -20,6 +20,7 @@ import backtype.storm.topology.base.BaseRichSpout;
 import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,7 @@ public class KafkaOffsetSourceSpoutProvider {
 		zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
 		zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
 
-		KafkaOffsetCheckerConfig.ServiceConfig serviceConfig = new KafkaOffsetCheckerConfig.ServiceConfig();
+		ServiceConfig serviceConfig = new ServiceConfig();
 		serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
 		serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
 		serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index d6f7298..aee817a 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,9 +20,6 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.metric.CountingMetric;
 import org.apache.eagle.metric.Metric;
 import org.apache.eagle.metric.manager.EagleMetricReportManager;
@@ -62,8 +59,8 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		this.baseMetricDimension.put("group", config.kafkaConfig.group);
 		String eagleServiceHost = config.serviceConfig.serviceHost;
 		Integer eagleServicePort = config.serviceConfig.servicePort;
-		String username = config.serviceConfig.serviceHost;
-		String password = config.serviceConfig.serviceHost;
+		String username = config.serviceConfig.username;
+		String password = config.serviceConfig.password;
 		EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
 		EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
 	}
@@ -77,11 +74,16 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		return metric;
 	}
 
+	public long trimTimestamp(long currentTime, long granularity) { // truncates currentTime to a whole multiple of granularity
+		return currentTime / granularity * granularity; // integer division drops the remainder before multiplying back
+	}
+
 	@Override
 	public void nextTuple() {
 		Long currentTime = System.currentTimeMillis();
 		if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
 			try {
+				long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
 				Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
 				Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
 				List<Metric> list = new ArrayList<>();
@@ -89,8 +91,9 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 					String partition = entry.getKey();
 					Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
 					Long lag = latestOffset.get(partitionNumber) - entry.getValue();
-					list.add(constructMetric(currentTime, partition, lag));
+					list.add(constructMetric(trimedCurrentTime, partition, lag));
 				}
+				lastRoundTime = trimedCurrentTime;
 				EagleMetricReportManager.getInstance().emit(list);
 			} catch (Exception ex) {
 				LOG.error("Got an exception, ex: ", ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
new file mode 100644
index 0000000..1c54a90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+
+public class TestDataDistributionDaoImpl {
+    // Builds a DataDistributionDaoImpl from /application.local.conf and calls fetchDataDistribution() once; no assertions. @Test is commented out, so JUnit skips it.
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        Config config = ConfigFactory.load();
+        String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(eagleServiceHost, eagleServicePort, username, password, topic);
+        dao.fetchDataDistribution();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
new file mode 100644
index 0000000..cfd873a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.junit.Test;
+
+public class TestGreedyPartition {
+    // Feeds the DAO's fetched distribution to GreedyPartitionAlgorithm with k = 4; the result is not asserted. @Test is commented out, so JUnit skips it.
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        Config config = ConfigFactory.load();
+        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+        algorithm.partition(dao.fetchDataDistribution(), 4);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
index bfba783..0ce8fce 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -1,2 +1,70 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.kafka.KafkaConsumerOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaLatestOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaOffsetCheckerConfig;
+import org.apache.eagle.service.client.ServiceConfig;
+import org.junit.Test;
+
+import java.util.Map;
+
 public class TestKafkaOffset {
+
+    // The annotation is commented out, so JUnit does not pick this method up;
+    // it reads ZooKeeper/Kafka/eagle-service settings from /application.local.conf
+    // and prints the lag of every consumed partition.
+    //@Test
+    public void test() throws Exception {
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+
+        // ZooKeeper state settings
+        final ZKStateConfig zkState = new ZKStateConfig();
+        zkState.zkQuorum = conf.getString("dataSourceConfig.zkQuorum");
+        zkState.zkRoot = conf.getString("dataSourceConfig.transactionZKRoot");
+        zkState.zkSessionTimeoutMs = conf.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkState.zkRetryTimes = conf.getInt("dataSourceConfig.zkRetryTimes");
+        zkState.zkRetryInterval = conf.getInt("dataSourceConfig.zkRetryInterval");
+
+        // Eagle service settings share the "<props>.<service>." prefix
+        final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final ServiceConfig service = new ServiceConfig();
+        service.serviceHost = conf.getString(servicePrefix + EagleConfigConstants.HOST);
+        service.servicePort = conf.getInt(servicePrefix + EagleConfigConstants.PORT);
+        service.username = conf.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        service.password = conf.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+
+        // Kafka settings
+        final KafkaOffsetCheckerConfig.KafkaConfig kafka = new KafkaOffsetCheckerConfig.KafkaConfig();
+        kafka.kafkaEndPoints = conf.getString("dataSourceConfig.kafkaEndPoints");
+        kafka.site = conf.getString("dataSourceConfig.site");
+        kafka.topic = conf.getString("dataSourceConfig.topic");
+        kafka.group = conf.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+        final KafkaOffsetCheckerConfig checkerConfig = new KafkaOffsetCheckerConfig(service, zkState, kafka);
+
+        final KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(
+                checkerConfig.zkConfig, checkerConfig.kafkaConfig.topic, checkerConfig.kafkaConfig.group);
+        final KafkaLatestOffsetFetcher latestOffsetFetcher =
+                new KafkaLatestOffsetFetcher(checkerConfig.kafkaConfig.kafkaEndPoints);
+
+        final Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+        final Map<Integer, Long> latestOffset =
+                latestOffsetFetcher.fetch(checkerConfig.kafkaConfig.topic, consumedOffset.size());
+        for (Map.Entry<String, Long> consumed : consumedOffset.entrySet()) {
+            // The partition number is taken from the part of the key after the first '_'.
+            Integer partitionNumber = Integer.valueOf(consumed.getKey().split("_")[1]);
+            Long latest = latestOffset.get(partitionNumber);
+            Long lag = latest - consumed.getValue();
+            System.out.println("total: " + latest + ", consumed: " + consumed.getValue() + ",lag: " + lag);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
new file mode 100644
index 0000000..eb31d39
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+/**
+ * A numbered bucket together with the value currently assigned to it.
+ * Both fields are package-private and mutable; {@code GreedyPartitionAlgorithm}
+ * in this package updates {@code value} in place.
+ */
+public class Bucket {
+    /** Number identifying this bucket. */
+    Integer bucketNum;
+    /** Value currently held by this bucket. */
+    Double value;
+
+    /**
+     * @param bucketNum number identifying the bucket
+     * @param value     initial value of the bucket
+     */
+    public Bucket(Integer bucketNum, Double value) {
+        this.value = value;
+        this.bucketNum = bucketNum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
new file mode 100644
index 0000000..ea86f94
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import java.util.Comparator;
+
+/**
+ * Orders {@link Bucket}s by ascending {@code value}.
+ */
+public class BucketComparator implements Comparator<Bucket> {
+
+    /**
+     * Compares two buckets by their {@code value}.
+     *
+     * @param w1 first bucket
+     * @param w2 second bucket
+     * @return a negative number, zero, or a positive number when {@code w1.value} is
+     *         less than, equal to, or greater than {@code w2.value}
+     */
+    @Override
+    public int compare(Bucket w1, Bucket w2) {
+        // Double.compare defines a total order (NaN sorts last), whereas chained
+        // '<' / '>' checks report NaN as "equal" to every value, which violates
+        // the Comparator contract and can corrupt a PriorityQueue's ordering.
+        return Double.compare(w1.value, w2.value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
new file mode 100644
index 0000000..631947c
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -0,0 +1,106 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.MetricConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.Weight;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link DataDistributionDao} that reads per-user message weights from the eagle service.
+ * It queries the generic metric endpoint for metric {@code kafka.message.user.count},
+ * filtered by the configured topic, over the two days preceding the call.
+ */
+public class DataDistributionDaoImpl implements DataDistributionDao {
+
+    // Static: the logger is shared by all instances and is kept out of the instance
+    // state (DataDistributionDao extends Serializable).
+    private static final Logger LOG = LoggerFactory.getLogger(DataDistributionDaoImpl.class);
+
+    private final String eagleServiceHost;
+    private final Integer eagleServicePort;
+    private final String username;
+    private final String password;
+    private final String topic;
+
+    /**
+     * @param eagleServiceHost host of the eagle service
+     * @param eagleServicePort port of the eagle service
+     * @param username         user name passed to the service client
+     * @param password         password passed to the service client
+     * @param topic            value used for the {@code @topic} filter of the metric query
+     */
+    public DataDistributionDaoImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password, String topic) {
+        this.eagleServiceHost = eagleServiceHost;
+        this.eagleServicePort = eagleServicePort;
+        this.username = username;
+        this.password = password;
+        this.topic = topic;
+    }
+
+    /**
+     * Fetches one {@link Weight} per row returned by the metric query; the first entry of
+     * the row's {@code key} list becomes the weight key and the first entry of its
+     * {@code value} list becomes the weight value.
+     *
+     * @return the weights in the order returned by the service; empty when the service
+     *         reports success but returns no rows
+     * @throws RuntimeException wrapping any failure, including an unsuccessful response
+     */
+    @Override
+    public List<Weight> fetchDataDistribution() throws Exception {
+        IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
+            // Issues the query against "/list" with a GET request.
+            @Override
+            public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
+                final String urlString = "/list" + "?" + request.getQueryParameterString();
+                if (!this.silence) {
+                    LOG.info("Going to query service: " + this.getBaseEndpoint() + urlString);
+                }
+                WebResource r = getWebResource(urlString);
+
+                return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
+                        .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
+                        .get(GenericServiceAPIResponseEntity.class);
+            }
+        };
+        try {
+            String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
+            long endTime = System.currentTimeMillis();
+            long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
+            GenericServiceAPIResponseEntity<Map> response = client.search()
+                    .startTime(startTime)
+                    .endTime(endTime)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .metricName("kafka.message.user.count")
+                    .send();
+            if (!response.isSuccess()) {
+                // Fail with the service's own message instead of logging it and then
+                // tripping over the missing result set a few lines later.
+                throw new IllegalStateException("Failed to fetch data distribution for topic "
+                        + topic + ": " + response.getException());
+            }
+            List<Weight> userWeights = new ArrayList<>();
+            if (response.getObj() != null) {
+                for (Map keyValue : response.getObj()) {
+                    List<?> keyList = (List<?>) keyValue.get("key");
+                    List<?> valueList = (List<?>) keyValue.get("value");
+                    // Go through Number so a value that was deserialized as a non-Double
+                    // numeric type does not end in a ClassCastException.
+                    Double weight = Double.valueOf(((Number) valueList.get(0)).doubleValue());
+                    userWeights.add(new Weight((String) keyList.get(0), weight));
+                }
+            }
+            return userWeights;
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+            throw new RuntimeException(ex);
+        }
+        finally {
+            client.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
new file mode 100644
index 0000000..0197393
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.Weight;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Greedy {@link PartitionAlgorithm}: walks the weights in list order and always adds the
+ * next weight to the bucket that currently has the smallest accumulated value.
+ */
+public class GreedyPartitionAlgorithm implements PartitionAlgorithm {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreedyPartitionAlgorithm.class);
+
+    /**
+     * Logs each bucket's share of the total value as a comma-separated list.
+     * The shares appear in the queue's iteration order, which for a
+     * {@link PriorityQueue} is not guaranteed to be sorted.
+     *
+     * @param queue buckets to report; an empty queue logs an empty list
+     */
+    public void printWeightTable(PriorityQueue<Bucket> queue) {
+        double total = 0;
+        for (Bucket bucket : queue) {
+            total += bucket.value;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Bucket bucket : queue) {
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            // With nothing assigned yet the total is 0; report 0.0 rather than NaN.
+            sb.append(total == 0 ? 0.0 : bucket.value / total);
+        }
+        LOG.info("Weights: " + sb.toString());
+    }
+
+    /**
+     * Assigns every weight key to one of {@code k} buckets.
+     *
+     * @param weights weights to distribute, processed in list order
+     * @param k       number of buckets, numbered {@code 0..k-1}; must be at least 1
+     * @return map from weight key to the number of the bucket it was assigned to
+     * @throws IllegalArgumentException if {@code k} is less than 1
+     */
+    @Override
+    public HashMap<String, Integer> partition(List<Weight> weights, int k) {
+        if (k < 1) {
+            // PriorityQueue would reject this capacity anyway, but without a message.
+            throw new IllegalArgumentException("Number of buckets must be at least 1, got " + k);
+        }
+        PriorityQueue<Bucket> queue = new PriorityQueue<>(k, new BucketComparator());
+        // Initialize the queue with k empty buckets
+        for (int i = 0; i < k; i++) {
+            queue.add(new Bucket(i, 0.0));
+        }
+        HashMap<String, Integer> ret = new HashMap<>();
+        for (Weight weight : weights) {
+            // Take the lightest bucket, add the weight, and put it back so the
+            // queue re-orders it by its new value.
+            Bucket lightest = queue.poll();
+            lightest.value = lightest.value + weight.value;
+            queue.add(lightest);
+            ret.put(weight.key, lightest.bucketNum);
+        }
+        printWeightTable(queue);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 327cb8d..8e6d5d7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,13 +20,19 @@ package org.apache.eagle.security.auditlog;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
 import org.apache.eagle.dataproc.util.ConfigOptionParser;
 import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
 import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,33 +43,68 @@ import java.util.Map;
 public class HdfsAuditLogProcessorMain {
 	private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
 
-	public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+    /**
+     * Builds a {@link PartitionStrategyImpl} from a {@link DataDistributionDaoImpl}
+     * (configured with the eagle service host, port, credentials and the
+     * {@code dataSourceConfig.topic} value) and a {@link GreedyPartitionAlgorithm}.
+     *
+     * @param config topology configuration to read the settings from
+     * @return the newly created strategy
+     */
+    public static PartitionStrategy createStrategy(Config config) {
+        // All eagle service settings live under the same "<props>.<service>." prefix.
+        final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final String serviceHost = config.getString(servicePrefix + EagleConfigConstants.HOST);
+        final Integer servicePort = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+        final String user = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        final String secret = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+        final String kafkaTopic = config.getString("dataSourceConfig.topic");
+
+        final DataDistributionDao distributionDao =
+                new DataDistributionDaoImpl(serviceHost, servicePort, user, secret, kafkaTopic);
+        return new PartitionStrategyImpl(distributionDao, new GreedyPartitionAlgorithm());
+    }
 
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+    /**
+     * Creates a spout provider whose scheme emits two fields per Kafka message:
+     * the message's {@code "user"} entry followed by the deserialized message itself.
+     *
+     * @param config topology configuration; {@code dataSourceConfig.deserializerClass}
+     *               names the deserializer handed to the scheme
+     * @return the provider wrapping that scheme
+     */
+    public static KafkaSourcedSpoutProvider createProvider(Config config) {
+        final String deserializerClass = config.getString("dataSourceConfig.deserializerClass");
+        final KafkaSourcedSpoutScheme userKeyedScheme = new KafkaSourcedSpoutScheme(deserializerClass, config) {
+            @Override
+            public List<Object> deserialize(byte[] ser) {
+                Object message = deserializer.deserialize(ser);
+                if (message == null) {
+                    return null;
+                }
+                Map<String, Object> fields = (Map<String, Object>) message;
+                return Arrays.asList(fields.get("user"), message);
+            }
+        };
+        return new KafkaSourcedSpoutProvider() {
+            @Override
+            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+                return new SchemeAsMultiScheme(userKeyedScheme);
+            }
+        };
+    }
 
-        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-                @Override
-                public List<Object> deserialize(byte[] ser) {
-                        Object tmp = deserializer.deserialize(ser);
-                        Map<String, Object> map = (Map<String, Object>)tmp;
-                        if(tmp == null) return null;
-                        return Arrays.asList(map.get("user"), tmp);
-                }
-        };
-        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-                public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-                        return new SchemeAsMultiScheme(scheme);
-                }
-        };
+    /**
+     * Assembles and executes the audit-log topology using {@code groupBy} on field
+     * index 0 before each of the two join executors.
+     *
+     * @param config   topology configuration handed to the spout provider
+     * @param env      execution environment the topology is built on and executed by
+     * @param provider source of the Kafka spout
+     */
+    public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
         env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
                 .flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
                 .flatMap(new IPZoneDataJoinExecutor())
                 .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
         env.execute();
+    }
+
+    /**
+     * Assembles and executes the same pipeline as {@code execWithDefaultPartition}, but
+     * routes tuples with {@code customGroupBy} using the strategy from
+     * {@link #createStrategy(Config)}; the same strategy instance is also passed to
+     * {@code alertWithConsumer}.
+     *
+     * @param config   topology configuration, also used to build the partition strategy
+     * @param env      execution environment the topology is built on and executed by
+     * @param provider source of the Kafka spout
+     */
+    public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+        // One strategy instance is shared by both grouping steps and the alert step.
+        PartitionStrategy strategy = createStrategy(config);
+        env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
+                .flatMap(new FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
+                .flatMap(new IPZoneDataJoinExecutor())
+                .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor", strategy);
+        env.execute();
+    }
+
+	/**
+	 * Entry point: loads the configuration from the command line, creates the Storm
+	 * environment and spout provider, then runs the balanced-partition topology when
+	 * {@code eagleProps.balancePartitionEnabled} is present and true, and the default
+	 * topology otherwise.
+	 */
+	public static void main(String[] args) throws Exception{
+        final Config config = new ConfigOptionParser().load(args);
+        LOG.info("Config class: " + config.getClass().getCanonicalName());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+        }
+
+        final StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+        final KafkaSourcedSpoutProvider provider = createProvider(config);
+
+        // A missing key means "not enabled".
+        final String balanceKey = "eagleProps.balancePartitionEnabled";
+        final boolean balanced = config.hasPath(balanceKey) && config.getBoolean(balanceKey);
+        if (balanced) {
+            execWithBalancedPartition(config, env, provider);
+        } else {
+            execWithDefaultPartition(config, env, provider);
+        }
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index de146f2..3b678a3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,6 +50,7 @@
     "mailHost" : "mailHost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
+    "balancePartitionEnabled" : "true",
     "eagleService": {
       "host": "localhost",
       "port": 38080,


[3/5] incubator-eagle git commit: fix a typo in fetching alertConfigs

Posted by ha...@apache.org.
fix a typo in fetching alertConfigs


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/58a9555e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/58a9555e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/58a9555e

Branch: refs/heads/master
Commit: 58a9555e28894e707d0dad528758dceb2e08d20f
Parents: 3de5cce
Author: sunlibin <ab...@gmail.com>
Authored: Mon Nov 30 13:55:21 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:55:21 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/eagle/executor/AlertExecutorCreationUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58a9555e/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
index c073939..6f7b69a 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
@@ -102,7 +102,7 @@ public class AlertExecutorCreationUtils {
         String alertExecutorConfigsKey = "alertExecutorConfigs";
         if(config.hasPath(alertExecutorConfigsKey)) {
             Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
-            if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorConfigs)) {
+            if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
                 Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
                 int parts = 0;
                 if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));


[5/5] incubator-eagle git commit: [EAGLE-2] Add eagle offline metric collection topology and do online balance partition based on the statistic metric

Posted by ha...@apache.org.
[EAGLE-2] Add eagle offline metric collection topology and do online balance partition based on the statistic metric

JIRA: [EAGLE-2][EAGLE-24][EAGLE-50][EAGLE-52]
Author: sunlibin <ab...@gmail.com>

Closes #8 from sunlibin:Eagle-Metric-And-Balance-Partition


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3ee73e8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3ee73e8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3ee73e8d

Branch: refs/heads/master
Commit: 3ee73e8d5953b1c51d3c8c7d2af24ff417ca57f5
Parents: 91aa216 b11a223
Author: Hao Chen <ha...@apache.org>
Authored: Thu Dec 3 11:26:44 2015 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Dec 3 11:26:44 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/executor/AlertExecutor.java    |  10 +-
 .../executor/AlertExecutorCreationUtils.java    |   2 +-
 .../config/RunningJobCrawlConfig.java           |  14 +-
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |   4 -
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |   1 -
 .../storm/partition/EagleCustomGrouping.java    |  51 +++++++
 .../impl/storm/zookeeper/ZKStateConfig.java     |  28 ++++
 .../datastream/StormTopologyCompiler.scala      |  14 +-
 .../eagle/datastream/StreamAlertExpansion.scala |   9 +-
 .../eagle/datastream/StreamConnector.scala      |   8 ++
 .../datastream/StreamGroupbyExpansion.scala     |   8 +-
 .../eagle/datastream/StreamProducer.scala       |  32 ++++-
 .../eagle/partition/DataDistributionDao.java    |  28 ++++
 .../eagle/partition/PartitionAlgorithm.java     |  29 ++++
 .../eagle/partition/PartitionStrategy.java      |  27 ++++
 .../eagle/partition/PartitionStrategyImpl.java  |  86 ++++++++++++
 .../java/org/apache/eagle/partition/Weight.java |  30 +++++
 .../org/apache/eagle/metric/CountingMetric.java |   8 +-
 .../java/org/apache/eagle/metric/Metric.java    |   5 +-
 .../apache/eagle/metric/MetricConstants.java    |  24 ++++
 .../manager/EagleMetricReportManager.java       |  61 +++++++++
 .../metric/report/EagleSerivceMetricReport.java |  61 ---------
 .../metric/report/EagleServiceMetricReport.java |  60 +++++++++
 .../metric/report/MetricEntityConvert.java      |   2 +-
 .../eagle/metric/report/MetricReport.java       |   4 +-
 .../eagle/service/client/ServiceConfig.java     |  29 ++++
 eagle-security/eagle-metric-collection/pom.xml  | 100 ++++++++++++++
 .../metric/kafka/EagleMetricCollectorMain.java  | 127 ++++++++++++++++++
 .../eagle/metric/kafka/KafkaConsumerOffset.java |  27 ++++
 .../kafka/KafkaConsumerOffsetFetcher.java       |  70 ++++++++++
 .../metric/kafka/KafkaLatestOffsetFetcher.java  |  98 ++++++++++++++
 .../kafka/KafkaMessageDistributionExecutor.java | 128 ++++++++++++++++++
 .../metric/kafka/KafkaOffsetCheckerConfig.java  |  44 ++++++
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |  54 ++++++++
 .../eagle/metric/kafka/KafkaOffsetSpout.java    | 134 +++++++++++++++++++
 .../src/main/resources/application.conf         |  39 ++++++
 .../src/main/resources/log4j.properties         |  39 ++++++
 .../test/java/TestDataDistributionDaoImpl.java  |  41 ++++++
 .../src/test/java/TestGreedyPartition.java      |  45 +++++++
 .../src/test/java/TestKafkaOffset.java          |  70 ++++++++++
 .../apache/eagle/security/partition/Bucket.java |  30 +++++
 .../security/partition/BucketComparator.java    |  36 +++++
 .../partition/DataDistributionDaoImpl.java      | 104 ++++++++++++++
 .../partition/GreedyPartitionAlgorithm.java     |  65 +++++++++
 .../auditlog/HdfsAuditLogProcessorMain.java     |  91 +++++++++----
 .../src/main/resources/application.conf         |   3 +
 ...HiveJobRunningSourcedStormSpoutProvider.java |   2 +-
 eagle-security/pom.xml                          |   1 +
 eagle-topology-assembly/pom.xml                 |   5 +
 49 files changed, 1865 insertions(+), 123 deletions(-)
----------------------------------------------------------------------



[4/5] incubator-eagle git commit: modify some interfaces and constructors

Posted by ha...@apache.org.
modify some interfaces and constructors


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b11a223a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b11a223a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b11a223a

Branch: refs/heads/master
Commit: b11a223a452b6a11d4a111175ca26948663d17ef
Parents: 58a9555
Author: sunlibin <ab...@gmail.com>
Authored: Mon Nov 30 18:09:06 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Wed Dec 2 14:54:29 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/partition/DataDistributionDao.java   |  2 +-
 .../apache/eagle/partition/PartitionStrategyImpl.java | 14 ++++++++++----
 .../eagle/metric/kafka/EagleMetricCollectorMain.java  |  2 +-
 .../src/test/java/TestDataDistributionDaoImpl.java    |  3 ++-
 .../src/test/java/TestGreedyPartition.java            |  3 ++-
 .../security/partition/DataDistributionDaoImpl.java   |  4 +---
 .../security/auditlog/HdfsAuditLogProcessorMain.java  |  7 ++++++-
 .../src/main/resources/application.conf               |  4 +++-
 8 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
index 5c78f96..0b17775 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -24,5 +24,5 @@ import java.util.List;
 
 public interface DataDistributionDao extends Serializable {
 
-    List<Weight> fetchDataDistribution() throws Exception;
+    List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
index 46696a6..eacefd5 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -19,8 +19,10 @@
 
 package org.apache.eagle.partition;
 
+import org.apache.commons.lang3.time.DateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.List;
 import java.util.Map;
 
@@ -31,17 +33,20 @@ public class PartitionStrategyImpl implements PartitionStrategy {
     public Map<String, Integer> routingTable;
     public long lastRefreshTime;
     public long refreshInterval;
-    public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+    public long timeRange;
+    public static long DEFAULT_TIME_RANGE = 2 * DateUtils.MILLIS_PER_DAY;
+    public static long DEFAULT_REFRESH_INTERVAL = 2 * DateUtils.MILLIS_PER_HOUR;
     private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
 
-    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval, long timeRange) {
         this.dao = dao;
         this.algorithm = algorithm;
         this.refreshInterval = refreshInterval;
+        this.timeRange = timeRange;
     }
 
     public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
-        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL, DEFAULT_TIME_RANGE);
     }
 
     public boolean needRefresh() {
@@ -54,7 +59,8 @@ public class PartitionStrategyImpl implements PartitionStrategy {
 
     public Map<String, Integer> generateRoutingTable(int buckNum) {
         try {
-            List<Weight> weights = dao.fetchDataDistribution();
+            long currentTime = System.currentTimeMillis();
+            List<Weight> weights = dao.fetchDataDistribution(currentTime - timeRange, currentTime);
             routingTable = algorithm.partition(weights, buckNum);
             return routingTable;
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
index 65fe68a..218b812 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -120,7 +120,7 @@ public class EagleMetricCollectorMain {
         };
 
         env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
-        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0))
+        env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageFetcher").groupBy(Arrays.asList(0))
                 .flatMap(new KafkaMessageDistributionExecutor());
         env.execute();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
index 1c54a90..4d82085 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -19,6 +19,7 @@
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.partition.DataDistributionDao;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
@@ -35,6 +36,6 @@ public class TestDataDistributionDaoImpl {
         String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(eagleServiceHost, eagleServicePort, username, password, topic);
-        dao.fetchDataDistribution();
+        dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
index cfd873a..f3e1cf8 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -19,6 +19,7 @@
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.partition.DataDistributionDao;
 import org.apache.eagle.partition.PartitionAlgorithm;
@@ -39,6 +40,6 @@ public class TestGreedyPartition {
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
         PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
-        algorithm.partition(dao.fetchDataDistribution(), 4);
+        algorithm.partition(dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis()), 4);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 631947c..e808502 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -55,7 +55,7 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
     }
 
     @Override
-    public List<Weight> fetchDataDistribution() throws Exception {
+    public List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception {
         IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
             @Override
             public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
@@ -75,8 +75,6 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
         };
         try {
             String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
-            long endTime = System.currentTimeMillis();
-            long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
             GenericServiceAPIResponseEntity<Map> response = client.search()
                     .startTime(startTime)
                     .endTime(endTime)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 8e6d5d7..fda41d3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -21,6 +21,7 @@ package org.apache.eagle.security.auditlog;
 import backtype.storm.spout.SchemeAsMultiScheme;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
@@ -51,7 +52,11 @@ public class HdfsAuditLogProcessorMain {
         String topic = config.getString("dataSourceConfig.topic");
         DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
         PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
-        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm);
+        String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
+        String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? config.getInt(key2) : 60;
+        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
         return strategy;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 3b678a3..43e5b58 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,7 +50,9 @@
     "mailHost" : "mailHost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
-    "balancePartitionEnabled" : "true",
+    "balancePartitionEnabled" : true,
+    #"partitionRefreshIntervalInMin" : 60,
+    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
       "port": 38080,