You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/03 04:29:00 UTC
[2/5] incubator-eagle git commit: balance events partition based on
greedy partition algorithm
balance events partition based on greedy partition algorithm
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3de5cce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3de5cce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3de5cce6
Branch: refs/heads/master
Commit: 3de5cce698e2da3db223c468d28388ae5189a2e2
Parents: 34fa6d9
Author: sunlibin <ab...@gmail.com>
Authored: Thu Nov 26 18:45:46 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:38:14 2015 +0800
----------------------------------------------------------------------
.../storm/partition/EagleCustomGrouping.java | 51 +++++++++
.../datastream/StormTopologyCompiler.scala | 14 ++-
.../eagle/datastream/StreamAlertExpansion.scala | 9 +-
.../eagle/datastream/StreamConnector.scala | 8 ++
.../datastream/StreamGroupbyExpansion.scala | 8 +-
.../eagle/datastream/StreamProducer.scala | 28 ++++-
.../eagle/partition/DataDistributionDao.java | 28 +++++
.../eagle/partition/PartitionAlgorithm.java | 29 +++++
.../eagle/partition/PartitionStrategy.java | 27 +++++
.../eagle/partition/PartitionStrategyImpl.java | 80 ++++++++++++++
.../java/org/apache/eagle/partition/Weight.java | 30 ++++++
.../apache/eagle/metric/MetricConstants.java | 24 +++++
.../manager/EagleMetricReportManager.java | 16 +++
.../eagle/service/client/ServiceConfig.java | 29 +++++
eagle-security/eagle-metric-collection/pom.xml | 5 +
.../kafka/KafkaMessageDistributionExecutor.java | 4 +-
.../metric/kafka/KafkaOffsetCheckerConfig.java | 8 +-
.../kafka/KafkaOffsetSourceSpoutProvider.java | 3 +-
.../eagle/metric/kafka/KafkaOffsetSpout.java | 15 +--
.../test/java/TestDataDistributionDaoImpl.java | 40 +++++++
.../src/test/java/TestGreedyPartition.java | 44 ++++++++
.../src/test/java/TestKafkaOffset.java | 68 ++++++++++++
.../apache/eagle/security/partition/Bucket.java | 30 ++++++
.../security/partition/BucketComparator.java | 36 +++++++
.../partition/DataDistributionDaoImpl.java | 106 +++++++++++++++++++
.../partition/GreedyPartitionAlgorithm.java | 65 ++++++++++++
.../auditlog/HdfsAuditLogProcessorMain.java | 85 +++++++++++----
.../src/main/resources/application.conf | 1 +
28 files changed, 845 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
new file mode 100644
index 0000000..96e42b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import org.apache.eagle.partition.PartitionStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class EagleCustomGrouping implements CustomStreamGrouping { // custom stream grouping that sends each tuple to one task chosen by a PartitionStrategy
+
+ public List<Integer> targetTasks; // copy of the task ids handed to prepare()
+ public PartitionStrategy strategy; // maps a key and a bucket count to a bucket index
+
+ public EagleCustomGrouping(PartitionStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ this.targetTasks = new ArrayList<>(targetTasks); // defensive copy; context and stream are not used here
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ int numTasks = targetTasks.size();
+ int targetTaskIndex = strategy.balance((String)values.get(0), numTasks); // the first tuple value is cast to String and used as the partition key
+ return Arrays.asList(targetTasks.get(targetTaskIndex)); // balance() result indexes targetTasks, so it must lie in [0, numTasks)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
index dbe69d2..4f9fccc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
import backtype.storm.tuple.Fields
import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.partition.EagleCustomGrouping
import org.slf4j.LoggerFactory
case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
@@ -63,10 +64,19 @@ case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGr
}
case Some(bt) => boltDeclarer = bt
}
- sc.groupByFields match{
+ if (sc.groupByFields != Nil) {
+ boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
+ }
+ else if (sc.customGroupBy != null) {
+ boltDeclarer.customGrouping(fromName, new EagleCustomGrouping(sc.customGroupBy));
+ }
+ else {
+ boltDeclarer.shuffleGrouping(fromName);
+ }
+/* sc.groupByFields match{
case Nil => boltDeclarer.shuffleGrouping(fromName)
case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
- }
+ }*/
LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
})
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
index fa83e6d..802107d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -70,7 +70,7 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
child match {
- case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+ case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer, strategy) => {
/**
* step 1: wrapper previous StreamProducer with one more field "streamName"
* for AlertStreamSink, we check previous StreamProducer and replace that
@@ -108,7 +108,12 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
t.setConfig(config)
t.setGraph(dag)
alertProducers += t
- newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+ if (strategy == null) {
+ newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+ }
+ else {
+ newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).customGroupBy(strategy))
+ }
})
// remove AlertStreamSink
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
index 0cece47..083a5af 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -18,11 +18,19 @@
*/
package org.apache.eagle.datastream
+import org.apache.eagle.partition.PartitionStrategy
+
case class StreamConnector(from: StreamProducer, to: StreamProducer) {
var groupByFields : Seq[Int] = Nil
+ var customGroupBy : PartitionStrategy = null // custom partition strategy for this edge; stays null unless customGroupBy(...) is called
def groupBy(fields : Seq[Int]) : StreamConnector = {
groupByFields = fields
this
}
+
+ def customGroupBy(custom : PartitionStrategy) : StreamConnector = { // fluent setter: stores the strategy and returns this connector
+ customGroupBy = custom
+ this
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
index 42bc9a8..caf71e3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -43,7 +43,13 @@ class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
child match {
case p : GroupByProducer => {
dag.outgoingEdgesOf(p).foreach(c2 => {
- toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+ if (p.fields != Nil) {
+ toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+ }
+ else if (p.partitionStrategy != null) {
+ toBeAddedEdges += StreamConnector(current, c2.to).customGroupBy(p.partitionStrategy)
+ }
+ else toBeAddedEdges += StreamConnector(current, c2.to);
})
toBeRemovedVertex += p
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 40d4904..ae00d03 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import backtype.storm.topology.base.BaseRichSpout
import com.typesafe.config.Config
+import org.apache.eagle.partition.PartitionStrategy
import org.jgrapht.experimental.dag.DirectedAcyclicGraph
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -103,6 +104,12 @@ trait StreamProducer{
ret
}
+ def customGroupBy(strategy : PartitionStrategy) : StreamProducer = { // creates a GroupByProducer for the strategy, hooks it up via hookupDAG and returns it
+ val ret = GroupByProducer(incrementAndGetId(), strategy) // (id, strategy) factory: the producer's fields are Nil
+ hookupDAG(graph, this, ret)
+ ret
+ }
+
def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
streamUnion(others);
}
@@ -124,15 +131,23 @@ trait StreamProducer{
alert(upStreamNames, alertExecutorId, false)
}
- def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) = {
- val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer)
+ def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy : PartitionStrategy=null ) = {
+ val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer, strategy)
hookupDAG(graph, this, ret)
}
+ def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+ alert(util.Arrays.asList(upStreamName), alertExecutorId, true, strategy)
+ }
+
def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
}
+ def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+ alert(util.Arrays.asList(upStreamName), alertExecutorId, false, strategy)
+ }
+
def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
}
@@ -170,7 +185,12 @@ case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends
case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
-case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+case class GroupByProducer(id: Int, fields : Seq[Int], partitionStrategy: PartitionStrategy) extends StreamProducer // group-by node carrying field indexes and/or a custom partition strategy
+
+object GroupByProducer {
+ def apply(id: Int, fields: Seq[Int]) = new GroupByProducer(id, fields, null) // field-based grouping: partitionStrategy is null
+ def apply(id: Int, partitionStrategy : PartitionStrategy) = new GroupByProducer(id, Nil, partitionStrategy) // strategy-based grouping: fields is Nil
+}
case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
@@ -187,7 +207,7 @@ case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamPr
}
}
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer
object UniqueId{
val id : AtomicInteger = new AtomicInteger(0);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
new file mode 100644
index 0000000..5c78f96
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface DataDistributionDao extends Serializable { // source of the Weight list that PartitionStrategyImpl feeds to its PartitionAlgorithm
+
+ List<Weight> fetchDataDistribution() throws Exception; // returns the current list of Weight entries; may throw any checked exception
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
new file mode 100644
index 0000000..0614388
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface PartitionAlgorithm extends Serializable { // turns a list of weights into a key -> bucket routing table
+ Map<String, Integer> partition(List<Weight> weights, int k); // PartitionStrategyImpl passes the bucket count as k; presumably values are bucket indexes in [0, k) - confirm against implementations
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
new file mode 100644
index 0000000..e431f28
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+
+public interface PartitionStrategy extends Serializable { // chooses a bucket for a key
+
+ int balance(String key, int buckNum); // returns the bucket index for key; EagleCustomGrouping uses it to index its list of buckNum target tasks
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
new file mode 100644
index 0000000..46696a6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStrategyImpl implements PartitionStrategy { // routes keys via a periodically rebuilt routing table, falling back to hashing for unrouted keys
+
+ public DataDistributionDao dao; // supplies the weights used to rebuild the routing table
+ public PartitionAlgorithm algorithm; // converts the weights into the key -> bucket routing table
+ public Map<String, Integer> routingTable; // null until a refresh has completed successfully
+ public long lastRefreshTime; // epoch millis of the last refresh attempt; starts at 0, so the first balance() call normally refreshes
+ public long refreshInterval; // minimum millis between refresh attempts
+ public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000; // one hour
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class); // static: keeps the logger out of the serialized strategy
+
+ public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+ this.dao = dao;
+ this.algorithm = algorithm;
+ this.refreshInterval = refreshInterval;
+ }
+
+ public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
+ this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+ }
+
+ public boolean needRefresh() { // true once refreshInterval has elapsed; also stamps lastRefreshTime, so this is not a pure query
+ if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) {
+ lastRefreshTime = System.currentTimeMillis();
+ return true;
+ }
+ return false;
+ }
+
+ public Map<String, Integer> generateRoutingTable(int buckNum) { // fetches weights, partitions them into buckNum buckets, stores and returns the table
+ try {
+ List<Weight> weights = dao.fetchDataDistribution();
+ routingTable = algorithm.partition(weights, buckNum);
+ return routingTable;
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex); // DAO/algorithm failures surface unchecked with the cause preserved
+ }
+ }
+
+ @Override
+ public int balance(String key, int buckNum) { // returns the routed bucket for key, or a hash-based bucket in [0, buckNum) when key is not routed
+ if (needRefresh()) {
+ LOG.info("Going to refresh routing table");
+ routingTable = generateRoutingTable(buckNum);
+ LOG.info("Finish refresh routing table");
+ }
+ if (routingTable != null && routingTable.containsKey(key)) { // table is still null if an earlier refresh threw; fall back to hashing instead of an NPE
+ return routingTable.get(key);
+ }
+ else {
+ return Math.abs(key.hashCode() % buckNum); // modulo first: Math.abs(Integer.MIN_VALUE) is negative and would yield a negative bucket
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
new file mode 100644
index 0000000..14d005d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+public class Weight { // key/value pair; a list of these is what PartitionAlgorithm.partition consumes
+ public String key;
+ public Double value; // NOTE(review): presumably the load observed for key - confirm against DataDistributionDao implementations
+
+ public Weight(String key, Double value) {
+ this.key = key;
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
new file mode 100644
index 0000000..54c069d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric;
+
+public class MetricConstants { // holder for metric-related constants
+ public static final String GENERIC_METRIC_ENTITY_ENDPOINT = "GenericMetricService"; // NOTE(review): presumably the service endpoint name for generic metric entities - confirm
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
index b63944d..153159c 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.eagle.metric.manager;
import org.apache.eagle.metric.Metric;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
new file mode 100644
index 0000000..e68360f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.client;
+
+import java.io.Serializable;
+
+public class ServiceConfig implements Serializable{ // serializable holder for service connection settings
+ public String serviceHost;
+ public Integer servicePort;
+ public String username;
+ public String password; // NOTE(review): kept as a plain String, so it is written out whenever this object is serialized
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
index f2e78a6..31fc6a4 100644
--- a/eagle-security/eagle-metric-collection/pom.xml
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
<dependency>
<groupId>eagle</groupId>
+ <artifactId>eagle-metric</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
<artifactId>eagle-security-hdfs-auditlog</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index be6d0f7..7af5ea6 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -97,12 +97,14 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
public void update(long currentMessageTime, String user) {
if (eventMetrics.get(user) == null) {
- LOG.info("Got metrics for new user: " + user);
+ LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
putNewMetric(currentMessageTime, user);
}
else {
long latestMessageTime = eventMetrics.get(user).latestMessageTime;
if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
+ LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
+ + ", latestMessageTime: " + latestMessageTime);
EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
putNewMetric(currentMessageTime, user);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
index 5a06c82..040d08f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -20,6 +20,7 @@
package org.apache.eagle.metric.kafka;
import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
import java.io.Serializable;
@@ -31,13 +32,6 @@ public class KafkaOffsetCheckerConfig implements Serializable {
public String group;
}
- public static class ServiceConfig implements Serializable{
- public String serviceHost;
- public Integer servicePort;
- public String username;
- public String password;
- }
-
public ZKStateConfig zkConfig;
public KafkaConfig kafkaConfig;
public ServiceConfig serviceConfig;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index c794632..5fd02fd 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -20,6 +20,7 @@ import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class KafkaOffsetSourceSpoutProvider {
zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- KafkaOffsetCheckerConfig.ServiceConfig serviceConfig = new KafkaOffsetCheckerConfig.ServiceConfig();
+ ServiceConfig serviceConfig = new ServiceConfig();
serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index d6f7298..aee817a 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,9 +20,6 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
import org.apache.eagle.metric.CountingMetric;
import org.apache.eagle.metric.Metric;
import org.apache.eagle.metric.manager.EagleMetricReportManager;
@@ -62,8 +59,8 @@ public class KafkaOffsetSpout extends BaseRichSpout {
this.baseMetricDimension.put("group", config.kafkaConfig.group);
String eagleServiceHost = config.serviceConfig.serviceHost;
Integer eagleServicePort = config.serviceConfig.servicePort;
- String username = config.serviceConfig.serviceHost;
- String password = config.serviceConfig.serviceHost;
+ String username = config.serviceConfig.username;
+ String password = config.serviceConfig.password;
EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
}
@@ -77,11 +74,16 @@ public class KafkaOffsetSpout extends BaseRichSpout {
return metric;
}
+    /**
+     * Rounds a timestamp towards zero to a whole multiple of {@code granularity}.
+     *
+     * @param currentTime timestamp to align
+     * @param granularity bucket width, in the same unit as {@code currentTime}
+     * @return {@code currentTime} minus its remainder modulo {@code granularity}
+     * @throws ArithmeticException if {@code granularity} is zero
+     */
+    public long trimTimestamp(long currentTime, long granularity) {
+        final long remainder = currentTime % granularity;
+        return currentTime - remainder;
+    }
+
@Override
public void nextTuple() {
Long currentTime = System.currentTimeMillis();
if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
try {
+ long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
List<Metric> list = new ArrayList<>();
@@ -89,8 +91,9 @@ public class KafkaOffsetSpout extends BaseRichSpout {
String partition = entry.getKey();
Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
Long lag = latestOffset.get(partitionNumber) - entry.getValue();
- list.add(constructMetric(currentTime, partition, lag));
+ list.add(constructMetric(trimedCurrentTime, partition, lag));
}
+ lastRoundTime = trimedCurrentTime;
EagleMetricReportManager.getInstance().emit(list);
} catch (Exception ex) {
LOG.error("Got an exception, ex: ", ex);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
new file mode 100644
index 0000000..1c54a90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+
+public class TestDataDistributionDaoImpl {
+
+    /**
+     * Loads /application.local.conf, builds a DataDistributionDaoImpl from the
+     * Eagle service settings and the configured topic, and fetches the
+     * distribution once. The JUnit annotation is left disabled.
+     */
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+        // All Eagle service settings live under the same key prefix.
+        final String serviceKeyPrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final String host = conf.getString(serviceKeyPrefix + EagleConfigConstants.HOST);
+        final Integer port = conf.getInt(serviceKeyPrefix + EagleConfigConstants.PORT);
+        final String user = conf.getString(serviceKeyPrefix + EagleConfigConstants.USERNAME);
+        final String secret = conf.getString(serviceKeyPrefix + EagleConfigConstants.PASSWORD);
+        final String kafkaTopic = conf.getString("dataSourceConfig.topic");
+        final DataDistributionDao distributionDao = new DataDistributionDaoImpl(host, port, user, secret, kafkaTopic);
+        distributionDao.fetchDataDistribution();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
new file mode 100644
index 0000000..cfd873a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.junit.Test;
+
+public class TestGreedyPartition {
+
+    /**
+     * Loads /application.local.conf, fetches the data distribution for the
+     * configured topic and partitions it into 4 buckets with the greedy
+     * algorithm. The JUnit annotation is left disabled.
+     */
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+        // All Eagle service settings live under the same key prefix.
+        final String serviceKeyPrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final String serviceHost = conf.getString(serviceKeyPrefix + EagleConfigConstants.HOST);
+        final Integer servicePort = conf.getInt(serviceKeyPrefix + EagleConfigConstants.PORT);
+        final String user = conf.getString(serviceKeyPrefix + EagleConfigConstants.USERNAME);
+        final String secret = conf.getString(serviceKeyPrefix + EagleConfigConstants.PASSWORD);
+        final String kafkaTopic = conf.getString("dataSourceConfig.topic");
+        final DataDistributionDao distributionDao = new DataDistributionDaoImpl(serviceHost, servicePort, user, secret, kafkaTopic);
+        final PartitionAlgorithm greedy = new GreedyPartitionAlgorithm();
+        greedy.partition(distributionDao.fetchDataDistribution(), 4);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
index bfba783..0ce8fce 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -1,2 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.kafka.KafkaConsumerOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaLatestOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaOffsetCheckerConfig;
+import org.apache.eagle.service.client.ServiceConfig;
+import org.junit.Test;
+
+import java.util.Map;
+
public class TestKafkaOffset {
+
+    /**
+     * Loads /application.local.conf, fetches the consumed and the latest
+     * offsets of the configured topic, and prints total, consumed and lag for
+     * every consumed partition. The JUnit annotation is left disabled.
+     */
+    //@Test
+    public void test() throws Exception {
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+
+        final ZKStateConfig zkState = new ZKStateConfig();
+        zkState.zkQuorum = conf.getString("dataSourceConfig.zkQuorum");
+        zkState.zkRoot = conf.getString("dataSourceConfig.transactionZKRoot");
+        zkState.zkSessionTimeoutMs = conf.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkState.zkRetryTimes = conf.getInt("dataSourceConfig.zkRetryTimes");
+        zkState.zkRetryInterval = conf.getInt("dataSourceConfig.zkRetryInterval");
+
+        // All Eagle service settings live under the same key prefix.
+        final String serviceKeyPrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final ServiceConfig service = new ServiceConfig();
+        service.serviceHost = conf.getString(serviceKeyPrefix + EagleConfigConstants.HOST);
+        service.servicePort = conf.getInt(serviceKeyPrefix + EagleConfigConstants.PORT);
+        service.username = conf.getString(serviceKeyPrefix + EagleConfigConstants.USERNAME);
+        service.password = conf.getString(serviceKeyPrefix + EagleConfigConstants.PASSWORD);
+
+        final KafkaOffsetCheckerConfig.KafkaConfig kafka = new KafkaOffsetCheckerConfig.KafkaConfig();
+        kafka.kafkaEndPoints = conf.getString("dataSourceConfig.kafkaEndPoints");
+        kafka.site = conf.getString("dataSourceConfig.site");
+        kafka.topic = conf.getString("dataSourceConfig.topic");
+        kafka.group = conf.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+        final KafkaOffsetCheckerConfig checker = new KafkaOffsetCheckerConfig(service, zkState, kafka);
+
+        final KafkaConsumerOffsetFetcher consumedFetcher = new KafkaConsumerOffsetFetcher(checker.zkConfig, checker.kafkaConfig.topic, checker.kafkaConfig.group);
+        final KafkaLatestOffsetFetcher latestFetcher = new KafkaLatestOffsetFetcher(checker.kafkaConfig.kafkaEndPoints);
+
+        final Map<String, Long> consumedByPartition = consumedFetcher.fetch();
+        final Map<Integer, Long> latestByPartition = latestFetcher.fetch(checker.kafkaConfig.topic, consumedByPartition.size());
+        for (Map.Entry<String, Long> consumedEntry : consumedByPartition.entrySet()) {
+            // The partition number is the text after the first underscore of the key.
+            final Integer partitionId = Integer.valueOf(consumedEntry.getKey().split("_")[1]);
+            final Long latest = latestByPartition.get(partitionId);
+            final Long consumed = consumedEntry.getValue();
+            final Long lag = latest - consumed;
+            System.out.println("total: " + latest + ", consumed: " + consumed + ",lag: " + lag);
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
new file mode 100644
index 0000000..eb31d39
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+/**
+ * A numbered bucket paired with a numeric value. Both fields are
+ * package-private and mutable, so classes in this package read and update
+ * them directly.
+ */
+public class Bucket {
+ // Identifier of this bucket.
+ Integer bucketNum;
+ // Value currently held by this bucket.
+ Double value;
+
+ /**
+  * @param bucketNum identifier of the bucket
+  * @param value initial value of the bucket
+  */
+ public Bucket(Integer bucketNum, Double value) {
+ this.bucketNum = bucketNum;
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
new file mode 100644
index 0000000..ea86f94
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import java.util.Comparator;
+
+public class BucketComparator implements Comparator<Bucket> {
+
+    /**
+     * Orders buckets by ascending {@code value}.
+     *
+     * @return -1 if {@code w1} holds the smaller value, 1 if it holds the
+     *         larger one, and 0 otherwise (including when either value is NaN)
+     */
+    @Override
+    public int compare(Bucket w1, Bucket w2) {
+        final double left = w1.value;
+        final double right = w2.value;
+        return left < right ? -1 : (left > right ? 1 : 0);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
new file mode 100644
index 0000000..631947c
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.MetricConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.Weight;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DataDistributionDaoImpl implements DataDistributionDao {
+
+ private final Logger LOG = LoggerFactory.getLogger(DataDistributionDaoImpl.class);
+
+ private final String eagleServiceHost;
+ private final Integer eagleServicePort;
+ private String username;
+ private String password;
+ private String topic;
+
+ public DataDistributionDaoImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password, String topic) {
+ this.eagleServiceHost = eagleServiceHost;
+ this.eagleServicePort = eagleServicePort;
+ this.username = username;
+ this.password = password;
+ this.topic = topic;
+ }
+
+ @Override
+ public List<Weight> fetchDataDistribution() throws Exception {
+ IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
+ @Override
+ public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
+ String queryString = request.getQueryParameterString();
+ StringBuilder sb = new StringBuilder();
+ sb.append("/list");
+ sb.append("?");
+ sb.append(queryString);
+ final String urlString = sb.toString();
+ if (!this.silence) LOG.info("Going to query service: " + this.getBaseEndpoint() + urlString);
+ WebResource r = getWebResource(urlString);
+
+ return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
+ .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
+ .get(GenericServiceAPIResponseEntity.class);
+ }
+ };
+ try {
+ String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
+ long endTime = System.currentTimeMillis();
+ long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
+ GenericServiceAPIResponseEntity<Map> response = client.search()
+ .startTime(startTime)
+ .endTime(endTime)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .metricName("kafka.message.user.count")
+ .send();
+ if (!response.isSuccess()) {
+ LOG.error(response.getException());
+ }
+ List<Weight> userWeights = new ArrayList<>();
+ for (Map keyValue : response.getObj()) {
+ List<String> keyList = (List)(keyValue.get("key"));
+ List<Double> valueList = (List)(keyValue.get("value"));
+ userWeights.add(new Weight(keyList.get(0), valueList.get(0)));
+ }
+ return userWeights;
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception, ex: ", ex);
+ throw new RuntimeException(ex);
+ }
+ finally {
+ client.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
new file mode 100644
index 0000000..0197393
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.Weight;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Greedy partitioning: every weight is assigned to the bucket that currently
+ * holds the smallest total, heaviest weights first.
+ */
+public class GreedyPartitionAlgorithm implements PartitionAlgorithm {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreedyPartitionAlgorithm.class);
+
+    /**
+     * Logs each bucket's share of the total load as a comma-separated list, in
+     * the queue's iteration order.
+     *
+     * @param queue buckets to report; may be empty, in which case an empty list is logged
+     */
+    public void printWeightTable(PriorityQueue<Bucket> queue) {
+        double total = 0;
+        for (Bucket bucket : queue) {
+            total += bucket.value;
+        }
+        StringBuilder sb = new StringBuilder();
+        for (Bucket bucket : queue) {
+            // Separator goes between entries, so an empty queue needs no trailing-comma removal.
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            // With no load at all, report 0.0 for every bucket rather than NaN (0/0).
+            sb.append(total == 0 ? 0.0 : bucket.value / total);
+        }
+        LOG.info("Weights: " + sb.toString());
+    }
+
+    /**
+     * Distributes the weighted keys over {@code k} buckets.
+     *
+     * @param weights weighted keys to distribute; the list itself is not modified
+     * @param k number of buckets, at least 1
+     * @return map from each weight's key to the number (0 to k-1) of its bucket
+     * @throws IllegalArgumentException if {@code k} is smaller than 1
+     */
+    public HashMap<String, Integer> partition(List<Weight> weights, int k) {
+        if (k < 1) {
+            throw new IllegalArgumentException("Number of buckets must be at least 1, got: " + k);
+        }
+        PriorityQueue<Bucket> queue = new PriorityQueue<>(k, new BucketComparator());
+        HashMap<String, Integer> ret = new HashMap<>();
+        // Initialize the queue
+        for (int i = 0; i < k; i++) {
+            queue.add(new Bucket(i, 0.0));
+        }
+        // Greedy assignment only balances well when the heaviest items are placed
+        // first, so order a copy instead of relying on the caller. The sort is
+        // stable: input that is already descending keeps its order.
+        List<Weight> ordered = new ArrayList<>(weights);
+        Collections.sort(ordered, new Comparator<Weight>() {
+            @Override
+            public int compare(Weight left, Weight right) {
+                return Double.compare(right.value, left.value);
+            }
+        });
+        for (Weight weight : ordered) {
+            // The head of the queue is the bucket with the smallest total so far.
+            Bucket lightest = queue.poll();
+            lightest.value = lightest.value + weight.value;
+            queue.add(lightest);
+            ret.put(weight.key, lightest.bucketNum);
+        }
+        printWeightTable(queue);
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 327cb8d..8e6d5d7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,13 +20,19 @@ package org.apache.eagle.security.auditlog;
import backtype.storm.spout.SchemeAsMultiScheme;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
import org.apache.eagle.dataproc.util.ConfigOptionParser;
import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,33 +43,68 @@ import java.util.Map;
public class HdfsAuditLogProcessorMain {
private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
- public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
-
- LOG.info("Config class: " + config.getClass().getCanonicalName());
- if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+ /**
+  * Builds the partition strategy used by the balanced-partition topology.
+  *
+  * Reads the Eagle service host, port, username and password from the config
+  * section addressed by {@code EAGLE_PROPS.EAGLE_SERVICE}, and the topic from
+  * {@code dataSourceConfig.topic}. These feed a {@link DataDistributionDaoImpl},
+  * which is combined with a {@link GreedyPartitionAlgorithm} in a
+  * {@link PartitionStrategyImpl}.
+  *
+  * @param config application configuration; every key read here must be present
+  * @return a new strategy backed by the DAO and the greedy algorithm
+  */
+ public static PartitionStrategy createStrategy(Config config) {
+     // All four service settings share the same path prefix.
+     final String servicePath = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+     String serviceHost = config.getString(servicePath + EagleConfigConstants.HOST);
+     Integer servicePort = config.getInt(servicePath + EagleConfigConstants.PORT);
+     String serviceUser = config.getString(servicePath + EagleConfigConstants.USERNAME);
+     String servicePassword = config.getString(servicePath + EagleConfigConstants.PASSWORD);
+     String kafkaTopic = config.getString("dataSourceConfig.topic");
+
+     DataDistributionDao distributionDao =
+             new DataDistributionDaoImpl(serviceHost, servicePort, serviceUser, servicePassword, kafkaTopic);
+     PartitionAlgorithm greedy = new GreedyPartitionAlgorithm();
+     return new PartitionStrategyImpl(distributionDao, greedy);
+ }
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+ /**
+  * Creates the Kafka spout provider for the HDFS audit log topology.
+  *
+  * The provider always hands out a scheme that deserializes each message with
+  * the class named by {@code dataSourceConfig.deserializerClass} and emits a
+  * two-element tuple: the value stored under the {@code "user"} key, followed
+  * by the deserialized object itself. A message that deserializes to
+  * {@code null} yields {@code null} instead of a tuple.
+  *
+  * @param config application configuration
+  * @return a provider whose stream scheme wraps the user-keyed scheme
+  */
+ public static KafkaSourcedSpoutProvider createProvider(Config config) {
+     String deserializerClass = config.getString("dataSourceConfig.deserializerClass");
+     final KafkaSourcedSpoutScheme userKeyedScheme = new KafkaSourcedSpoutScheme(deserializerClass, config) {
+         @Override
+         public List<Object> deserialize(byte[] ser) {
+             Object event = deserializer.deserialize(ser);
+             if (event == null) {
+                 return null;
+             }
+             Map<String, Object> fields = (Map<String, Object>) event;
+             return Arrays.asList(fields.get("user"), event);
+         }
+     };
+     // The arguments of getStreamScheme are ignored; the scheme built above is always used.
+     return new KafkaSourcedSpoutProvider() {
+         @Override
+         public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+             return new SchemeAsMultiScheme(userKeyedScheme);
+         }
+     };
+ }
- String deserClsName = config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), tmp);
- }
- };
- KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
- public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
- };
+ public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) { // Default grouping path: builds the topology from the provider's spout (named "kafkaMsgConsumer"), applies groupBy(Arrays.asList(0)) before each of the two flatMap join executors, attaches the "hdfsAuditLogAlertExecutor" alert consumer on "hdfsAuditLogEventStream", then runs env.execute(). NOTE(review): index 0 is presumably the user key emitted first by the spout scheme - confirm.
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
.flatMap(new IPZoneDataJoinExecutor())
.alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
env.execute();
+ }
+
+ /**
+  * Builds and runs the topology using the custom partition strategy.
+  *
+  * One strategy instance, obtained from {@link #createStrategy(Config)}, is
+  * passed to both {@code customGroupBy} stages and to the alert consumer.
+  * The stage names and executors are the same ones used by the default path.
+  *
+  * @param config   application configuration
+  * @param env      Storm execution environment the topology is built on
+  * @param provider supplier of the Kafka spout
+  */
+ public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+     final PartitionStrategy balancer = createStrategy(config);
+     env.newSource(provider.getSpout(config))
+             .renameOutputFields(2)
+             .withName("kafkaMsgConsumer")
+             .customGroupBy(balancer)
+             .flatMap(new FileSensitivityDataJoinExecutor())
+             .customGroupBy(balancer)
+             .flatMap(new IPZoneDataJoinExecutor())
+             .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor", balancer);
+     env.execute();
+ }
+
+ public static void main(String[] args) throws Exception{
+ Config config = new ConfigOptionParser().load(args);
+ LOG.info("Config class: " + config.getClass().getCanonicalName());
+ if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+
+ StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+ KafkaSourcedSpoutProvider provider = createProvider(config);
+ Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") ? config.getBoolean("eagleProps.balancePartitionEnabled") : false;
+ if (balancePartition) {
+ execWithBalancedPartition(config, env, provider);
+ }
+ else {
+ execWithDefaultPartition(config, env, provider);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index de146f2..3b678a3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,6 +50,7 @@
"mailHost" : "mailHost.com",
"mailSmtpPort":"25",
"mailDebug" : "true",
+ "balancePartitionEnabled" : "true",
"eagleService": {
"host": "localhost",
"port": 38080,