You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/03 04:29:00 UTC

[2/5] incubator-eagle git commit: balance events partition based on greedy parition algorithm

balance events partition based on greedy parition algorithm


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3de5cce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3de5cce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3de5cce6

Branch: refs/heads/master
Commit: 3de5cce698e2da3db223c468d28388ae5189a2e2
Parents: 34fa6d9
Author: sunlibin <ab...@gmail.com>
Authored: Thu Nov 26 18:45:46 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:38:14 2015 +0800

----------------------------------------------------------------------
 .../storm/partition/EagleCustomGrouping.java    |  51 +++++++++
 .../datastream/StormTopologyCompiler.scala      |  14 ++-
 .../eagle/datastream/StreamAlertExpansion.scala |   9 +-
 .../eagle/datastream/StreamConnector.scala      |   8 ++
 .../datastream/StreamGroupbyExpansion.scala     |   8 +-
 .../eagle/datastream/StreamProducer.scala       |  28 ++++-
 .../eagle/partition/DataDistributionDao.java    |  28 +++++
 .../eagle/partition/PartitionAlgorithm.java     |  29 +++++
 .../eagle/partition/PartitionStrategy.java      |  27 +++++
 .../eagle/partition/PartitionStrategyImpl.java  |  80 ++++++++++++++
 .../java/org/apache/eagle/partition/Weight.java |  30 ++++++
 .../apache/eagle/metric/MetricConstants.java    |  24 +++++
 .../manager/EagleMetricReportManager.java       |  16 +++
 .../eagle/service/client/ServiceConfig.java     |  29 +++++
 eagle-security/eagle-metric-collection/pom.xml  |   5 +
 .../kafka/KafkaMessageDistributionExecutor.java |   4 +-
 .../metric/kafka/KafkaOffsetCheckerConfig.java  |   8 +-
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |   3 +-
 .../eagle/metric/kafka/KafkaOffsetSpout.java    |  15 +--
 .../test/java/TestDataDistributionDaoImpl.java  |  40 +++++++
 .../src/test/java/TestGreedyPartition.java      |  44 ++++++++
 .../src/test/java/TestKafkaOffset.java          |  68 ++++++++++++
 .../apache/eagle/security/partition/Bucket.java |  30 ++++++
 .../security/partition/BucketComparator.java    |  36 +++++++
 .../partition/DataDistributionDaoImpl.java      | 106 +++++++++++++++++++
 .../partition/GreedyPartitionAlgorithm.java     |  65 ++++++++++++
 .../auditlog/HdfsAuditLogProcessorMain.java     |  85 +++++++++++----
 .../src/main/resources/application.conf         |   1 +
 28 files changed, 845 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
new file mode 100644
index 0000000..96e42b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
@@ -0,0 +1,51 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import org.apache.eagle.partition.PartitionStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class EagleCustomGrouping implements CustomStreamGrouping {
+
+    public List<Integer> targetTasks;
+    public PartitionStrategy strategy;
+
+    public EagleCustomGrouping(PartitionStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        this.targetTasks = new ArrayList<>(targetTasks);
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        int numTasks = targetTasks.size();
+        int targetTaskIndex = strategy.balance((String)values.get(0), numTasks);
+        return Arrays.asList(targetTasks.get(targetTaskIndex));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
index dbe69d2..4f9fccc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseRichBolt
 import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
 import backtype.storm.tuple.Fields
 import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.partition.EagleCustomGrouping
 import org.slf4j.LoggerFactory
 
 case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
@@ -63,10 +64,19 @@ case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGr
           }
           case Some(bt) => boltDeclarer = bt
         }
-        sc.groupByFields match{
+        if (sc.groupByFields != Nil) {
+          boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
+        }
+        else if (sc.customGroupBy != null) {
+          boltDeclarer.customGrouping(fromName, new EagleCustomGrouping(sc.customGroupBy));
+        }
+        else {
+          boltDeclarer.shuffleGrouping(fromName);
+        }
+/*        sc.groupByFields match{
           case Nil => boltDeclarer.shuffleGrouping(fromName)
           case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
-        }
+        }*/
         LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
       })
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
index fa83e6d..802107d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -70,7 +70,7 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
   def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
                dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
     child match {
-      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer, strategy) => {
         /**
          * step 1: wrapper previous StreamProducer with one more field "streamName"
          * for AlertStreamSink, we check previous StreamProducer and replace that
@@ -108,7 +108,12 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
           t.setConfig(config)
           t.setGraph(dag)
           alertProducers += t
-          newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+          if (strategy == null) {
+             newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+          }
+          else {
+            newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).customGroupBy(strategy))
+          }
         })
 
         // remove AlertStreamSink

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
index 0cece47..083a5af 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -18,11 +18,19 @@
  */
 package org.apache.eagle.datastream
 
+import org.apache.eagle.partition.PartitionStrategy
+
 case class StreamConnector(from: StreamProducer, to: StreamProducer) {
   var groupByFields : Seq[Int] = Nil
+  var customGroupBy : PartitionStrategy = null
 
   def groupBy(fields : Seq[Int]) : StreamConnector = {
     groupByFields = fields
     this
   }
+
+  def customGroupBy(custom : PartitionStrategy) : StreamConnector = {
+    customGroupBy = custom
+    this
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
index 42bc9a8..caf71e3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -43,7 +43,13 @@ class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
         child match {
           case p : GroupByProducer => {
             dag.outgoingEdgesOf(p).foreach(c2 => {
-              toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+              if (p.fields != Nil) {
+                toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+              }
+              else if (p.partitionStrategy != null) {
+                toBeAddedEdges += StreamConnector(current, c2.to).customGroupBy(p.partitionStrategy)
+              }
+              else toBeAddedEdges += StreamConnector(current, c2.to);
             })
             toBeRemovedVertex += p
           }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 40d4904..ae00d03 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
+import org.apache.eagle.partition.PartitionStrategy
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -103,6 +104,12 @@ trait StreamProducer{
     ret
   }
 
+  def customGroupBy(strategy : PartitionStrategy) : StreamProducer = {
+    val ret = GroupByProducer(incrementAndGetId(), strategy)
+    hookupDAG(graph, this, ret)
+    ret
+  }
+
   def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
     streamUnion(others);
   }
@@ -124,15 +131,23 @@ trait StreamProducer{
     alert(upStreamNames, alertExecutorId, false)
   }
 
-  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) = {
-    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer)
+  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy : PartitionStrategy=null ) = {
+    val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer, strategy)
     hookupDAG(graph, this, ret)
   }
 
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, true, strategy)
+  }
+
   def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
     alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
   }
 
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, false, strategy)
+  }
+
   def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
     alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
   }
@@ -170,7 +185,12 @@ case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends
 
 case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
 
-case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+case class GroupByProducer(id: Int, fields : Seq[Int], partitionStrategy: PartitionStrategy) extends StreamProducer
+
+object GroupByProducer {
+  def apply(id: Int, fields: Seq[Int]) = new GroupByProducer(id, fields, null)
+  def apply(id: Int, partitionStrategy : PartitionStrategy) = new GroupByProducer(id, Nil, partitionStrategy)
+}
 
 case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
 
@@ -187,7 +207,7 @@ case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamPr
   }
 }
 
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer
 
 object UniqueId{
   val id : AtomicInteger = new AtomicInteger(0);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
new file mode 100644
index 0000000..5c78f96
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface DataDistributionDao extends Serializable {
+
+    List<Weight> fetchDataDistribution() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
new file mode 100644
index 0000000..0614388
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface PartitionAlgorithm extends Serializable {
+    Map<String, Integer> partition(List<Weight> weights, int k);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
new file mode 100644
index 0000000..e431f28
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+
+public interface PartitionStrategy extends Serializable {
+
+    int balance(String key, int buckNum);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
new file mode 100644
index 0000000..46696a6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -0,0 +1,80 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStrategyImpl implements PartitionStrategy {
+
+    public DataDistributionDao dao;
+    public PartitionAlgorithm algorithm;
+    public Map<String, Integer> routingTable;
+    public long lastRefreshTime;
+    public long refreshInterval;
+    public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+    private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+        this.dao = dao;
+        this.algorithm = algorithm;
+        this.refreshInterval = refreshInterval;
+    }
+
+    public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
+        this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+    }
+
+    public boolean needRefresh() {
+        if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) {
+            lastRefreshTime = System.currentTimeMillis();
+            return true;
+        }
+        return false;
+    }
+
+    public Map<String, Integer> generateRoutingTable(int buckNum) {
+        try {
+            List<Weight> weights = dao.fetchDataDistribution();
+            routingTable = algorithm.partition(weights, buckNum);
+            return routingTable;
+        }
+        catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @Override
+    public int balance(String key, int buckNum) {
+        if (needRefresh()) {
+            LOG.info("Going to refresh routing table");
+            routingTable = generateRoutingTable(buckNum);
+            LOG.info("Finish refresh routing table");
+        }
+        if (routingTable.containsKey(key)) {
+            return routingTable.get(key);
+        }
+        else {
+            return Math.abs(key.hashCode()) % buckNum;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
new file mode 100644
index 0000000..14d005d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+public class Weight {
+    public String key;
+    public Double value;
+
+    public Weight(String key, Double value) {
+        this.key = key;
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
new file mode 100644
index 0000000..54c069d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
@@ -0,0 +1,24 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric;
+
+public class MetricConstants {
+    public static final String GENERIC_METRIC_ENTITY_ENDPOINT = "GenericMetricService";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
index b63944d..153159c 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.eagle.metric.manager;
 
 import org.apache.eagle.metric.Metric;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
new file mode 100644
index 0000000..e68360f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.client;
+
+import java.io.Serializable;
+
+public class ServiceConfig implements Serializable{
+    public String serviceHost;
+    public Integer servicePort;
+    public String username;
+    public String password;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
index f2e78a6..31fc6a4 100644
--- a/eagle-security/eagle-metric-collection/pom.xml
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -32,6 +32,11 @@
   <dependencies>
       <dependency>
           <groupId>eagle</groupId>
+          <artifactId>eagle-metric</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
           <artifactId>eagle-security-hdfs-auditlog</artifactId>
           <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index be6d0f7..7af5ea6 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -97,12 +97,14 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
 
     public void update(long currentMessageTime, String user) {
         if (eventMetrics.get(user) == null) {
-            LOG.info("Got metrics for new user: " + user);
+            LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
             putNewMetric(currentMessageTime, user);
         }
         else {
             long latestMessageTime = eventMetrics.get(user).latestMessageTime;
             if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
+                LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
+                        + ", latestMessageTime: " + latestMessageTime);
                 EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
                 putNewMetric(currentMessageTime, user);
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
index 5a06c82..040d08f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -20,6 +20,7 @@
 package org.apache.eagle.metric.kafka;
 
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
 
 import java.io.Serializable;
 
@@ -31,13 +32,6 @@ public class KafkaOffsetCheckerConfig implements Serializable {
         public String group;
     }
 
-    public static class ServiceConfig implements Serializable{
-        public String serviceHost;
-        public Integer servicePort;
-        public String username;
-        public String password;
-    }
-
     public ZKStateConfig zkConfig;
     public KafkaConfig kafkaConfig;
     public ServiceConfig serviceConfig;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index c794632..5fd02fd 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -20,6 +20,7 @@ import backtype.storm.topology.base.BaseRichSpout;
 import com.typesafe.config.Config;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,7 @@ public class KafkaOffsetSourceSpoutProvider {
 		zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
 		zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
 
-		KafkaOffsetCheckerConfig.ServiceConfig serviceConfig = new KafkaOffsetCheckerConfig.ServiceConfig();
+		ServiceConfig serviceConfig = new ServiceConfig();
 		serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
 		serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
 		serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index d6f7298..aee817a 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,9 +20,6 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.metric.CountingMetric;
 import org.apache.eagle.metric.Metric;
 import org.apache.eagle.metric.manager.EagleMetricReportManager;
@@ -62,8 +59,8 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		this.baseMetricDimension.put("group", config.kafkaConfig.group);
 		String eagleServiceHost = config.serviceConfig.serviceHost;
 		Integer eagleServicePort = config.serviceConfig.servicePort;
-		String username = config.serviceConfig.serviceHost;
-		String password = config.serviceConfig.serviceHost;
+		String username = config.serviceConfig.username;
+		String password = config.serviceConfig.password;
 		EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
 		EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
 	}
@@ -77,11 +74,16 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 		return metric;
 	}
 
+	public long trimTimestamp(long currentTime, long granularity) {
+		return currentTime / granularity * granularity;
+	}
+
 	@Override
 	public void nextTuple() {
 		Long currentTime = System.currentTimeMillis();
 		if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
 			try {
+				long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
 				Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
 				Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
 				List<Metric> list = new ArrayList<>();
@@ -89,8 +91,9 @@ public class KafkaOffsetSpout extends BaseRichSpout {
 					String partition = entry.getKey();
 					Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
 					Long lag = latestOffset.get(partitionNumber) - entry.getValue();
-					list.add(constructMetric(currentTime, partition, lag));
+					list.add(constructMetric(trimedCurrentTime, partition, lag));
 				}
+				lastRoundTime = trimedCurrentTime;
 				EagleMetricReportManager.getInstance().emit(list);
 			} catch (Exception ex) {
 				LOG.error("Got an exception, ex: ", ex);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
new file mode 100644
index 0000000..1c54a90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+
+public class TestDataDistributionDaoImpl {
+
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        Config config = ConfigFactory.load();
+        String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(eagleServiceHost, eagleServicePort, username, password, topic);
+        dao.fetchDataDistribution();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
new file mode 100644
index 0000000..cfd873a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.junit.Test;
+
+public class TestGreedyPartition {
+
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        Config config = ConfigFactory.load();
+        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+        algorithm.partition(dao.fetchDataDistribution(), 4);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
index bfba783..0ce8fce 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -1,2 +1,70 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.kafka.KafkaConsumerOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaLatestOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaOffsetCheckerConfig;
+import org.apache.eagle.service.client.ServiceConfig;
+import org.junit.Test;
+
+import java.util.Map;
+
 public class TestKafkaOffset {
+
+    //@Test
+    public void test() throws Exception {
+        System.setProperty("config.resource", "/application.local.conf");
+        Config config = ConfigFactory.load();
+        ZKStateConfig zkStateConfig = new ZKStateConfig();
+        zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        zkStateConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+        zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+        ServiceConfig serviceConfig = new ServiceConfig();
+        serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        serviceConfig.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+
+        KafkaOffsetCheckerConfig.KafkaConfig kafkaConfig = new KafkaOffsetCheckerConfig.KafkaConfig();
+        kafkaConfig.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints");
+        kafkaConfig.site = config.getString("dataSourceConfig.site");
+        kafkaConfig.topic = config.getString("dataSourceConfig.topic");
+        kafkaConfig.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+        KafkaOffsetCheckerConfig checkerConfig = new KafkaOffsetCheckerConfig(serviceConfig, zkStateConfig, kafkaConfig);
+
+        KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(checkerConfig.zkConfig, checkerConfig.kafkaConfig.topic, checkerConfig.kafkaConfig.group);
+        KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(checkerConfig.kafkaConfig.kafkaEndPoints);
+
+        Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+        Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(checkerConfig.kafkaConfig.topic, consumedOffset.size());
+        for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+            String partition = entry.getKey();
+            Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
+            Long lag = latestOffset.get(partitionNumber) - entry.getValue();
+            System.out.println("total: " + latestOffset.get(partitionNumber) + ", consumed: " + entry.getValue() + ",lag: " + lag);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
new file mode 100644
index 0000000..eb31d39
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+public class Bucket {
+    Integer bucketNum;
+    Double value;
+
+    public Bucket(Integer bucketNum, Double value) {
+        this.bucketNum = bucketNum;
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
new file mode 100644
index 0000000..ea86f94
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import java.util.Comparator;
+
+public class BucketComparator implements Comparator<Bucket> {
+
+    @Override
+    public int compare(Bucket w1, Bucket w2) {
+        if (w1.value < w2.value) {
+            return -1;
+        }
+        if (w1.value > w2.value) {
+            return 1;
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
new file mode 100644
index 0000000..631947c
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -0,0 +1,106 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.MetricConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.Weight;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DataDistributionDaoImpl implements DataDistributionDao {
+
+    private final Logger LOG = LoggerFactory.getLogger(DataDistributionDaoImpl.class);
+
+    private final String eagleServiceHost;
+    private final Integer eagleServicePort;
+    private String username;
+    private String password;
+    private String topic;
+
+    public DataDistributionDaoImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password, String topic) {
+        this.eagleServiceHost = eagleServiceHost;
+        this.eagleServicePort = eagleServicePort;
+        this.username = username;
+        this.password = password;
+        this.topic = topic;
+    }
+
+    @Override
+    public List<Weight> fetchDataDistribution() throws Exception {
+        IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
+            @Override
+            public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
+                String queryString = request.getQueryParameterString();
+                StringBuilder sb = new StringBuilder();
+                sb.append("/list");
+                sb.append("?");
+                sb.append(queryString);
+                final String urlString = sb.toString();
+                if (!this.silence) LOG.info("Going to query service: " + this.getBaseEndpoint() + urlString);
+                WebResource r = getWebResource(urlString);
+
+                return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
+                        .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
+                        .get(GenericServiceAPIResponseEntity.class);
+            }
+        };
+        try {
+            String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
+            long endTime = System.currentTimeMillis();
+            long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
+            GenericServiceAPIResponseEntity<Map> response = client.search()
+                    .startTime(startTime)
+                    .endTime(endTime)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .metricName("kafka.message.user.count")
+                    .send();
+            if (!response.isSuccess()) {
+                LOG.error(response.getException());
+            }
+            List<Weight> userWeights = new ArrayList<>();
+            for (Map keyValue : response.getObj()) {
+                List<String> keyList = (List)(keyValue.get("key"));
+                List<Double> valueList = (List)(keyValue.get("value"));
+                userWeights.add(new Weight(keyList.get(0), valueList.get(0)));
+            }
+            return userWeights;
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+            throw new RuntimeException(ex);
+        }
+        finally {
+            client.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
new file mode 100644
index 0000000..0197393
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.Weight;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class GreedyPartitionAlgorithm implements PartitionAlgorithm {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreedyPartitionAlgorithm.class);
+
+    public void printWeightTable(PriorityQueue<Bucket> queue) {
+        double total = 0;
+        Iterator<Bucket> iter = queue.iterator();
+        while (iter.hasNext()) {
+            total += iter.next().value;
+        }
+        StringBuilder sb = new StringBuilder();
+        iter = queue.iterator();
+        while (iter.hasNext()) {
+            sb.append(iter.next().value / total + ",");
+        }
+        sb.deleteCharAt(sb.length()-1);
+        LOG.info("Weights: " + sb.toString());
+    }
+
+    public HashMap<String, Integer> partition(List<Weight> weights, int k) {
+        PriorityQueue<Bucket> queue = new PriorityQueue<>(k, new BucketComparator());
+        HashMap<String, Integer> ret = new HashMap<>();
+        // Initialize the queue
+        for (int i = 0; i < k; i++) {
+            queue.add(new Bucket(i, 0.0));
+        }
+        int n = weights.size();
+        for (int i = 0; i < n; i++) {
+            Bucket bucket = queue.poll();
+            bucket.value = bucket.value + weights.get(i).value;
+            queue.add(bucket);
+            ret.put(weights.get(i).key, bucket.bucketNum);
+        }
+        printWeightTable(queue);
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 327cb8d..8e6d5d7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,13 +20,19 @@ package org.apache.eagle.security.auditlog;
 
 import backtype.storm.spout.SchemeAsMultiScheme;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
 import org.apache.eagle.dataproc.util.ConfigOptionParser;
 import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
 import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,33 +43,68 @@ import java.util.Map;
 public class HdfsAuditLogProcessorMain {
 	private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
 
-	public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+    public static PartitionStrategy createStrategy(Config config) {
+        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm);
+        return strategy;
+    }
 
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+    public static KafkaSourcedSpoutProvider createProvider(Config config) {
+         String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+         final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+                 @Override
+                 public List<Object> deserialize(byte[] ser) {
+                         Object tmp = deserializer.deserialize(ser);
+                         Map<String, Object> map = (Map<String, Object>)tmp;
+                         if(tmp == null) return null;
+                         return Arrays.asList(map.get("user"), tmp);
+                 }
+         };
+         KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
+                 @Override
+                 public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+                         return new SchemeAsMultiScheme(scheme);
+                  }
+         };
+         return provider;
+    }
 
-        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-                @Override
-                public List<Object> deserialize(byte[] ser) {
-                        Object tmp = deserializer.deserialize(ser);
-                        Map<String, Object> map = (Map<String, Object>)tmp;
-                        if(tmp == null) return null;
-                        return Arrays.asList(map.get("user"), tmp);
-                }
-        };
-        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-                public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-                        return new SchemeAsMultiScheme(scheme);
-                }
-        };
+    public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
         env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
                 .flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
                 .flatMap(new IPZoneDataJoinExecutor())
                 .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
         env.execute();
+    }
+
+    public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+        PartitionStrategy strategy = createStrategy(config);
+        env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
+                .flatMap(new FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
+                .flatMap(new IPZoneDataJoinExecutor())
+                .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor", strategy);
+        env.execute();
+    }
+
+	public static void main(String[] args) throws Exception{
+        Config config = new ConfigOptionParser().load(args);
+        LOG.info("Config class: " + config.getClass().getCanonicalName());
+        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+
+        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+        KafkaSourcedSpoutProvider provider = createProvider(config);
+        Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") ? config.getBoolean("eagleProps.balancePartitionEnabled") : false;
+        if (balancePartition) {
+            execWithBalancedPartition(config, env, provider);
+        }
+        else {
+            execWithDefaultPartition(config, env, provider);
+        }
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index de146f2..3b678a3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,6 +50,7 @@
     "mailHost" : "mailHost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
+    "balancePartitionEnabled" : "true",
     "eagleService": {
       "host": "localhost",
       "port": 38080,