You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/12/03 04:28:59 UTC
[1/5] incubator-eagle git commit: add metric topology for offline
metric collection
Repository: incubator-eagle
Updated Branches:
refs/heads/master 91aa216a8 -> 3ee73e8d5
add metric topology for offline metric collection
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/34fa6d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/34fa6d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/34fa6d90
Branch: refs/heads/master
Commit: 34fa6d90a8cfe21a8ad85ce852b648a4a05cc363
Parents: 6f29955
Author: sunlibin <ab...@gmail.com>
Authored: Sat Nov 21 18:04:54 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 11:48:27 2015 +0800
----------------------------------------------------------------------
.../apache/eagle/executor/AlertExecutor.java | 10 +-
.../config/RunningJobCrawlConfig.java | 14 +-
.../storm/kafka/KafkaSourcedSpoutProvider.java | 4 -
.../storm/kafka/KafkaSourcedSpoutScheme.java | 1 -
.../impl/storm/zookeeper/ZKStateConfig.java | 28 ++++
.../eagle/datastream/StreamProducer.scala | 4 +
.../org/apache/eagle/metric/CountingMetric.java | 8 +-
.../java/org/apache/eagle/metric/Metric.java | 5 +-
.../manager/EagleMetricReportManager.java | 45 +++++++
.../metric/report/EagleSerivceMetricReport.java | 61 ---------
.../metric/report/EagleServiceMetricReport.java | 60 +++++++++
.../metric/report/MetricEntityConvert.java | 2 +-
.../eagle/metric/report/MetricReport.java | 4 +-
eagle-security/eagle-metric-collection/pom.xml | 95 ++++++++++++++
.../metric/kafka/EagleMetricCollectorMain.java | 127 ++++++++++++++++++
.../eagle/metric/kafka/KafkaConsumerOffset.java | 27 ++++
.../kafka/KafkaConsumerOffsetFetcher.java | 70 ++++++++++
.../metric/kafka/KafkaLatestOffsetFetcher.java | 98 ++++++++++++++
.../kafka/KafkaMessageDistributionExecutor.java | 126 ++++++++++++++++++
.../metric/kafka/KafkaOffsetCheckerConfig.java | 50 +++++++
.../kafka/KafkaOffsetSourceSpoutProvider.java | 53 ++++++++
.../eagle/metric/kafka/KafkaOffsetSpout.java | 131 +++++++++++++++++++
.../src/main/resources/application.conf | 39 ++++++
.../src/main/resources/log4j.properties | 39 ++++++
.../src/test/java/TestKafkaOffset.java | 2 +
.../auditlog/HdfsAuditLogProcessorMain.java | 1 -
...HiveJobRunningSourcedStormSpoutProvider.java | 2 +-
eagle-security/pom.xml | 1 +
eagle-topology-assembly/pom.xml | 5 +
29 files changed, 1021 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 4f0f4b3..d86a846 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -28,7 +28,7 @@ import org.apache.eagle.datastream.JavaStormStreamExecutor2;
import org.apache.eagle.datastream.Tuple2;
import org.apache.eagle.metric.CountingMetric;
import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.EagleSerivceMetricReport;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
import com.sun.jersey.client.impl.CopyOnWriteHashMap;
import com.typesafe.config.Config;
import org.apache.eagle.alert.policy.*;
@@ -73,7 +73,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
private Map<String, String> baseDimensions;
private Thread metricReportThread;
- private EagleSerivceMetricReport metricReport;
+ private EagleServiceMetricReport metricReport;
public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
@@ -122,7 +122,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
- this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+ this.metricReport = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
metricMap = new ConcurrentHashMap<String, Metric>();
baseDimensions = new HashMap<String, String>();
@@ -172,7 +172,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
}
policyEvaluators = new CopyOnWriteHashMap<>();
- // for efficency, we don't put single policy evaluator
+ // for efficiency, we don't put single policy evaluator
policyEvaluators.putAll(tmpPolicyEvaluators);
DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
@@ -258,7 +258,7 @@ public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEnti
long previous = metric.getTimestamp();
if (current > previous + MERITE_GRANULARITY) {
metricList.add(metric);
- metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName()));
+ metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), metric.getMetricName()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
index b17a41d..79a8928 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
@@ -16,10 +16,11 @@
*/
package org.apache.eagle.jobrunning.config;
-import java.io.Serializable;
-
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
import org.apache.eagle.job.JobPartitioner;
+import java.io.Serializable;
+
public class RunningJobCrawlConfig implements Serializable{
private static final long serialVersionUID = 1L;
public RunningJobEndpointConfig endPointConfig;
@@ -49,13 +50,4 @@ public class RunningJobCrawlConfig implements Serializable{
public Class<? extends JobPartitioner> partitionerCls;
public int numTotalPartitions = 1;
}
-
- public static class ZKStateConfig implements Serializable{
- private static final long serialVersionUID = 1L;
- public String zkQuorum;
- public String zkRoot;
- public int zkSessionTimeoutMs;
- public int zkRetryTimes;
- public int zkRetryInterval;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 06d37ef..373b3ca 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -51,10 +51,6 @@ public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
String zkConnString = context.getString("dataSourceConfig.zkConnection");
// transaction zkRoot
String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
- // Site
- String site = context.getString("eagleProps.site");
-
- //String realTopic = (site ==null)? topic : String.format("%s_%s",site,topic);
LOG.info(String.format("Use topic id: %s",topic));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index 8bdbcb5..8b65c1f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -51,7 +51,6 @@ public class KafkaSourcedSpoutScheme implements Scheme {
@Override
public List<Object> deserialize(byte[] ser) {
Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
if(tmp == null)
return null;
// the following tasks are executed within the same process of kafka spout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
new file mode 100644
index 0000000..f9515f5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.zookeeper;
+
+import java.io.Serializable;
+
+public class ZKStateConfig implements Serializable { // serializable holder of public, mutable zk* settings; no behavior of its own
+ private static final long serialVersionUID = 1L;
+ public String zkQuorum; // presumably the ZooKeeper quorum / connection string - confirm against callers
+ public String zkRoot; // presumably the root znode path under which state is kept - confirm against callers
+ public int zkSessionTimeoutMs; // session timeout; the Ms suffix indicates milliseconds
+ public int zkRetryTimes; // presumably the number of retry attempts - confirm against callers
+ public int zkRetryInterval; // retry interval; the unit is not stated here - TODO confirm
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 9fb3e22..40d4904 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -103,6 +103,10 @@ trait StreamProducer{
ret
}
+  /**
+   * Java-friendly overload of streamUnion: accepts a java.util.List of
+   * producers and delegates to the Seq-based overload.
+   *
+   * @param others the producers to union with this one
+   * @return the producer returned by the Seq-based streamUnion
+   */
+  def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
+    // Convert explicitly to a Scala Seq. Passing the java.util.List straight
+    // through resolves back to this same overload and recurses until the
+    // stack overflows.
+    streamUnion(scala.collection.JavaConverters.asScalaBufferConverter(others).asScala.toSeq)
+  }
+
def streamUnion(others : Seq[StreamProducer]) : StreamProducer = {
val ret = StreamUnionProducer(incrementAndGetId(), others)
hookupDAG(graph, this, ret)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
index f4d5cd5..4f65b8e 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
@@ -25,16 +25,20 @@ import com.google.common.util.concurrent.AtomicDouble;
*/
public class CountingMetric extends Metric{
+ public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, double value) {
+ super(timestamp, dimensions, metricName, new AtomicDouble(value));
+ }
+
public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
super(timestamp, dimensions, metricName, value);
}
-
+
public CountingMetric(long timestamp, Map<String, String> dimensions, String metricName) {
this(timestamp, dimensions, metricName, new AtomicDouble(0.0));
}
public CountingMetric(CountingMetric metric) {
- this(metric.timestamp, new HashMap<String, String>(metric.dimensions), metric.metricName, metric.value);
+ this(metric.timestamp, new HashMap<>(metric.dimensions), metric.metricName, metric.value);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
index 993906e..616c82b 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
@@ -16,6 +16,7 @@
*/
package org.apache.eagle.metric;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -32,7 +33,7 @@ public abstract class Metric implements MetricOperator{
public Metric(long timestamp, Map<String, String> dimensions, String metricName, AtomicDouble value) {
this.timestamp = timestamp;
- this.dimensions = dimensions;
+ this.dimensions = new HashMap<>(dimensions);
this.metricName = metricName;
this.value = value;
}
@@ -45,7 +46,7 @@ public abstract class Metric implements MetricOperator{
return timestamp;
}
- public Map<String, String> getDemensions() {
+ public Map<String, String> getDimensions() {
return dimensions;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
new file mode 100644
index 0000000..b63944d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -0,0 +1,45 @@
+package org.apache.eagle.metric.manager;
+
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.report.MetricReport;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Singleton registry of named {@link MetricReport} sinks; emit() hands each batch to every registered sink. */
+public class EagleMetricReportManager {
+
+    private static final EagleMetricReportManager manager = new EagleMetricReportManager();
+    private final ConcurrentHashMap<String, MetricReport> metricReportMap = new ConcurrentHashMap<>();
+
+    private EagleMetricReportManager() {
+        // not instantiable from outside: the shared instance is obtained through getInstance()
+    }
+
+    /** @return the single shared manager instance */
+    public static EagleMetricReportManager getInstance () {
+        return manager;
+    }
+
+    /**
+     * Atomically registers {@code report} under {@code name} unless that name is already taken.
+     * @return true if the report was added, false if the name was already registered
+     */
+    public boolean register(String name, MetricReport report) {
+        return metricReportMap.putIfAbsent(name, report) == null;
+    }
+
+    /** @return the live registry map itself (not a copy), so changes made to it affect this manager */
+    public Map<String, MetricReport> getRegisteredReports() {
+        return metricReportMap;
+    }
+
+    /** Passes {@code list} to every registered report while holding the map's monitor, so emit calls run one at a time. */
+    public void emit(List<Metric> list) {
+        synchronized (this.metricReportMap) {
+            for (MetricReport report : metricReportMap.values()) {
+                report.emit(list);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
deleted file mode 100644
index 31056f2..0000000
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.metric.Metric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
-public class EagleSerivceMetricReport implements MetricReport{
-
- private EagleServiceClientImpl client;
- private static final Logger LOG = LoggerFactory.getLogger(EagleSerivceMetricReport.class);
-
- public EagleSerivceMetricReport(String host, int port, String username, String password) {
- client = new EagleServiceClientImpl(host, port, username, password);
- }
-
- public EagleSerivceMetricReport(String host, int port) {
- client = new EagleServiceClientImpl(host, port, null, null);
- }
-
- public void emit(List<Metric> list) {
- List<GenericMetricEntity> entities = new ArrayList<GenericMetricEntity>();
- for (Metric metric : list) {
- entities.add(MetricEntityConvert.convert(metric));
- }
- try {
- int total = entities.size();
- GenericServiceAPIResponseEntity<String> response = client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
- if(response.isSuccess()) {
- LOG.info("Wrote " + total + " entities to service");
- }else{
- LOG.error("Failed to write " + total + " entities to service, due to server exception: "+ response.getException());
- }
- }
- catch (Exception ex) {
- LOG.error("Got exception while writing entities: ", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
new file mode 100644
index 0000000..7ff415e
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.report;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link MetricReport} that converts metrics to GenericMetricEntity and writes them through an Eagle service client. */
+public class EagleServiceMetricReport implements MetricReport{
+    private static final Logger LOG = LoggerFactory.getLogger(EagleServiceMetricReport.class);
+    private EagleServiceClientImpl client;
+
+    /** Builds the underlying service client from the given host, port and credentials. */
+    public EagleServiceMetricReport(String host, int port, String username, String password) {
+        client = new EagleServiceClientImpl(host, port, username, password);
+    }
+    /** Same as the four-argument constructor, with null username and password. */
+    public EagleServiceMetricReport(String host, int port) {
+        this(host, port, null, null);
+    }
+    /** Converts each metric, sends the batch in one create call and logs the outcome; write exceptions are logged, not rethrown. */
+    public void emit(List<Metric> list) {
+        List<GenericMetricEntity> converted = new ArrayList<GenericMetricEntity>();
+        for (Metric each : list) {
+            converted.add(MetricEntityConvert.convert(each));
+        }
+        try {
+            int count = converted.size();
+            GenericServiceAPIResponseEntity<String> result = client.create(converted, GenericMetricEntity.GENERIC_METRIC_SERVICE);
+            if (!result.isSuccess()) {
+                LOG.error("Failed to write " + count + " entities to service, due to server exception: "+ result.getException());
+                return;
+            }
+            LOG.info("Wrote " + count + " entities to service");
+        } catch (Exception ex) {
+            LOG.error("Got exception while writing entities: ", ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
index 10f05ca..c389fa7 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
@@ -25,7 +25,7 @@ public class MetricEntityConvert {
GenericMetricEntity entity = new GenericMetricEntity();
entity.setPrefix(metric.getMetricName());
entity.setValue(new double[]{metric.getValue().get()});
- entity.setTags(metric.getDemensions());
+ entity.setTags(metric.getDimensions());
entity.setTimestamp(metric.getTimestamp());
return entity;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
index c03a89f..85d423b 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
@@ -21,6 +21,6 @@ import java.util.List;
import org.apache.eagle.metric.Metric;
public interface MetricReport {
-
- public void emit(List<Metric> list);
+ // The method should be thread safe
+ void emit(List<Metric> list);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
new file mode 100644
index 0000000..f2e78a6
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-security-parent</artifactId>
+ <version>0.3.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>eagle-metric-collection</artifactId>
+ <packaging>jar</packaging>
+ <name>eagle-metric-collection</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-security-hdfs-auditlog</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-alert-process</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-stream-process-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.hierynomus</groupId>
+ <artifactId>sshj</artifactId>
+ <version>0.13.0</version>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
new file mode 100644
index 0000000..65fe68a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
+import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class EagleMetricCollectorMain {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
+
+ public static void main(String[] args) throws Exception {
+ new ConfigOptionParser().load(args);
+ //System.setProperty("config.resource", "/application.local.conf");
+
+ Config config = ConfigFactory.load();
+
+ StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+
+ String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+ final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ Map<String, Object> map = (Map<String, Object>)tmp;
+ if(tmp == null) return null;
+ return Arrays.asList(map.get("user"), map.get("timestamp"));
+ }
+ };
+
+ KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
+ @Override
+ public BaseRichSpout getSpout(Config context) {
+ // Kafka topic
+ String topic = context.getString("dataSourceConfig.topic");
+ // Kafka consumer group id
+ String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+ // Kafka fetch size
+ int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+ // Kafka deserializer class
+ String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+
+ // Kafka broker zk connection
+ String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+ // transaction zkRoot
+ String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+ LOG.info(String.format("Use topic id: %s",topic));
+
+ String brokerZkPath = null;
+ if(context.hasPath("dataSourceConfig.brokerZkPath")) {
+ brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+ }
+
+ BrokerHosts hosts;
+ if(brokerZkPath == null) {
+ hosts = new ZkHosts(zkConnString);
+ } else {
+ hosts = new ZkHosts(zkConnString, brokerZkPath);
+ }
+
+ SpoutConfig spoutConfig = new SpoutConfig(hosts,
+ topic,
+ zkRoot + "/" + topic,
+ groupId);
+
+ // transaction zkServers
+ String[] zkConnections = zkConnString.split(",");
+ List<String> zkHosts = new ArrayList<>();
+ for (String zkConnection : zkConnections) {
+ zkHosts.add(zkConnection.split(":")[0]);
+ }
+ Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+ spoutConfig.zkServers = zkHosts;
+ // transaction zkPort
+ spoutConfig.zkPort = zkPort;
+ // transaction update interval
+ spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+ // Kafka fetch size
+ spoutConfig.fetchSizeBytes = fetchSize;
+
+ spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+ KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+ return kafkaSpout;
+ }
+ };
+
+ env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
+ env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0))
+ .flatMap(new KafkaMessageDistributionExecutor());
+ env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
new file mode 100644
index 0000000..5cf0e11
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import java.util.Map;
+
+ /**
+  * Plain data holder that {@link KafkaConsumerOffsetFetcher} binds the JSON stored
+  * in a consumer partition node to. Only {@code offset} is read by the fetcher; the
+  * remaining fields are populated from whatever the stored document contains.
+  */
+ public class KafkaConsumerOffset {
+     /** Topic name recorded in the stored state. */
+     public String topic;
+     /** Partition number recorded in the stored state. */
+     public Long partition;
+     /** Consumed offset; this is the value the fetcher returns per partition. */
+     public Long offset;
+     /** Broker attributes as string key/value pairs (presumably host and port -- confirm). */
+     public Map<String, String> broker;
+     /** Topology attributes as string key/value pairs (presumably id and name -- confirm). */
+     public Map<String, String> topology;
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
new file mode 100644
index 0000000..f34f195
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaConsumerOffsetFetcher {
+
+ public CuratorFramework curator;
+ public String zkRoot;
+ public ObjectMapper mapper;
+ public String topic;
+ public String group;
+
+ public KafkaConsumerOffsetFetcher(ZKStateConfig config, String topic, String group) {
+ try {
+ this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
+ new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
+ curator.start();
+ this.zkRoot = config.zkRoot;
+ mapper = new ObjectMapper();
+ Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
+ mapper.registerModule(module);
+ this.topic = topic;
+ this.group = group;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Map<String, Long> fetch() throws Exception {
+ Map<String, Long> map = new HashMap<String, Long>();
+ String path = zkRoot + "/" + topic + "/" + group;
+ if (curator.checkExists().forPath(path) != null) {
+ List<String> partitions = curator.getChildren().forPath(path);
+ for (String partition : partitions) {
+ String partitionPath = path + "/" + partition;
+ String data = new String(curator.getData().forPath(partitionPath));
+ KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
+ map.put(partition, offset.offset);
+ }
+ }
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
new file mode 100644
index 0000000..de93ea3
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import java.util.*;
+
+public class KafkaLatestOffsetFetcher {
+
+ private List<String> brokerList;
+ private int port;
+
+ public KafkaLatestOffsetFetcher(String connectString) {
+ brokerList = new ArrayList<>();
+ String[] brokers = connectString.split(",");
+ for (String broker : brokers) {
+ brokerList.add(broker.split(":")[0]);
+ }
+ this.port = Integer.valueOf(brokers[0].split(":")[1]);
+ }
+
+ public Map<Integer, Long> fetch(String topic, int partitionCount) {
+ Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
+ Map<Integer, Long> ret = new HashMap<>();
+ for (int partition = 0; partition < partitionCount; partition++) {
+ PartitionMetadata metadata = metadatas.get(partition);
+ if (metadata == null) {
+ throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting");
+ }
+ if (metadata.leader() == null) {
+ throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+ }
+ String leadBroker = metadata.leader().host();
+ String clientName = "Client_" + topic + "_" + partition;
+ SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+ long lastestOffset = getLatestOffset(consumer, topic, partition, clientName);
+ if (consumer != null) consumer.close();
+ ret.put(partition, lastestOffset);
+ }
+ return ret;
+ }
+
+ public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+ Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+ if (response.hasError()) {
+ throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+ }
+ long[] offsets = response.offsets(topic, partition);
+ return offsets[0];
+ }
+
+ private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
+ Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
+ for (String broker : brokerList) {
+ SimpleConsumer consumer = null;
+ try {
+ consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
+ List<String> topics = Collections.singletonList(topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+ List<TopicMetadata> metaData = resp.topicsMetadata();
+ for (TopicMetadata item : metaData) {
+ for (PartitionMetadata part : item.partitionsMetadata()) {
+ partitionMetadata.put(part.partitionId(), part);
+ }
+ }
+ if (partitionMetadata.size() == partitionCount) break;
+ } catch (Exception e) {
+ throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: " + e);
+ } finally {
+ if (consumer != null) consumer.close();
+ }
+ }
+ return partitionMetadata;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
new file mode 100644
index 0000000..be6d0f7
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.datastream.Tuple1;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+ /**
+  * Counts messages per user and reports one counting metric per user whenever a
+  * message arrives more than {@code DEFAULT_METRIC_GRANULARITY} milliseconds after
+  * the message that opened that user's current metric.
+  */
+ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
+
+     private Config config;
+     private Map<String, String> baseMetricDimension;
+     private Map<String, EventMetric> eventMetrics;
+     private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000;
+     private static final String metricName = "kafka.message.user.count";
+     private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
+
+     /** A metric together with the time of the message that created it. */
+     public static class EventMetric {
+         long latestMessageTime;
+         Metric metric;
+
+         public EventMetric(long latestMessageTime, Metric metric) {
+             this.metric = metric;
+             this.latestMessageTime = latestMessageTime;
+         }
+
+         /** Forwards the value to the wrapped metric. */
+         public void update(double d) {
+             metric.update(d);
+         }
+     }
+
+     @Override
+     public void prepareConfig(Config config) {
+         this.config = config;
+     }
+
+     /**
+      * Builds the base dimensions (site, topic), registers the Eagle service
+      * report and creates the per-user metric map.
+      */
+     @Override
+     public void init() {
+         String site = config.getString("dataSourceConfig.site");
+         String topic = config.getString("dataSourceConfig.topic");
+         Map<String, String> base = new HashMap<>();
+         base.put("site", site);
+         base.put("topic", topic);
+         this.baseMetricDimension = base;
+
+         final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+         String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
+         int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+         String user = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+         String secret = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+
+         EagleMetricReportManager.getInstance()
+                 .register("metricCollectServiceReport", new EagleServiceMetricReport(host, port, user, secret));
+         eventMetrics = new ConcurrentHashMap<>();
+     }
+
+     /** Rounds the timestamp down to a multiple of the granularity. */
+     public long trimTimestamp(long timestamp, long granularity) {
+         return timestamp - timestamp % granularity;
+     }
+
+     /** Starts a fresh metric with count 1 for the user, replacing any existing one. */
+     public void putNewMetric(long currentMessageTime, String user) {
+         Map<String, String> dimensions = new HashMap<>(baseMetricDimension);
+         dimensions.put("user", user);
+         long bucketStart = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
+         EventMetric fresh = new EventMetric(currentMessageTime,
+                 new CountingMetric(bucketStart, dimensions, metricName, 1));
+         eventMetrics.put(user, fresh);
+     }
+
+     /**
+      * Accounts for one message of the user: opens a metric for an unseen user,
+      * emits and replaces a metric that is older than the granularity, and
+      * otherwise increments the current metric by one.
+      */
+     public void update(long currentMessageTime, String user) {
+         EventMetric current = eventMetrics.get(user);
+         if (current == null) {
+             LOG.info("Got metrics for new user: " + user);
+             putNewMetric(currentMessageTime, user);
+             return;
+         }
+
+         boolean expired = currentMessageTime > current.latestMessageTime + DEFAULT_METRIC_GRANULARITY;
+         if (!expired) {
+             eventMetrics.get(user).update(1);
+             return;
+         }
+
+         EagleMetricReportManager manager = EagleMetricReportManager.getInstance();
+         manager.emit(Arrays.asList(eventMetrics.remove(user).metric));
+         putNewMetric(currentMessageTime, user);
+     }
+
+     /**
+      * Reads (user, timestamp) from the tuple and updates the user's metric.
+      * Any exception is logged and the tuple is dropped; nothing is collected.
+      */
+     @Override
+     public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
+         try {
+             String user = (String) input.get(0);
+             Long timestamp = (Long) input.get(1);
+             update(timestamp, user);
+         } catch (Exception ex) {
+             LOG.error("Got an exception, ex: ", ex);
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
new file mode 100644
index 0000000..5a06c82
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.io.Serializable;
+
+ /**
+  * Serializable bundle of the three setting groups handed to the offset spout:
+  * Eagle service access, ZooKeeper state access and Kafka topic details.
+  */
+ public class KafkaOffsetCheckerConfig implements Serializable {
+
+     public ZKStateConfig zkConfig;
+     public KafkaConfig kafkaConfig;
+     public ServiceConfig serviceConfig;
+
+     /**
+      * @param serviceConfig Eagle service host, port and credentials
+      * @param zkConfig      ZooKeeper state settings
+      * @param kafkaConfig   Kafka end points, topic, site and consumer group
+      */
+     public KafkaOffsetCheckerConfig (ServiceConfig serviceConfig, ZKStateConfig zkConfig, KafkaConfig kafkaConfig) {
+         this.zkConfig = zkConfig;
+         this.kafkaConfig = kafkaConfig;
+         this.serviceConfig = serviceConfig;
+     }
+
+     /** Kafka-side settings. */
+     public static class KafkaConfig implements Serializable{
+         public String site;
+         public String topic;
+         public String group;
+         public String kafkaEndPoints;
+     }
+
+     /** Eagle service endpoint and credentials. */
+     public static class ServiceConfig implements Serializable{
+         public String serviceHost;
+         public Integer servicePort;
+         public String username;
+         public String password;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
new file mode 100644
index 0000000..c794632
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Builds a {@link KafkaOffsetSpout} from the "dataSourceConfig" and Eagle
+  * service sections of the application configuration.
+  */
+ public class KafkaOffsetSourceSpoutProvider {
+     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class);
+
+     /**
+      * Reads the ZooKeeper, Eagle service and Kafka settings, in that order, and
+      * wraps them in a new spout.
+      *
+      * @param config application configuration
+      * @return a new {@link KafkaOffsetSpout}
+      */
+     public BaseRichSpout getSpout(Config config){
+         ZKStateConfig zk = readZkStateConfig(config);
+         KafkaOffsetCheckerConfig.ServiceConfig service = readServiceConfig(config);
+         KafkaOffsetCheckerConfig.KafkaConfig kafka = readKafkaConfig(config);
+         return new KafkaOffsetSpout(new KafkaOffsetCheckerConfig(service, zk, kafka));
+     }
+
+     /** ZooKeeper quorum, root path, session timeout and retry settings. */
+     private static ZKStateConfig readZkStateConfig(Config config) {
+         ZKStateConfig zk = new ZKStateConfig();
+         zk.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+         zk.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+         zk.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+         zk.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+         zk.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+         return zk;
+     }
+
+     /** Eagle service host, port, user name and password. */
+     private static KafkaOffsetCheckerConfig.ServiceConfig readServiceConfig(Config config) {
+         final String prefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+         KafkaOffsetCheckerConfig.ServiceConfig service = new KafkaOffsetCheckerConfig.ServiceConfig();
+         service.serviceHost = config.getString(prefix + EagleConfigConstants.HOST);
+         service.servicePort = config.getInt(prefix + EagleConfigConstants.PORT);
+         service.username = config.getString(prefix + EagleConfigConstants.USERNAME);
+         service.password = config.getString(prefix + EagleConfigConstants.PASSWORD);
+         return service;
+     }
+
+     /** Kafka end points, site, topic and the consumer group to check. */
+     private static KafkaOffsetCheckerConfig.KafkaConfig readKafkaConfig(Config config) {
+         KafkaOffsetCheckerConfig.KafkaConfig kafka = new KafkaOffsetCheckerConfig.KafkaConfig();
+         kafka.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints");
+         kafka.site = config.getString("dataSourceConfig.site");
+         kafka.topic = config.getString("dataSourceConfig.topic");
+         kafka.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+         return kafka;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
new file mode 100644
index 0000000..d6f7298
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaOffsetSpout extends BaseRichSpout {
+ private static final long serialVersionUID = 1L;
+ private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000;
+ private KafkaOffsetCheckerConfig config;
+ private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
+ private KafkaLatestOffsetFetcher latestOffsetFetcher;
+ private Map<String, String> baseMetricDimension;
+ private long lastRoundTime = 0;
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSpout.class);
+
+ public KafkaOffsetSpout(KafkaOffsetCheckerConfig config) {//Config config, ZKStateConfig zkStateConfig, String kafkaEndPoints){
+ this.config = config;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(config.zkConfig, config.kafkaConfig.topic, config.kafkaConfig.group);
+ latestOffsetFetcher = new KafkaLatestOffsetFetcher(config.kafkaConfig.kafkaEndPoints);
+
+ this.baseMetricDimension = new HashMap<>();
+ this.baseMetricDimension.put("site", config.kafkaConfig.site);
+ this.baseMetricDimension.put("topic", config.kafkaConfig.topic);
+ this.baseMetricDimension.put("group", config.kafkaConfig.group);
+ String eagleServiceHost = config.serviceConfig.serviceHost;
+ Integer eagleServicePort = config.serviceConfig.servicePort;
+ String username = config.serviceConfig.serviceHost;
+ String password = config.serviceConfig.serviceHost;
+ EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+ EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+ }
+
+ public Metric constructMetric(long timestamp, String partition, double value) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.putAll(baseMetricDimension);
+ dimensions.put("partition", partition);
+ String metricName = "eagle.kafka.message.consumer.lag";
+ Metric metric = new CountingMetric(timestamp, dimensions, metricName, value);
+ return metric;
+ }
+
+ @Override
+ public void nextTuple() {
+ Long currentTime = System.currentTimeMillis();
+ if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
+ try {
+ Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+ Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
+ List<Metric> list = new ArrayList<>();
+ for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+ String partition = entry.getKey();
+ Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
+ Long lag = latestOffset.get(partitionNumber) - entry.getValue();
+ list.add(constructMetric(currentTime, partition, lag));
+ }
+ EagleMetricReportManager.getInstance().emit(list);
+ } catch (Exception ex) {
+ LOG.error("Got an exception, ex: ", ex);
+ }
+ }
+ try{
+ Thread.sleep(10 * 1000);
+ }catch(Throwable t){
+ //Do nothing
+ }
+ }
+
+ /**
+ * empty because framework will take care of output fields declaration
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ }
+
+ @Override
+ public void deactivate() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/application.conf b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
new file mode 100644
index 0000000..4b07019
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
@@ -0,0 +1,39 @@
+{
+ "envContextConfig" : {
+ "env" : "storm",
+ "mode" : "local",
+ "topologyName" : "metricCollectionTopology",
+ "stormConfigFile" : "security-auditlog-storm.yaml",
+ "parallelismConfig" : {
+ "kafkaMsgConsumer" : 1
+ }
+ },
+ "dataSourceConfig": {
+ # For fetch gap
+ "site" : "sandbox",
+ "topic" : "sandbox_hdfs_audit_log",
+ "zkQuorum" : "localhost:2191",
+ "hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer",
+ "zkSessionTimeoutMs" : 15000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 2000,
+ "zkConnectionTimeoutMS" : 15000,
+ #"fetchSize" : 1048586,
+ "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+ "metricCollectionConsumerId" : "eagle.metric.collection.consumer",
+ # For kafka spout
+ #"transactionZKServers" : "localhost",
+ #"transactionZKPort" : "2181",
+ "transactionZKRoot" : "/consumers",
+ #"transactionStateUpdateMS" : 2000,
+ "kafkaEndPoints" : "localhost:9092"
+ },
+ "eagleProps" : {
+ "eagleService": {
+ "host": "localhost",
+ "port": 38080,
+ "username": "admin",
+ "password": "secret"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8a0919a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+
+#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+#log4j.logger.eagle.executor.AlertExecutor=DEBUG
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
new file mode 100644
index 0000000..bfba783
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -0,0 +1,2 @@
+public class TestKafkaOffset { // Empty class: no fields or test methods are declared here.
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index fa712d4..327cb8d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -41,7 +41,6 @@ public class HdfsAuditLogProcessorMain {
Config config = new ConfigOptionParser().load(args);
LOG.info("Config class: " + config.getClass().getCanonicalName());
-
if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 16f77c3..9ddf0b2 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -21,7 +21,7 @@ import org.apache.eagle.job.JobPartitioner;
import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig;
import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ZKStateConfig;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
import org.apache.eagle.jobrunning.storm.JobRunningSpout;
import com.typesafe.config.Config;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/pom.xml b/eagle-security/pom.xml
index 0f45c49..599cff6 100644
--- a/eagle-security/pom.xml
+++ b/eagle-security/pom.xml
@@ -41,5 +41,6 @@
<module>eagle-security-hdfs-securitylog</module>
<module>eagle-security-hbase-securitylog</module>
<module>eagle-security-hbase-web</module>
+ <module>eagle-metric-collection</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 838c2f8..2132bb3 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -54,6 +54,11 @@
<artifactId>eagle-security-hbase-securitylog</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-metric-collection</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
[2/5] incubator-eagle git commit: balance events partition based on
greedy partition algorithm
Posted by ha...@apache.org.
balance events partition based on greedy partition algorithm
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3de5cce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3de5cce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3de5cce6
Branch: refs/heads/master
Commit: 3de5cce698e2da3db223c468d28388ae5189a2e2
Parents: 34fa6d9
Author: sunlibin <ab...@gmail.com>
Authored: Thu Nov 26 18:45:46 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:38:14 2015 +0800
----------------------------------------------------------------------
.../storm/partition/EagleCustomGrouping.java | 51 +++++++++
.../datastream/StormTopologyCompiler.scala | 14 ++-
.../eagle/datastream/StreamAlertExpansion.scala | 9 +-
.../eagle/datastream/StreamConnector.scala | 8 ++
.../datastream/StreamGroupbyExpansion.scala | 8 +-
.../eagle/datastream/StreamProducer.scala | 28 ++++-
.../eagle/partition/DataDistributionDao.java | 28 +++++
.../eagle/partition/PartitionAlgorithm.java | 29 +++++
.../eagle/partition/PartitionStrategy.java | 27 +++++
.../eagle/partition/PartitionStrategyImpl.java | 80 ++++++++++++++
.../java/org/apache/eagle/partition/Weight.java | 30 ++++++
.../apache/eagle/metric/MetricConstants.java | 24 +++++
.../manager/EagleMetricReportManager.java | 16 +++
.../eagle/service/client/ServiceConfig.java | 29 +++++
eagle-security/eagle-metric-collection/pom.xml | 5 +
.../kafka/KafkaMessageDistributionExecutor.java | 4 +-
.../metric/kafka/KafkaOffsetCheckerConfig.java | 8 +-
.../kafka/KafkaOffsetSourceSpoutProvider.java | 3 +-
.../eagle/metric/kafka/KafkaOffsetSpout.java | 15 +--
.../test/java/TestDataDistributionDaoImpl.java | 40 +++++++
.../src/test/java/TestGreedyPartition.java | 44 ++++++++
.../src/test/java/TestKafkaOffset.java | 68 ++++++++++++
.../apache/eagle/security/partition/Bucket.java | 30 ++++++
.../security/partition/BucketComparator.java | 36 +++++++
.../partition/DataDistributionDaoImpl.java | 106 +++++++++++++++++++
.../partition/GreedyPartitionAlgorithm.java | 65 ++++++++++++
.../auditlog/HdfsAuditLogProcessorMain.java | 85 +++++++++++----
.../src/main/resources/application.conf | 1 +
28 files changed, 845 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
new file mode 100644
index 0000000..96e42b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/partition/EagleCustomGrouping.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.partition;
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.task.WorkerTopologyContext;
+import org.apache.eagle.partition.PartitionStrategy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Custom stream grouping that delegates the choice of the target task to a
+ * {@link PartitionStrategy}.
+ */
+public class EagleCustomGrouping implements CustomStreamGrouping {
+
+ public List<Integer> targetTasks;
+ public PartitionStrategy strategy;
+
+ /**
+ * @param strategy strategy consulted for every call to {@link #chooseTasks}
+ */
+ public EagleCustomGrouping(PartitionStrategy strategy) {
+ this.strategy = strategy;
+ }
+
+ /**
+ * Stores a copy of the candidate target task ids; the context and stream
+ * arguments are not used.
+ */
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ List<Integer> snapshot = new ArrayList<>(targetTasks);
+ this.targetTasks = snapshot;
+ }
+
+ /**
+ * Picks exactly one target task: the first value is cast to a String key, the
+ * strategy maps it to an index, and that index selects the task id.
+ *
+ * @param taskId not used
+ * @param values values being routed; element 0 must be a String
+ * @return a one-element list holding the chosen task id
+ */
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ final int candidateCount = targetTasks.size();
+ final String routingKey = (String) values.get(0);
+ final int chosenIndex = strategy.balance(routingKey, candidateCount);
+ final Integer chosenTask = targetTasks.get(chosenIndex);
+ return Arrays.asList(chosenTask);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
index dbe69d2..4f9fccc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
import backtype.storm.tuple.Fields
import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.storm.partition.EagleCustomGrouping
import org.slf4j.LoggerFactory
case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
@@ -63,10 +64,19 @@ case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGr
}
case Some(bt) => boltDeclarer = bt
}
- sc.groupByFields match{
+ if (sc.groupByFields != Nil) {
+ boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
+ }
+ else if (sc.customGroupBy != null) {
+ boltDeclarer.customGrouping(fromName, new EagleCustomGrouping(sc.customGroupBy));
+ }
+ else {
+ boltDeclarer.shuffleGrouping(fromName);
+ }
+/* sc.groupByFields match{
case Nil => boltDeclarer.shuffleGrouping(fromName)
case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
- }
+ }*/
LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
})
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
index fa83e6d..802107d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -70,7 +70,7 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
child match {
- case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+ case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer, strategy) => {
/**
* step 1: wrapper previous StreamProducer with one more field "streamName"
* for AlertStreamSink, we check previous StreamProducer and replace that
@@ -108,7 +108,12 @@ class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
t.setConfig(config)
t.setGraph(dag)
alertProducers += t
- newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+ if (strategy == null) {
+ newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+ }
+ else {
+ newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).customGroupBy(strategy))
+ }
})
// remove AlertStreamSink
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
index 0cece47..083a5af 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -18,11 +18,19 @@
*/
package org.apache.eagle.datastream
+import org.apache.eagle.partition.PartitionStrategy
+
case class StreamConnector(from: StreamProducer, to: StreamProducer) {
var groupByFields : Seq[Int] = Nil
+ var customGroupBy : PartitionStrategy = null
def groupBy(fields : Seq[Int]) : StreamConnector = {
groupByFields = fields
this
}
+
+ def customGroupBy(custom : PartitionStrategy) : StreamConnector = {
+ customGroupBy = custom
+ this
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
index 42bc9a8..caf71e3 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -43,7 +43,13 @@ class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
child match {
case p : GroupByProducer => {
dag.outgoingEdgesOf(p).foreach(c2 => {
- toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+ if (p.fields != Nil) {
+ toBeAddedEdges += StreamConnector(current, c2.to).groupBy(p.fields)
+ }
+ else if (p.partitionStrategy != null) {
+ toBeAddedEdges += StreamConnector(current, c2.to).customGroupBy(p.partitionStrategy)
+ }
+ else toBeAddedEdges += StreamConnector(current, c2.to);
})
toBeRemovedVertex += p
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 40d4904..ae00d03 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import backtype.storm.topology.base.BaseRichSpout
import com.typesafe.config.Config
+import org.apache.eagle.partition.PartitionStrategy
import org.jgrapht.experimental.dag.DirectedAcyclicGraph
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -103,6 +104,12 @@ trait StreamProducer{
ret
}
+ def customGroupBy(strategy : PartitionStrategy) : StreamProducer = {
+ val ret = GroupByProducer(incrementAndGetId(), strategy)
+ hookupDAG(graph, this, ret)
+ ret
+ }
+
def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
streamUnion(others);
}
@@ -124,15 +131,23 @@ trait StreamProducer{
alert(upStreamNames, alertExecutorId, false)
}
- def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) = {
- val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer)
+ def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy : PartitionStrategy=null ) = {
+ val ret = AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer, strategy)
hookupDAG(graph, this, ret)
}
+ def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+ alert(util.Arrays.asList(upStreamName), alertExecutorId, true, strategy)
+ }
+
def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
}
+ def alertWithoutConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
+ alert(util.Arrays.asList(upStreamName), alertExecutorId, false, strategy)
+ }
+
def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
}
@@ -170,7 +185,12 @@ case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends
case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
-case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+case class GroupByProducer(id: Int, fields : Seq[Int], partitionStrategy: PartitionStrategy) extends StreamProducer
+
+object GroupByProducer {
+ def apply(id: Int, fields: Seq[Int]) = new GroupByProducer(id, fields, null)
+ def apply(id: Int, partitionStrategy : PartitionStrategy) = new GroupByProducer(id, Nil, partitionStrategy)
+}
case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
@@ -187,7 +207,7 @@ case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamPr
}
}
-case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer
object UniqueId{
val id : AtomicInteger = new AtomicInteger(0);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
new file mode 100644
index 0000000..5c78f96
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface DataDistributionDao extends Serializable {
+ /**
+ * Fetches the data distribution as a list of {@link Weight} entries.
+ *
+ * @return the fetched weights
+ * @throws Exception declared by the signature; the concrete failure modes depend on the implementation
+ */
+ List<Weight> fetchDataDistribution() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
new file mode 100644
index 0000000..0614388
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionAlgorithm.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public interface PartitionAlgorithm extends Serializable { // turns a list of weights into a String-to-Integer assignment
+ Map<String, Integer> partition(List<Weight> weights, int k); // PartitionStrategyImpl passes its bucket count as k and uses the result as its routing table
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
new file mode 100644
index 0000000..e431f28
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategy.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import java.io.Serializable;
+
+public interface PartitionStrategy extends Serializable {
+ /**
+ * Chooses a bucket for the given key.
+ *
+ * @param key key to route
+ * @param buckNum number of buckets available
+ * @return the chosen bucket; presumably an index in [0, buckNum), since
+ * EagleCustomGrouping uses it to index its target task list - TODO confirm
+ */
+ int balance(String key, int buckNum);
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
new file mode 100644
index 0000000..46696a6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
+public class PartitionStrategyImpl implements PartitionStrategy {
+
+ public DataDistributionDao dao;
+ public PartitionAlgorithm algorithm;
+ public Map<String, Integer> routingTable;
+ public long lastRefreshTime;
+ public long refreshInterval;
+ public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+ private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
+
+ public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+ this.dao = dao;
+ this.algorithm = algorithm;
+ this.refreshInterval = refreshInterval;
+ }
+
+ public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
+ this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+ }
+
+ public boolean needRefresh() {
+ if (System.currentTimeMillis() > lastRefreshTime + refreshInterval) {
+ lastRefreshTime = System.currentTimeMillis();
+ return true;
+ }
+ return false;
+ }
+
+ public Map<String, Integer> generateRoutingTable(int buckNum) {
+ try {
+ List<Weight> weights = dao.fetchDataDistribution();
+ routingTable = algorithm.partition(weights, buckNum);
+ return routingTable;
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public int balance(String key, int buckNum) {
+ if (needRefresh()) {
+ LOG.info("Going to refresh routing table");
+ routingTable = generateRoutingTable(buckNum);
+ LOG.info("Finish refresh routing table");
+ }
+ if (routingTable.containsKey(key)) {
+ return routingTable.get(key);
+ }
+ else {
+ return Math.abs(key.hashCode()) % buckNum;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
new file mode 100644
index 0000000..14d005d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/Weight.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.partition;
+
+public class Weight { // pairs a partition key with its weight; instances are consumed by PartitionAlgorithm.partition()
+ public String key; // identifier being weighted; DataDistributionDaoImpl fills it from the first element of a result row's "key" list
+ public Double value; // weight of that key; DataDistributionDaoImpl fills it from the first element of a result row's "value" list
+
+ public Weight(String key, Double value) { // stores both arguments as-is, no validation or copying
+ this.key = key;
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
new file mode 100644
index 0000000..54c069d
--- /dev/null
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/MetricConstants.java
@@ -0,0 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric;
+
+public class MetricConstants { // holder for metric-related constants
+ public static final String GENERIC_METRIC_ENTITY_ENDPOINT = "GenericMetricService"; // service name that DataDistributionDaoImpl puts at the front of its metric query string
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
index b63944d..153159c 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.eagle.metric.manager;
import org.apache.eagle.metric.Metric;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
new file mode 100644
index 0000000..e68360f
--- /dev/null
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/ServiceConfig.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.client;
+
+import java.io.Serializable;
+
+public class ServiceConfig implements Serializable{ // connection settings for the Eagle service, shared instead of the former KafkaOffsetCheckerConfig.ServiceConfig nested class
+ public String serviceHost; // host name; callers in this commit read it from the eagle service HOST config key
+ public Integer servicePort; // port; callers in this commit read it from the eagle service PORT config key
+ public String username; // login user; callers in this commit read it from the eagle service USERNAME config key
+ public String password; // login password; callers in this commit read it from the eagle service PASSWORD config key
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml b/eagle-security/eagle-metric-collection/pom.xml
index f2e78a6..31fc6a4 100644
--- a/eagle-security/eagle-metric-collection/pom.xml
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -32,6 +32,11 @@
<dependencies>
<dependency>
<groupId>eagle</groupId>
+ <artifactId>eagle-metric</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
<artifactId>eagle-security-hdfs-auditlog</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
index be6d0f7..7af5ea6 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -97,12 +97,14 @@ public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<S
public void update(long currentMessageTime, String user) {
if (eventMetrics.get(user) == null) {
- LOG.info("Got metrics for new user: " + user);
+ LOG.info("A new user in the time interval, user: " + user + ", currentMessageTime: " + currentMessageTime);
putNewMetric(currentMessageTime, user);
}
else {
long latestMessageTime = eventMetrics.get(user).latestMessageTime;
if (currentMessageTime > latestMessageTime + DEFAULT_METRIC_GRANULARITY) {
+ LOG.info("Going to emit a user metric, user: " + user + ", currentMessageTime: " + currentMessageTime
+ + ", latestMessageTime: " + latestMessageTime);
EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
putNewMetric(currentMessageTime, user);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
index 5a06c82..040d08f 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -20,6 +20,7 @@
package org.apache.eagle.metric.kafka;
import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
import java.io.Serializable;
@@ -31,13 +32,6 @@ public class KafkaOffsetCheckerConfig implements Serializable {
public String group;
}
- public static class ServiceConfig implements Serializable{
- public String serviceHost;
- public Integer servicePort;
- public String username;
- public String password;
- }
-
public ZKStateConfig zkConfig;
public KafkaConfig kafkaConfig;
public ServiceConfig serviceConfig;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
index c794632..5fd02fd 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -20,6 +20,7 @@ import backtype.storm.topology.base.BaseRichSpout;
import com.typesafe.config.Config;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.service.client.ServiceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class KafkaOffsetSourceSpoutProvider {
zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- KafkaOffsetCheckerConfig.ServiceConfig serviceConfig = new KafkaOffsetCheckerConfig.ServiceConfig();
+ ServiceConfig serviceConfig = new ServiceConfig();
serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
index d6f7298..aee817a 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -20,9 +20,6 @@ import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
import org.apache.eagle.metric.CountingMetric;
import org.apache.eagle.metric.Metric;
import org.apache.eagle.metric.manager.EagleMetricReportManager;
@@ -62,8 +59,8 @@ public class KafkaOffsetSpout extends BaseRichSpout {
this.baseMetricDimension.put("group", config.kafkaConfig.group);
String eagleServiceHost = config.serviceConfig.serviceHost;
Integer eagleServicePort = config.serviceConfig.servicePort;
- String username = config.serviceConfig.serviceHost;
- String password = config.serviceConfig.serviceHost;
+ String username = config.serviceConfig.username;
+ String password = config.serviceConfig.password;
EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, password);
EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
}
@@ -77,11 +74,16 @@ public class KafkaOffsetSpout extends BaseRichSpout {
return metric;
}
+ public long trimTimestamp(long currentTime, long granularity) {
+ return currentTime / granularity * granularity;
+ }
+
@Override
public void nextTuple() {
Long currentTime = System.currentTimeMillis();
if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
try {
+ long trimedCurrentTime = trimTimestamp(currentTime, DEFAULT_ROUND_INTERVALS);
Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
List<Metric> list = new ArrayList<>();
@@ -89,8 +91,9 @@ public class KafkaOffsetSpout extends BaseRichSpout {
String partition = entry.getKey();
Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
Long lag = latestOffset.get(partitionNumber) - entry.getValue();
- list.add(constructMetric(currentTime, partition, lag));
+ list.add(constructMetric(trimedCurrentTime, partition, lag));
}
+ lastRoundTime = trimedCurrentTime;
EagleMetricReportManager.getInstance().emit(list);
} catch (Exception ex) {
LOG.error("Got an exception, ex: ", ex);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
new file mode 100644
index 0000000..1c54a90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+
+public class TestDataDistributionDaoImpl {
+
+    // Manual smoke check: loads /application.local.conf and issues one
+    // fetchDataDistribution() call. The @Test annotation is commented out, so JUnit
+    // does not run it (presumably because it needs a reachable service -- confirm).
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+        // Common prefix of all eagle service connection keys.
+        final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final String host = conf.getString(servicePrefix + EagleConfigConstants.HOST);
+        final Integer port = conf.getInt(servicePrefix + EagleConfigConstants.PORT);
+        final String user = conf.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        final String secret = conf.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+        final String kafkaTopic = conf.getString("dataSourceConfig.topic");
+        final DataDistributionDao distributionDao = new DataDistributionDaoImpl(host, port, user, secret, kafkaTopic);
+        distributionDao.fetchDataDistribution();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
new file mode 100644
index 0000000..cfd873a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.junit.Test;
+
+public class TestGreedyPartition {
+
+    // Manual smoke check: fetches the data distribution configured in
+    // /application.local.conf and partitions it into 4 buckets. The @Test annotation
+    // is commented out, so JUnit does not run it.
+    //@Test
+    public void test() throws Exception{
+        System.setProperty("config.resource", "/application.local.conf");
+        final Config conf = ConfigFactory.load();
+        // Common prefix of all eagle service connection keys.
+        final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        final String serviceHost = conf.getString(servicePrefix + EagleConfigConstants.HOST);
+        final Integer servicePort = conf.getInt(servicePrefix + EagleConfigConstants.PORT);
+        final String user = conf.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        final String secret = conf.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+        final String kafkaTopic = conf.getString("dataSourceConfig.topic");
+        final DataDistributionDao distributionDao = new DataDistributionDaoImpl(serviceHost, servicePort, user, secret, kafkaTopic);
+        final PartitionAlgorithm greedy = new GreedyPartitionAlgorithm();
+        greedy.partition(distributionDao.fetchDataDistribution(), 4);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
index bfba783..0ce8fce 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -1,2 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.kafka.KafkaConsumerOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaLatestOffsetFetcher;
+import org.apache.eagle.metric.kafka.KafkaOffsetCheckerConfig;
+import org.apache.eagle.service.client.ServiceConfig;
+import org.junit.Test;
+
+import java.util.Map;
+
public class TestKafkaOffset { // manual check that prints, per partition, the latest offset, the consumed offset and their difference
+
+ //@Test
+ public void test() throws Exception { // not picked up by JUnit while the @Test annotation above stays commented out
+ System.setProperty("config.resource", "/application.local.conf"); // select the config resource before ConfigFactory.load() runs
+ Config config = ConfigFactory.load();
+ ZKStateConfig zkStateConfig = new ZKStateConfig(); // ZooKeeper settings handed to the consumer-offset fetcher below
+ zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+ zkStateConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+ zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+ zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+ zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+ ServiceConfig serviceConfig = new ServiceConfig(); // only needed to build KafkaOffsetCheckerConfig; no service call is made in this method
+ serviceConfig.serviceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ serviceConfig.servicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ serviceConfig.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ serviceConfig.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+
+ KafkaOffsetCheckerConfig.KafkaConfig kafkaConfig = new KafkaOffsetCheckerConfig.KafkaConfig();
+ kafkaConfig.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints");
+ kafkaConfig.site = config.getString("dataSourceConfig.site");
+ kafkaConfig.topic = config.getString("dataSourceConfig.topic");
+ kafkaConfig.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+ KafkaOffsetCheckerConfig checkerConfig = new KafkaOffsetCheckerConfig(serviceConfig, zkStateConfig, kafkaConfig);
+
+ KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(checkerConfig.zkConfig, checkerConfig.kafkaConfig.topic, checkerConfig.kafkaConfig.group);
+ KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(checkerConfig.kafkaConfig.kafkaEndPoints);
+
+ Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch(); // consumed offset per partition key
+ Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(checkerConfig.kafkaConfig.topic, consumedOffset.size()); // second argument is the number of consumed-offset entries, used here as the partition count
+ for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+ String partition = entry.getKey();
+ Integer partitionNumber = Integer.valueOf(partition.split("_")[1]); // expects the key to look like <prefix>_<partitionNumber>
+ Long lag = latestOffset.get(partitionNumber) - entry.getValue(); // lag = latest offset minus consumed offset; NPE if that partition number is absent from latestOffset
+ System.out.println("total: " + latestOffset.get(partitionNumber) + ", consumed: " + entry.getValue() + ",lag: " + lag);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
new file mode 100644
index 0000000..eb31d39
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/Bucket.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+public class Bucket { // one partition bucket and its accumulated weight, kept in GreedyPartitionAlgorithm's PriorityQueue
+ Integer bucketNum; // bucket index; GreedyPartitionAlgorithm creates buckets numbered 0..k-1
+ Double value; // accumulated weight of the bucket (starts at 0.0 in GreedyPartitionAlgorithm); BucketComparator orders buckets by it
+
+ public Bucket(Integer bucketNum, Double value) { // stores both arguments as-is
+ this.bucketNum = bucketNum;
+ this.value = value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
new file mode 100644
index 0000000..ea86f94
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/BucketComparator.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import java.util.Comparator;
+
+/**
+ * Orders {@link Bucket}s by ascending {@code value}.
+ */
+public class BucketComparator implements Comparator<Bucket> {
+
+    /**
+     * Compares two buckets by their {@code value}.
+     * <p>
+     * Uses {@link Double#compare(double, double)} rather than {@code <} / {@code >}
+     * so that the ordering is total: with the relational operators a NaN value
+     * compared as "equal" to every other value, which violates the
+     * {@link Comparator} contract.
+     *
+     * @param w1 first bucket
+     * @param w2 second bucket
+     * @return a negative number, zero, or a positive number when {@code w1.value} is
+     *         less than, equal to, or greater than {@code w2.value}
+     * @throws NullPointerException if either bucket or its {@code value} is null
+     */
+    @Override
+    public int compare(Bucket w1, Bucket w2) {
+        return Double.compare(w1.value, w2.value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
new file mode 100644
index 0000000..631947c
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import com.sun.jersey.api.client.WebResource;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.MetricConstants;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.Weight;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.EagleServiceSingleEntityQueryRequest;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class DataDistributionDaoImpl implements DataDistributionDao {
+
+ private final Logger LOG = LoggerFactory.getLogger(DataDistributionDaoImpl.class);
+
+ private final String eagleServiceHost;
+ private final Integer eagleServicePort;
+ private String username;
+ private String password;
+ private String topic;
+
+ public DataDistributionDaoImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password, String topic) {
+ this.eagleServiceHost = eagleServiceHost;
+ this.eagleServicePort = eagleServicePort;
+ this.username = username;
+ this.password = password;
+ this.topic = topic;
+ }
+
+ @Override
+ public List<Weight> fetchDataDistribution() throws Exception {
+ IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
+ @Override
+ public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
+ String queryString = request.getQueryParameterString();
+ StringBuilder sb = new StringBuilder();
+ sb.append("/list");
+ sb.append("?");
+ sb.append(queryString);
+ final String urlString = sb.toString();
+ if (!this.silence) LOG.info("Going to query service: " + this.getBaseEndpoint() + urlString);
+ WebResource r = getWebResource(urlString);
+
+ return putAuthHeaderIfNeeded(r.accept(DEFAULT_MEDIA_TYPE))
+ .header(CONTENT_TYPE, DEFAULT_HTTP_HEADER_CONTENT_TYPE)
+ .get(GenericServiceAPIResponseEntity.class);
+ }
+ };
+ try {
+ String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
+ long endTime = System.currentTimeMillis();
+ long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
+ GenericServiceAPIResponseEntity<Map> response = client.search()
+ .startTime(startTime)
+ .endTime(endTime)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .metricName("kafka.message.user.count")
+ .send();
+ if (!response.isSuccess()) {
+ LOG.error(response.getException());
+ }
+ List<Weight> userWeights = new ArrayList<>();
+ for (Map keyValue : response.getObj()) {
+ List<String> keyList = (List)(keyValue.get("key"));
+ List<Double> valueList = (List)(keyValue.get("value"));
+ userWeights.add(new Weight(keyList.get(0), valueList.get(0)));
+ }
+ return userWeights;
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception, ex: ", ex);
+ throw new RuntimeException(ex);
+ }
+ finally {
+ client.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
new file mode 100644
index 0000000..0197393
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/GreedyPartitionAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.partition;
+
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.Weight;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/** Greedy balancer: adds each weight, in input order, to the bucket polled from a BucketComparator-ordered queue. */
+public class GreedyPartitionAlgorithm implements PartitionAlgorithm {
+    private static final Logger LOG = LoggerFactory.getLogger(GreedyPartitionAlgorithm.class);
+
+    /** Logs each bucket's share of the summed bucket values; logs nothing when that sum is zero. */
+    public void printWeightTable(PriorityQueue<Bucket> queue) {
+        double total = 0;
+        for (Bucket bucket : queue) {
+            total += bucket.value;
+        }
+        if (total == 0) return; // empty queue or all-zero load: shares would be NaN, deleteCharAt(-1) would throw
+        StringBuilder sb = new StringBuilder();
+        for (Bucket bucket : queue) {
+            sb.append(bucket.value / total).append(',');
+        }
+        sb.deleteCharAt(sb.length() - 1); // drop the trailing comma
+        LOG.info("Weights: " + sb.toString());
+    }
+
+    /** Returns a map from each weight's key to the bucketNum it was added to; throws IllegalArgumentException if k < 1. */
+    public HashMap<String, Integer> partition(List<Weight> weights, int k) {
+        if (k < 1) throw new IllegalArgumentException("bucket count k must be >= 1, got " + k);
+        PriorityQueue<Bucket> queue = new PriorityQueue<>(k, new BucketComparator());
+        for (int i = 0; i < k; i++) {
+            queue.add(new Bucket(i, 0.0)); // every bucket starts with zero load
+        }
+        HashMap<String, Integer> ret = new HashMap<>();
+        for (Weight weight : weights) {
+            Bucket bucket = queue.poll(); // head per BucketComparator -- presumably the lightest; confirm there
+            bucket.value = bucket.value + weight.value;
+            queue.add(bucket); // re-insert so the queue reorders on the new load
+            ret.put(weight.key, bucket.bucketNum);
+        }
+        printWeightTable(queue);
+        return ret;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 327cb8d..8e6d5d7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -20,13 +20,19 @@ package org.apache.eagle.security.auditlog;
import backtype.storm.spout.SchemeAsMultiScheme;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;
+import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
import org.apache.eagle.dataproc.util.ConfigOptionParser;
import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,33 +43,68 @@ import java.util.Map;
public class HdfsAuditLogProcessorMain {
private static final Logger LOG = LoggerFactory.getLogger(HdfsAuditLogProcessorMain.class);
- public static void main(String[] args) throws Exception{
- Config config = new ConfigOptionParser().load(args);
-
- LOG.info("Config class: " + config.getClass().getCanonicalName());
- if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+ public static PartitionStrategy createStrategy(Config config) {
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ String topic = config.getString("dataSourceConfig.topic");
+ DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+ PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+ PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm);
+ return strategy;
+ }
- StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+ public static KafkaSourcedSpoutProvider createProvider(Config config) {
+ String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+ final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+ @Override
+ public List<Object> deserialize(byte[] ser) {
+ Object tmp = deserializer.deserialize(ser);
+ Map<String, Object> map = (Map<String, Object>)tmp;
+ if(tmp == null) return null;
+ return Arrays.asList(map.get("user"), tmp);
+ }
+ };
+ KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
+ @Override
+ public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
+ return new SchemeAsMultiScheme(scheme);
+ }
+ };
+ return provider;
+ }
- String deserClsName = config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), tmp);
- }
- };
- KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
- public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
- };
+ public static void execWithDefaultPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").groupBy(Arrays.asList(0))
.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
.flatMap(new IPZoneDataJoinExecutor())
.alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
env.execute();
+ }
+
+ public static void execWithBalancedPartition(Config config, StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
+ PartitionStrategy strategy = createStrategy(config);
+ env.newSource(provider.getSpout(config)).renameOutputFields(2).withName("kafkaMsgConsumer").customGroupBy(strategy)
+ .flatMap(new FileSensitivityDataJoinExecutor()).customGroupBy(strategy)
+ .flatMap(new IPZoneDataJoinExecutor())
+ .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor", strategy);
+ env.execute();
+ }
+
+ public static void main(String[] args) throws Exception{
+ Config config = new ConfigOptionParser().load(args);
+ LOG.info("Config class: " + config.getClass().getCanonicalName());
+ if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
+
+ StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+ KafkaSourcedSpoutProvider provider = createProvider(config);
+ Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") ? config.getBoolean("eagleProps.balancePartitionEnabled") : false;
+ if (balancePartition) {
+ execWithBalancedPartition(config, env, provider);
+ }
+ else {
+ execWithDefaultPartition(config, env, provider);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3de5cce6/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index de146f2..3b678a3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,6 +50,7 @@
"mailHost" : "mailHost.com",
"mailSmtpPort":"25",
"mailDebug" : "true",
+ "balancePartitionEnabled" : "true",
"eagleService": {
"host": "localhost",
"port": 38080,
[3/5] incubator-eagle git commit: fix a typo in fetching alertConfigs
Posted by ha...@apache.org.
fix a typo in fetching alertConfigs
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/58a9555e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/58a9555e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/58a9555e
Branch: refs/heads/master
Commit: 58a9555e28894e707d0dad528758dceb2e08d20f
Parents: 3de5cce
Author: sunlibin <ab...@gmail.com>
Authored: Mon Nov 30 13:55:21 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Mon Nov 30 13:55:21 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/eagle/executor/AlertExecutorCreationUtils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/58a9555e/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
index c073939..6f7b69a 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
@@ -102,7 +102,7 @@ public class AlertExecutorCreationUtils {
String alertExecutorConfigsKey = "alertExecutorConfigs";
if(config.hasPath(alertExecutorConfigsKey)) {
Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
- if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorConfigs)) {
+ if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
int parts = 0;
if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
[5/5] incubator-eagle git commit: [EAGLE-2] Add eagle offline metric
collection topology and do online balance partition based on the statistic
metric
Posted by ha...@apache.org.
[EAGLE-2] Add eagle offline metric collection topology and do online balance partition based on the statistic metric
JIRA: [EAGLE-2][EAGLE-24][EAGLE-50][EAGLE-52]
Author: sunlibin <ab...@gmail.com>
Closes #8 from sunlibin:Eagle-Metric-And-Balance-Partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3ee73e8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3ee73e8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3ee73e8d
Branch: refs/heads/master
Commit: 3ee73e8d5953b1c51d3c8c7d2af24ff417ca57f5
Parents: 91aa216 b11a223
Author: Hao Chen <ha...@apache.org>
Authored: Thu Dec 3 11:26:44 2015 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Dec 3 11:26:44 2015 +0800
----------------------------------------------------------------------
.../apache/eagle/executor/AlertExecutor.java | 10 +-
.../executor/AlertExecutorCreationUtils.java | 2 +-
.../config/RunningJobCrawlConfig.java | 14 +-
.../storm/kafka/KafkaSourcedSpoutProvider.java | 4 -
.../storm/kafka/KafkaSourcedSpoutScheme.java | 1 -
.../storm/partition/EagleCustomGrouping.java | 51 +++++++
.../impl/storm/zookeeper/ZKStateConfig.java | 28 ++++
.../datastream/StormTopologyCompiler.scala | 14 +-
.../eagle/datastream/StreamAlertExpansion.scala | 9 +-
.../eagle/datastream/StreamConnector.scala | 8 ++
.../datastream/StreamGroupbyExpansion.scala | 8 +-
.../eagle/datastream/StreamProducer.scala | 32 ++++-
.../eagle/partition/DataDistributionDao.java | 28 ++++
.../eagle/partition/PartitionAlgorithm.java | 29 ++++
.../eagle/partition/PartitionStrategy.java | 27 ++++
.../eagle/partition/PartitionStrategyImpl.java | 86 ++++++++++++
.../java/org/apache/eagle/partition/Weight.java | 30 +++++
.../org/apache/eagle/metric/CountingMetric.java | 8 +-
.../java/org/apache/eagle/metric/Metric.java | 5 +-
.../apache/eagle/metric/MetricConstants.java | 24 ++++
.../manager/EagleMetricReportManager.java | 61 +++++++++
.../metric/report/EagleSerivceMetricReport.java | 61 ---------
.../metric/report/EagleServiceMetricReport.java | 60 +++++++++
.../metric/report/MetricEntityConvert.java | 2 +-
.../eagle/metric/report/MetricReport.java | 4 +-
.../eagle/service/client/ServiceConfig.java | 29 ++++
eagle-security/eagle-metric-collection/pom.xml | 100 ++++++++++++++
.../metric/kafka/EagleMetricCollectorMain.java | 127 ++++++++++++++++++
.../eagle/metric/kafka/KafkaConsumerOffset.java | 27 ++++
.../kafka/KafkaConsumerOffsetFetcher.java | 70 ++++++++++
.../metric/kafka/KafkaLatestOffsetFetcher.java | 98 ++++++++++++++
.../kafka/KafkaMessageDistributionExecutor.java | 128 ++++++++++++++++++
.../metric/kafka/KafkaOffsetCheckerConfig.java | 44 ++++++
.../kafka/KafkaOffsetSourceSpoutProvider.java | 54 ++++++++
.../eagle/metric/kafka/KafkaOffsetSpout.java | 134 +++++++++++++++++++
.../src/main/resources/application.conf | 39 ++++++
.../src/main/resources/log4j.properties | 39 ++++++
.../test/java/TestDataDistributionDaoImpl.java | 41 ++++++
.../src/test/java/TestGreedyPartition.java | 45 +++++++
.../src/test/java/TestKafkaOffset.java | 70 ++++++++++
.../apache/eagle/security/partition/Bucket.java | 30 +++++
.../security/partition/BucketComparator.java | 36 +++++
.../partition/DataDistributionDaoImpl.java | 104 ++++++++++++++
.../partition/GreedyPartitionAlgorithm.java | 65 +++++++++
.../auditlog/HdfsAuditLogProcessorMain.java | 91 +++++++++----
.../src/main/resources/application.conf | 3 +
...HiveJobRunningSourcedStormSpoutProvider.java | 2 +-
eagle-security/pom.xml | 1 +
eagle-topology-assembly/pom.xml | 5 +
49 files changed, 1865 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
[4/5] incubator-eagle git commit: modify some interfaces and
constructors
Posted by ha...@apache.org.
modify some interfaces and constructors
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b11a223a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b11a223a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b11a223a
Branch: refs/heads/master
Commit: b11a223a452b6a11d4a111175ca26948663d17ef
Parents: 58a9555
Author: sunlibin <ab...@gmail.com>
Authored: Mon Nov 30 18:09:06 2015 +0800
Committer: sunlibin <ab...@gmail.com>
Committed: Wed Dec 2 14:54:29 2015 +0800
----------------------------------------------------------------------
.../apache/eagle/partition/DataDistributionDao.java | 2 +-
.../apache/eagle/partition/PartitionStrategyImpl.java | 14 ++++++++++----
.../eagle/metric/kafka/EagleMetricCollectorMain.java | 2 +-
.../src/test/java/TestDataDistributionDaoImpl.java | 3 ++-
.../src/test/java/TestGreedyPartition.java | 3 ++-
.../security/partition/DataDistributionDaoImpl.java | 4 +---
.../security/auditlog/HdfsAuditLogProcessorMain.java | 7 ++++++-
.../src/main/resources/application.conf | 4 +++-
8 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
index 5c78f96..0b17775 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/DataDistributionDao.java
@@ -24,5 +24,5 @@ import java.util.List;
public interface DataDistributionDao extends Serializable {
- List<Weight> fetchDataDistribution() throws Exception;
+ List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
index 46696a6..eacefd5 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/partition/PartitionStrategyImpl.java
@@ -19,8 +19,10 @@
package org.apache.eagle.partition;
+import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.List;
import java.util.Map;
@@ -31,17 +33,20 @@ public class PartitionStrategyImpl implements PartitionStrategy {
public Map<String, Integer> routingTable;
public long lastRefreshTime;
public long refreshInterval;
- public static long DEFAULT_REFRESH_INTERVAL = 60 * 60 * 1000;
+ public long timeRange;
+ public static long DEFAULT_TIME_RANGE = 2 * DateUtils.MILLIS_PER_DAY;
+ public static long DEFAULT_REFRESH_INTERVAL = 2 * DateUtils.MILLIS_PER_HOUR;
private final Logger LOG = LoggerFactory.getLogger(PartitionStrategyImpl.class);
- public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval) {
+ public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm, long refreshInterval, long timeRange) {
this.dao = dao;
this.algorithm = algorithm;
this.refreshInterval = refreshInterval;
+ this.timeRange = timeRange;
}
public PartitionStrategyImpl(DataDistributionDao dao, PartitionAlgorithm algorithm) {
- this(dao, algorithm, DEFAULT_REFRESH_INTERVAL);
+ this(dao, algorithm, DEFAULT_REFRESH_INTERVAL, DEFAULT_TIME_RANGE);
}
public boolean needRefresh() {
@@ -54,7 +59,8 @@ public class PartitionStrategyImpl implements PartitionStrategy {
public Map<String, Integer> generateRoutingTable(int buckNum) {
try {
- List<Weight> weights = dao.fetchDataDistribution();
+ long currentTime = System.currentTimeMillis();
+ List<Weight> weights = dao.fetchDataDistribution(currentTime - timeRange, currentTime);
routingTable = algorithm.partition(weights, buckNum);
return routingTable;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
index 65fe68a..218b812 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -120,7 +120,7 @@ public class EagleMetricCollectorMain {
};
env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config)).renameOutputFields(0).withName("kafkaLogLagChecker");
- env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageDistributionCheck").groupBy(Arrays.asList(0))
+ env.newSource(kafkaMessageSpoutProvider.getSpout(config)).renameOutputFields(2).withName("kafkaMessageFetcher").groupBy(Arrays.asList(0))
.flatMap(new KafkaMessageDistributionExecutor());
env.execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
index 1c54a90..4d82085 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestDataDistributionDaoImpl.java
@@ -19,6 +19,7 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.partition.DataDistributionDao;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
@@ -35,6 +36,6 @@ public class TestDataDistributionDaoImpl {
String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
String topic = config.getString("dataSourceConfig.topic");
DataDistributionDao dao = new DataDistributionDaoImpl(eagleServiceHost, eagleServicePort, username, password, topic);
- dao.fetchDataDistribution();
+ dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
index cfd873a..f3e1cf8 100644
--- a/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestGreedyPartition.java
@@ -19,6 +19,7 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.partition.DataDistributionDao;
import org.apache.eagle.partition.PartitionAlgorithm;
@@ -39,6 +40,6 @@ public class TestGreedyPartition {
String topic = config.getString("dataSourceConfig.topic");
DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
- algorithm.partition(dao.fetchDataDistribution(), 4);
+ algorithm.partition(dao.fetchDataDistribution(System.currentTimeMillis() - 2 * DateUtils.MILLIS_PER_DAY, System.currentTimeMillis()), 4);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
index 631947c..e808502 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/partition/DataDistributionDaoImpl.java
@@ -55,7 +55,7 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
}
@Override
- public List<Weight> fetchDataDistribution() throws Exception {
+ public List<Weight> fetchDataDistribution(long startTime, long endTime) throws Exception {
IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password) {
@Override
public <T extends Object> GenericServiceAPIResponseEntity<T> search(EagleServiceSingleEntityQueryRequest request) throws EagleServiceClientException {
@@ -75,8 +75,6 @@ public class DataDistributionDaoImpl implements DataDistributionDao {
};
try {
String query = MetricConstants.GENERIC_METRIC_ENTITY_ENDPOINT + "[@topic=\"" + topic + "\"]<@user>{sum(value)}.{sum(value) desc}";
- long endTime = System.currentTimeMillis();
- long startTime = endTime - 2 * DateUtils.MILLIS_PER_DAY;
GenericServiceAPIResponseEntity<Map> response = client.search()
.startTime(startTime)
.endTime(endTime)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index 8e6d5d7..fda41d3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -21,6 +21,7 @@ package org.apache.eagle.security.auditlog;
import backtype.storm.spout.SchemeAsMultiScheme;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigRenderOptions;
+import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
@@ -51,7 +52,11 @@ public class HdfsAuditLogProcessorMain {
String topic = config.getString("dataSourceConfig.topic");
DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
- PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm);
+ String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+ Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
+ String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+ Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
+ PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
return strategy;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b11a223a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 3b678a3..43e5b58 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -50,7 +50,9 @@
"mailHost" : "mailHost.com",
"mailSmtpPort":"25",
"mailDebug" : "true",
- "balancePartitionEnabled" : "true",
+ "balancePartitionEnabled" : true,
+ #"partitionRefreshIntervalInMin" : 60,
+ #"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
"port": 38080,