You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/29 04:11:46 UTC
eagle git commit: [EAGLE-846] HDFS audit log traffic monitoring
Repository: eagle
Updated Branches:
refs/heads/master 62f8c78d3 -> 77fbff720
[EAGLE-846] HDFS audit log traffic monitoring
https://issues.apache.org/jira/browse/EAGLE-846
Author: Zhao, Qingwen <qi...@apache.org>
Author: Qingwen Zhao <qi...@gmail.com>
Closes #756 from qingwen220/EAGLE-846.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/77fbff72
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/77fbff72
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/77fbff72
Branch: refs/heads/master
Commit: 77fbff720145e08e5ea6ad30117317ef5b1e031d
Parents: 62f8c78
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 29 12:11:35 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 29 12:11:35 2016 +0800
----------------------------------------------------------------------
.../app/environment/impl/StormEnvironment.java | 4 +
.../app/messaging/EntityStreamPersist.java | 98 ++++++++++++++
.../impl/ApplicationHealthCheckServiceImpl.java | 1 +
.../org/apache/eagle/common/DateTimeUtil.java | 22 ++++
.../apache/eagle/common/TestDateTimeUtil.java | 15 +++
.../eagle/security/hdfs/HDFSAuditLogObject.java | 8 ++
.../eagle/security/hdfs/HDFSAuditLogParser.java | 11 +-
.../traffic/HadoopLogAccumulatorBolt.java | 127 +++++++++++++++++++
.../traffic/HadoopLogTrafficPersist.java | 81 ++++++++++++
.../security/traffic/SimpleWindowCounter.java | 77 +++++++++++
.../AbstractHdfsAuditLogApplication.java | 13 +-
.../auditlog/HdfsAuditLogApplication.java | 4 +-
.../auditlog/HdfsAuditLogParserBolt.java | 33 +++--
...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 26 ++++
.../auditlog/TestUserCommandReassembler.java | 3 -
.../auditlog/MapRFSAuditLogApplication.java | 2 +-
...urity.auditlog.MapRFSAuditLogAppProvider.xml | 9 +-
.../apache/eagle/server/ServerApplication.java | 10 +-
.../eagle/topology/TopologyCheckAppConfig.java | 4 +-
....eagle.topology.TopologyCheckAppProvider.xml | 18 ++-
.../src/main/resources/application.conf | 8 +-
.../src/test/resources/application.conf | 16 ++-
22 files changed, 554 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 59c8277..942a0ac 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -48,6 +48,10 @@ public class StormEnvironment extends AbstractEnvironment {
return new MetricStreamPersist(metricDefinition, config);
}
+ /**
+  * Creates a new {@link EntityStreamPersist} bolt backed by the given configuration.
+  *
+  * @param config configuration handed unchanged to the {@link EntityStreamPersist} constructor
+  * @return a freshly constructed persist bolt; a new instance is created on every call
+  */
+ public EntityStreamPersist getEntityPersist(Config config) {
+     final EntityStreamPersist entityPersist = new EntityStreamPersist(config);
+     return entityPersist;
+ }
+
public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
return new MetricSchemaGenerator(metricDefinition, config);
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
new file mode 100644
index 0000000..e216dc6
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Storm bolt that buffers incoming {@link TaggedLogAPIEntity} lists and writes them to the
+ * Eagle service in batches.
+ *
+ * <p>Field 0 of every input tuple is expected to be a {@code List} of entities. Entities are
+ * accumulated until at least {@code service.batchSize} (default 1) are buffered, then written
+ * with a single {@code client.create(...)} call.
+ *
+ * <p>Every tuple that contributed to a batch is acked when the batch is persisted and failed
+ * when it is not, so tuples that were only buffered are neither left pending until they time
+ * out nor silently lost.
+ */
+public class EntityStreamPersist extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class);
+
+    private final Config config;
+    private IEagleServiceClient client;
+    private OutputCollector collector;
+    private int batchSize;
+    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+    // Tuples whose entities sit in entityBucket. Created in prepare() and kept transient so
+    // that tuples never become part of the serialized bolt.
+    private transient List<Tuple> pendingTuples;
+
+    /**
+     * @param config application configuration; {@code service.batchSize} is read when present,
+     *               otherwise the batch size is 1
+     */
+    public EntityStreamPersist(Config config) {
+        this.config = config;
+        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.client = new EagleServiceClientImpl(config);
+        this.collector = collector;
+        this.pendingTuples = new ArrayList<>();
+    }
+
+    /**
+     * Buffers the entities carried by {@code input} and flushes once the batch is large enough.
+     *
+     * @param input tuple whose field 0 is a list of entities; a {@code null} list is treated
+     *              as empty
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void execute(Tuple input) {
+        List<? extends TaggedLogAPIEntity> entities = (List<? extends TaggedLogAPIEntity>) input.getValue(0);
+        if (entities != null) {
+            entityBucket.addAll(entities);
+        }
+        // Remember the tuple: it is acked or failed together with the batch it belongs to.
+        pendingTuples.add(input);
+
+        if (entityBucket.size() < batchSize) {
+            return;
+        }
+        flush();
+    }
+
+    /**
+     * Writes the buffered entities and settles every pending tuple with the outcome.
+     * The buffer is cleared in all cases; failed tuples are handed back to Storm via
+     * {@code collector.fail}.
+     */
+    private void flush() {
+        boolean persisted;
+        try {
+            // Nothing to write counts as success, so empty inputs are acked instead of failed.
+            persisted = entityBucket.isEmpty() || persist();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            persisted = false;
+        }
+
+        for (Tuple tuple : pendingTuples) {
+            if (persisted) {
+                collector.ack(tuple);
+            } else {
+                collector.fail(tuple);
+            }
+        }
+        pendingTuples.clear();
+        entityBucket.clear();
+    }
+
+    /**
+     * Sends the current batch to the Eagle service.
+     *
+     * @return {@code true} when the service reports success; {@code false} after logging and
+     *         reporting a service-side error
+     * @throws Exception whatever the service client throws
+     */
+    private boolean persist() throws Exception {
+        GenericServiceAPIResponseEntity response = client.create(entityBucket);
+        if (response.isSuccess()) {
+            LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
+            return true;
+        }
+        LOG.error("Service side error: {}", response.getException());
+        collector.reportError(new IllegalStateException(response.getException()));
+        return false;
+    }
+
+    @Override
+    public void cleanup() {
+        try {
+            // client is only set in prepare(); guard so cleanup never throws a NullPointerException.
+            if (this.client != null) {
+                this.client.getJerseyClient().destroy();
+                this.client.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Close client error: {}", e.getMessage(), e);
+        } finally {
+            super.cleanup();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Declares no output fields.
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
index 1607b0f..8403d55 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
@@ -49,6 +49,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
private int initialDelay = 10;
private int period = 300;
+ public static final String HEALTH_CHECK_PATH = "application.healthCheck";
private static final String HEALTH_INITIAL_DELAY_PATH = "application.healthCheck.initialDelay";
private static final String HEALTH_PERIOD_PATH = "application.healthCheck.period";
private static final String HEALTH_PUBLISHER_PATH = "application.healthCheck.publisher";
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index a69feda..98a5a4b 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -61,6 +61,14 @@ public class DateTimeUtil {
return sdf.format(t);
}
+ /**
+  * Formats an epoch timestamp given in seconds as {@code yyyy-MM-dd HH:mm:ss} in the
+  * requested time zone.
+  *
+  * @param seconds  seconds since the epoch
+  * @param timeZone time zone used to render the date
+  * @return the formatted date string
+  */
+ public static String secondsToHumanDate(long seconds, TimeZone timeZone) {
+     final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     formatter.setTimeZone(timeZone);
+     return formatter.format(new Date(seconds * 1000));
+ }
+
public static String millisecondsToHumanDateWithMilliseconds(long milliseconds) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
sdf.setTimeZone(CURRENT_TIME_ZONE);
@@ -84,6 +92,13 @@ public class DateTimeUtil {
return d.getTime() / 1000;
}
+ /**
+  * Parses a {@code yyyy-MM-dd HH:mm:ss} date, interpreted in the given time zone, into
+  * seconds since the epoch.
+  *
+  * @param date     date text in {@code yyyy-MM-dd HH:mm:ss} form
+  * @param timeZone time zone the date text is expressed in
+  * @return the parsed instant in whole seconds (milliseconds divided by 1000)
+  * @throws ParseException if {@code date} cannot be parsed with that pattern
+  */
+ public static long humanDateToSeconds(String date, TimeZone timeZone) throws ParseException {
+     final SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+     parser.setTimeZone(timeZone);
+     final long epochMillis = parser.parse(date).getTime();
+     return epochMillis / 1000;
+ }
+
public static long humanDateToMilliseconds(String date) throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
sdf.setTimeZone(CURRENT_TIME_ZONE);
@@ -91,6 +106,13 @@ public class DateTimeUtil {
return d.getTime();
}
+ /**
+  * Parses a {@code yyyy-MM-dd HH:mm:ss,SSS} date, interpreted in the given time zone, into
+  * milliseconds since the epoch.
+  *
+  * @param date     date text in {@code yyyy-MM-dd HH:mm:ss,SSS} form
+  * @param timeZone time zone the date text is expressed in
+  * @return the parsed instant in milliseconds
+  * @throws ParseException if {@code date} cannot be parsed with that pattern
+  */
+ public static long humanDateToMilliseconds(String date, TimeZone timeZone) throws ParseException {
+     final SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+     parser.setTimeZone(timeZone);
+     return parser.parse(date).getTime();
+ }
+
public static long humanDateToMillisecondsWithoutException(String date) {
try {
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
index a008723..42dad4a 100755
--- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
@@ -19,8 +19,10 @@ package org.apache.eagle.common;
import org.junit.Assert;
import org.junit.Test;
+import java.text.ParseException;
import java.util.Calendar;
import java.util.GregorianCalendar;
+import java.util.TimeZone;
public class TestDateTimeUtil {
@Test
@@ -84,4 +86,17 @@ public class TestDateTimeUtil {
//cal.setTimeInMillis(System.currentTimeMillis());
System.out.println(cal.get(Calendar.DAY_OF_WEEK));
}
+
+ /**
+  * The same instant must parse from a GMT+8 wall-clock time and render as the matching
+  * UTC wall-clock time, eight hours earlier.
+  */
+ @Test
+ public void testTimeZone() throws ParseException {
+     String date = "2016-12-23 07:35:49";
+     long timestamp = DateTimeUtil.humanDateToSeconds(date, TimeZone.getTimeZone("GMT+8"));
+     // 2016-12-22 23:35:49 UTC expressed as seconds since the epoch.
+     Assert.assertEquals(1482449749L, timestamp);
+
+     String dateUTC = "2016-12-22 23:35:49";
+     // assertEquals reports expected vs. actual on failure, unlike assertTrue(a.equals(b)).
+     Assert.assertEquals(dateUTC, DateTimeUtil.secondsToHumanDate(timestamp, TimeZone.getTimeZone("UTC")));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
index a294ce1..1cc1a2a 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
@@ -27,4 +27,12 @@ public class HDFSAuditLogObject {
public String cmd;
public String src;
public String dst;
+
+ public static final String HDFS_TIMESTAMP_KEY = "timestamp";
+ public static final String HDFS_HOST_KEY = "host";
+ public static final String HDFS_ALLOWED_KEY = "allowed";
+ public static final String HDFS_USER_KEY = "user";
+ public static final String HDFS_CMD_KEY = "cmd";
+ public static final String HDFS_SRC_KEY = "src";
+ public static final String HDFS_DST_KEY = "dst";
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
index 7257975..b4f4017 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
@@ -22,15 +22,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.TimeZone;
/**
* e.g. 2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true ugi=hadoop (auth:KERBEROS) ip=/x.x.x.x cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
*/
public final class HDFSAuditLogParser implements Serializable {
- private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+ private TimeZone timeZone;
public HDFSAuditLogParser() {
+ this.timeZone = DateTimeUtil.CURRENT_TIME_ZONE;
+ }
+
+ public HDFSAuditLogParser(TimeZone timeZone) {
+ this.timeZone = timeZone;
}
public static String parseUser(String ugi) {
@@ -91,7 +98,7 @@ public final class HDFSAuditLogParser implements Serializable {
entity.dst = dst;
entity.host = ip;
entity.allowed = Boolean.valueOf(allowed);
- entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
+ entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data, timeZone);
return entity;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
new file mode 100644
index 0000000..c5cc0df
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.app.utils.ApplicationExecutionConfig.APP_ID_KEY;
+import static org.apache.eagle.app.utils.ApplicationExecutionConfig.SITE_ID_KEY;
+
+/**
+ * Storm bolt that counts parsed Hadoop log events per minute and publishes the counts as
+ * {@code hadoop.log.count.minute} metrics.
+ *
+ * <p>Counts are kept in a {@link SimpleWindowCounter} holding up to
+ * {@code dataSinkConfig.metricWindowSize} minute buckets (default 10). Once the window is
+ * full, the oldest bucket is removed and handed to {@link HadoopLogTrafficPersist} as a
+ * metric tagged with {@code appId}, {@code site} and {@code taskId}.
+ *
+ * <p>Tuples are acked as soon as they are received, so counting is best-effort: a tuple is
+ * not replayed when counting or publishing fails.
+ */
+public class HadoopLogAccumulatorBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopLogAccumulatorBolt.class);
+
+    private static final int DEFAULT_WINDOW_SIZE = 10;
+    private static final String HADOOP_LOG_METRIC_NAME = "hadoop.log.count.minute";
+    private static final String HDFS_COUNTER_WINDOW_SIZE = "dataSinkConfig.metricWindowSize";
+
+    private int taskId;
+    private String site;
+    private String appId;
+    private HadoopLogTrafficPersist client;
+    private SimpleWindowCounter accumulator;
+    private OutputCollector collector;
+    private int windowSize;
+
+    /**
+     * @param config application configuration; the site id, app id and
+     *               {@code dataSinkConfig.metricWindowSize} are read when present
+     */
+    public HadoopLogAccumulatorBolt(Config config) {
+        if (config.hasPath(SITE_ID_KEY)) {
+            this.site = config.getString(SITE_ID_KEY);
+        }
+        if (config.hasPath(APP_ID_KEY)) {
+            this.appId = config.getString(APP_ID_KEY);
+        }
+        if (config.hasPath(HDFS_COUNTER_WINDOW_SIZE)) {
+            this.windowSize = config.getInt(HDFS_COUNTER_WINDOW_SIZE);
+        } else {
+            this.windowSize = DEFAULT_WINDOW_SIZE;
+        }
+        this.accumulator = new SimpleWindowCounter(windowSize);
+        this.client = new HadoopLogTrafficPersist(config);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.taskId = context.getThisTaskId();
+        this.collector = collector;
+    }
+
+    /**
+     * Counts one event in the minute bucket of its timestamp, emitting the oldest bucket
+     * first when the window is already full.
+     *
+     * @param input tuple whose field 0 is a map holding a numeric
+     *              {@link HDFSAuditLogObject#HDFS_TIMESTAMP_KEY} entry (epoch milliseconds)
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void execute(Tuple input) {
+        // Ack first: a malformed or uncountable event must not be replayed forever.
+        collector.ack(input);
+        try {
+            Map<String, Object> event = (Map<String, Object>) input.getValue(0);
+            Object rawTimestamp = event == null ? null : event.get(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY);
+            if (!(rawTimestamp instanceof Number)) {
+                LOG.warn("skip event without a numeric {} field: {}", HDFSAuditLogObject.HDFS_TIMESTAMP_KEY, event);
+                return;
+            }
+            long timeInMs = ((Number) rawTimestamp).longValue();
+            // presumably truncates the timestamp to the start of its minute -- confirm in DateTimeUtil
+            long timeInMin = DateTimeUtil.roundDown(Calendar.MINUTE, timeInMs);
+
+            if (!isOrdered(timeInMin)) {
+                LOG.warn("data is out of order, the estimated throughput may be incorrect");
+                return;
+            }
+            if (accumulator.isFull()) {
+                Tuple2<Long, Long> pair = accumulator.poll();
+                GenericMetricEntity metric = generateMetric(pair.f0(), pair.f1());
+                client.emitMetric(metric);
+            }
+            // Always count the current event; previously the event that triggered an
+            // eviction was dropped, so every emitted bucket cost one uncounted event.
+            accumulator.insert(timeInMin, 1);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Decides whether an event may still be counted.
+     *
+     * @param timestamp minute-rounded event time
+     * @return {@code true} while the window is not full; otherwise {@code true} only when the
+     *         oldest buffered minute is no later than {@code timestamp} plus the window length
+     */
+    private boolean isOrdered(long timestamp) {
+        if (accumulator.isEmpty() || !accumulator.isFull()) {
+            return true;
+        }
+        // NOTE(review): this accepts events up to one window length older than the oldest
+        // bucket -- confirm that tolerance is intended.
+        return accumulator.peek() <= timestamp + windowSize * DateTimeUtil.ONEMINUTE;
+    }
+
+    /**
+     * Builds the metric entity for one minute bucket.
+     *
+     * @param timestamp bucket time, used as the metric timestamp
+     * @param count     number of events counted in that bucket
+     * @return a metric named {@code hadoop.log.count.minute} tagged with appId, site and taskId
+     */
+    private GenericMetricEntity generateMetric(long timestamp, long count) {
+        GenericMetricEntity metricEntity = new GenericMetricEntity();
+        Map<String, String> tags = new HashMap<>();
+        tags.put("appId", appId);
+        tags.put("site", site);
+        tags.put("taskId", String.valueOf(taskId));
+        metricEntity.setTimestamp(timestamp);
+        metricEntity.setTags(tags);
+        metricEntity.setPrefix(HADOOP_LOG_METRIC_NAME);
+        metricEntity.setValue(new double[] {count});
+        return metricEntity;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Declares no output fields.
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
new file mode 100644
index 0000000..29f61ca
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Buffers traffic metrics and writes them to the Eagle service in batches.
+ *
+ * <p>A batch is written once {@code dataSinkConfig.metricSinkBatchSize} (default 1) metrics
+ * are buffered. A fresh service client is created for each write and closed afterwards.
+ * Writing is best-effort: the buffer is cleared after every attempt, successful or not.
+ */
+public class HadoopLogTrafficPersist implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class);
+    private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize";
+    private final Config config;
+    // Only alive for the duration of one write; transient so a client never becomes part of
+    // the serialized state of this object.
+    private transient IEagleServiceClient client;
+    private int batchSize;
+    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+
+    /**
+     * @param config application configuration; {@code dataSinkConfig.metricSinkBatchSize} is
+     *               read when present, otherwise the batch size is 1
+     */
+    public HadoopLogTrafficPersist(Config config) {
+        this.config = config;
+        this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? config.getInt(SINK_BATCH_SIZE) : 1;
+    }
+
+    /**
+     * Buffers the metric and, when the batch is complete, writes the whole buffer.
+     * Failures are logged and never propagated to the caller.
+     *
+     * @param metricEntity metric to persist
+     */
+    public void emitMetric(GenericMetricEntity metricEntity) {
+        entityBucket.add(metricEntity);
+        if (entityBucket.size() < batchSize) {
+            return;
+        }
+
+        try {
+            client = new EagleServiceClientImpl(config);
+            GenericServiceAPIResponseEntity response = client.create(entityBucket);
+            if (response.isSuccess()) {
+                LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
+            } else {
+                LOG.error("Service side error: {}", response.getException());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            entityBucket.clear();
+            close();
+        }
+    }
+
+    /**
+     * Releases the current service client, if any. Safe to call repeatedly: the reference is
+     * dropped afterwards, so an already closed client is never closed a second time.
+     */
+    public void close() {
+        if (client == null) {
+            return;
+        }
+        try {
+            try {
+                this.client.getJerseyClient().destroy();
+            } finally {
+                // Close the client even when destroying the Jersey client throws.
+                this.client.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Close client error: {}", e.getMessage(), e);
+        } finally {
+            this.client = null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
new file mode 100644
index 0000000..5293577
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import org.apache.eagle.common.utils.Tuple2;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Bounded counter keyed by timestamp.
+ *
+ * <p>Holds at most {@code windowSize} distinct timestamps. Counts for a timestamp that is
+ * already present are accumulated; a new timestamp is rejected once the window is full.
+ * The smallest timestamp can be inspected with {@link #peek()} and removed with
+ * {@link #poll()}.
+ *
+ * <p>Methods that touch the timestamp queue are synchronized because the underlying
+ * {@link PriorityQueue} is not thread-safe.
+ */
+public class SimpleWindowCounter implements Serializable {
+
+    private final int windowSize;
+
+    private final Map<Long, Long> counter;
+    private final Queue<Long> timeQueue;
+
+    /**
+     * @param size maximum number of distinct timestamps held at once
+     */
+    public SimpleWindowCounter(int size) {
+        this.windowSize = size;
+        counter = new ConcurrentHashMap<>(windowSize);
+        timeQueue = new PriorityQueue<>();
+    }
+
+    /**
+     * Adds {@code countVal} to the count stored for {@code timestamp}.
+     *
+     * @param timestamp bucket key
+     * @param countVal  amount to add
+     * @return {@code true} if the count was recorded; {@code false} if {@code timestamp} is
+     *         new and the window already holds {@code windowSize} timestamps
+     */
+    public synchronized boolean insert(long timestamp, long countVal) {
+        Long current = counter.get(timestamp);
+        if (current != null) {
+            counter.put(timestamp, current + countVal);
+            return true;
+        }
+        if (counter.size() >= windowSize) {
+            return false;
+        }
+        counter.put(timestamp, countVal);
+        timeQueue.add(timestamp);
+        return true;
+    }
+
+    /**
+     * @return number of distinct timestamps currently held
+     */
+    public int getSize() {
+        return counter.size();
+    }
+
+    /**
+     * @return {@code true} when the window holds at least {@code windowSize} timestamps
+     */
+    public boolean isFull() {
+        return counter.size() >= windowSize;
+    }
+
+    /**
+     * @return {@code true} when no timestamp is held
+     */
+    public boolean isEmpty() {
+        return counter.isEmpty();
+    }
+
+    /**
+     * Removes the smallest timestamp together with its count.
+     *
+     * @return pair of (timestamp, count)
+     * @throws NoSuchElementException if the window is empty
+     */
+    public synchronized Tuple2<Long, Long> poll() {
+        // remove() fails with a descriptive exception on an empty queue instead of the
+        // NullPointerException that unboxing the null from poll() would raise.
+        Long oldestTimestamp = timeQueue.remove();
+        return new Tuple2<>(oldestTimestamp, counter.remove(oldestTimestamp));
+    }
+
+    /**
+     * Returns the smallest timestamp without removing it.
+     *
+     * @return the smallest timestamp held
+     * @throws NoSuchElementException if the window is empty
+     */
+    public synchronized long peek() {
+        return timeQueue.element();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index 6d7022b..bc8ceb1 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -28,9 +28,11 @@ import com.typesafe.config.Config;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.app.StormApplication;
import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.EntityStreamPersist;
import org.apache.eagle.app.messaging.StormStreamSink;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.impl.storm.partition.*;
+import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;
import org.apache.eagle.security.partition.DataDistributionDaoImpl;
import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
@@ -44,6 +46,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
public final static String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+ public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled";
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
@@ -63,7 +66,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
// ingest -> parserBolt
// ---------------------
- BaseRichBolt parserBolt = getParserBolt();
+ BaseRichBolt parserBolt = getParserBolt(config);
BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest");
boltDeclarer.shuffleGrouping("ingest");
@@ -83,6 +86,12 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
// sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");
+ if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
+ HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config);
+ BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks);
+ auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt");
+ }
+
// ------------------------------
// sensitivityJoin -> ipZoneJoin
// ------------------------------
@@ -101,7 +110,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
return builder.createTopology();
}
- public abstract BaseRichBolt getParserBolt();
+ public abstract BaseRichBolt getParserBolt(Config config);
public abstract String getSinkStreamName();
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
index 6d3c58c..5f300f3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -30,8 +30,8 @@ import com.typesafe.config.ConfigFactory;
*/
public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
@Override
- public BaseRichBolt getParserBolt() {
- return new HdfsAuditLogParserBolt();
+ public BaseRichBolt getParserBolt(Config config) {
+ return new HdfsAuditLogParserBolt(config);
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
index 4590e8a..7b9d9a5 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -25,13 +25,17 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
+import java.util.TimeZone;
import java.util.TreeMap;
/**
@@ -39,8 +43,19 @@ import java.util.TreeMap;
*/
public class HdfsAuditLogParserBolt extends BaseRichBolt {
private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
+ private static final String DATASOURCE_TIMEZONE_PATH = "dataSourceConfig.timeZone";
+
private OutputCollector collector;
- private static final HDFSAuditLogParser parser = new HDFSAuditLogParser();
+ private HDFSAuditLogParser parser;
+
+ public HdfsAuditLogParserBolt(Config config) {
+ if (config.hasPath(DATASOURCE_TIMEZONE_PATH)) {
+ TimeZone timeZone = TimeZone.getTimeZone(config.getString(DATASOURCE_TIMEZONE_PATH));
+ parser = new HDFSAuditLogParser(timeZone);
+ } else {
+ parser = new HDFSAuditLogParser();
+ }
+ }
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -54,14 +69,14 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
try {
entity = parser.parse(logLine);
Map<String, Object> map = new TreeMap<>();
- map.put("src", entity.src);
- map.put("dst", entity.dst);
- map.put("host", entity.host);
- map.put("timestamp", entity.timestamp);
- map.put("allowed", entity.allowed);
- map.put("user", entity.user);
- map.put("cmd", entity.cmd);
- collector.emit(Collections.singletonList(map));
+ map.put(HDFSAuditLogObject.HDFS_SRC_KEY, entity.src);
+ map.put(HDFSAuditLogObject.HDFS_DST_KEY, entity.dst);
+ map.put(HDFSAuditLogObject.HDFS_HOST_KEY, entity.host);
+ map.put(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY, entity.timestamp);
+ map.put(HDFSAuditLogObject.HDFS_ALLOWED_KEY, entity.allowed);
+ map.put(HDFSAuditLogObject.HDFS_USER_KEY, entity.user);
+ map.put(HDFSAuditLogObject.HDFS_CMD_KEY, entity.cmd);
+ collector.emit(input, Collections.singletonList(map));
} catch (Exception ex) {
LOG.error("Failing parse audit log message {}", logLine, ex);
} finally {
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 1108497..90f9e5b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -90,6 +90,13 @@
<description>scheme class</description>
<required>true</required>
</property>
+ <property>
+ <name>dataSourceConfig.timeZone</name>
+ <displayName>Log Time Zone</displayName>
+ <description>time zone of hdfs audit log </description>
+ <value>GMT</value>
+ <required>true</required>
+ </property>
<!-- data enrich configurations -->
<property>
@@ -150,6 +157,25 @@
<value>0</value>
<description>value controls when a produce request is considered completed</description>
</property>
+ <property>
+ <name>dataSinkConfig.trafficMonitorEnabled</name>
+ <displayName>Log Traffic Monitor Enabled</displayName>
+ <value>false</value>
+ <description>enable the log throughput calculation</description>
+ <required>true</required>
+ </property>
+ <property>
+ <name>dataSinkConfig.metricWindowSize</name>
+ <displayName>Window Size for Traffic Counting</displayName>
+ <value>10</value>
+ <description>window size to calculate the throughput</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.metricSinkBatchSize</name>
+ <displayName>Batch Size for Flushing Traffic Metrics</displayName>
+ <value>1</value>
+ <description>batch size of flushing metrics </description>
+ </property>
<!-- web app related configurations -->
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
index caea37a..f7ddad2 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -30,9 +30,6 @@ import org.junit.Test;
import java.util.*;
-/**
- * Created by yonzhang on 11/24/15.
- */
public class TestUserCommandReassembler {
private Map parseEvent(String log) throws Exception{
HDFSAuditLogParser deserializer = new HDFSAuditLogParser();
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
index 3319164..b72cb13 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory;
*/
public class MapRFSAuditLogApplication extends AbstractHdfsAuditLogApplication {
@Override
- public BaseRichBolt getParserBolt() {
+ public BaseRichBolt getParserBolt(Config config) {
return new MapRFSAuditLogParserBolt();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
index c54dd28..074b828 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
@@ -21,10 +21,9 @@
-->
<application>
- <type>MapRFSAuditLogApplication</type>
+ <type>MAPR_HDFS_AUDIT_LOG_MONITOR_APP</type>
<name>MapRFS Audit Log Monitoring Application</name>
<version>0.5.0-incubating</version>
- <appClass>org.apache.eagle.security.auditlog.MapRFSAuditLogApplication</appClass>
<viewPath>/apps/example</viewPath>
<configuration>
<!-- topology related configurations -->
@@ -52,6 +51,12 @@
<value>2</value>
<description>number of sink tasks</description>
</property>
+ <property>
+ <name>topology.numOfMetricSinkTasks</name>
+ <displayName>Topology Metric Sink Tasks</displayName>
+ <value>1</value>
+ <description>number of metric sink tasks</description>
+ </property>
<!-- data source configurations -->
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
index ccf3c28..4118892 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
@@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
import javax.servlet.DispatcherType;
import java.util.EnumSet;
+import static org.apache.eagle.app.service.impl.ApplicationHealthCheckServiceImpl.HEALTH_CHECK_PATH;
+
class ServerApplication extends Application<ServerConfig> {
private static final Logger LOG = LoggerFactory.getLogger(ServerApplication.class);
@Inject
@@ -121,9 +123,11 @@ class ServerApplication extends Application<ServerConfig> {
environment.lifecycle().manage(updateAppStatusTask);
// Initialize application extended health checks.
- LOG.debug("Registering ApplicationHealthCheckService");
- applicationHealthCheckService.init(environment);
- environment.lifecycle().manage(new ManagedService(applicationHealthCheckService));
+ if (config.hasPath(HEALTH_CHECK_PATH)) {
+ LOG.debug("Registering ApplicationHealthCheckService");
+ applicationHealthCheckService.init(environment);
+ environment.lifecycle().manage(new ManagedService(applicationHealthCheckService));
+ }
// Load application shared extension services.
LOG.debug("Registering application shared extension services");
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index da6cd46..f6d61f6 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -98,14 +98,14 @@ public class TopologyCheckAppConfig implements Serializable {
hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null);
}
- if (config.hasPath("dataSourceConfig.mr")) {
+ if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.MR);
mrConfig = new MRConfig();
mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
}
- if (config.hasPath("dataSourceConfig.hdfs")) {
+ if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
topologyTypes.add(TopologyConstants.TopologyType.HDFS);
hdfsConfig = new HdfsConfig();
hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*");
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 0b83e9b..2089a2f 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -77,6 +77,19 @@
<displayName>Hdfs Namenode Web URL</displayName>
<description>hdfs namenode web url for HDFS monitor</description>
<value>http://sandbox.hortonworks.com:50070</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hdfs.enabled</name>
+ <displayName>HDFS Topology Check Enabled</displayName>
+ <description>HDFS topology status check enabled</description>
+ <value>false</value>
+ <required>true</required>
+ </property>
+ <property>
+ <name>dataSourceConfig.mr.enabled</name>
+ <displayName>MR Topology Check Enabled</displayName>
+ <description>MR topology status check enabled</description>
+ <value>false</value>
<required>true</required>
</property>
<property>
@@ -84,7 +97,6 @@
<displayName>Resource Manager URL</displayName>
<description>resource manager url for YARN monitor</description>
<value>http://sandbox.hortonworks.com:8088</value>
- <required>true</required>
</property>
<property>
<name>dataSourceConfig.mr.historyServerUrl</name>
@@ -94,8 +106,8 @@
</property>
<property>
<name>dataSourceConfig.hbase.enabled</name>
- <displayName>HBase Config Enabled</displayName>
- <description>enabled for HBase monitor</description>
+ <displayName>HBase Topology Check Enabled</displayName>
+ <description>HBase topology status check enabled</description>
<value>false</value>
<required>true</required>
</property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index 4d22912..5d5f1d3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -33,8 +33,12 @@
}
dataSourceConfig : {
- hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+ hdfs: {
+ enabled: false,
+ namenodeUrl: "http://sandbox.hortonworks.com:50070",
+ }
mr: {
+ enabled: false,
rmUrl: "http://sandbox.hortonworks.com:50030",
historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty
}
@@ -49,7 +53,7 @@
eagle.principal: "", #if not need, then empty
eagle.keytab: ""
}
- },
+ }
}
"dataSinkConfig": {
http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
index be84c1d..da52f65 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
@@ -28,20 +28,26 @@
}
dataSourceConfig : {
- hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+ hdfs: {
+ enabled: false,
+ namenodeUrl: "http://sandbox.hortonworks.com:50070",
+ }
+ mr: {
+ enabled: false,
+ rmUrl: "http://sandbox.hortonworks.com:50030",
+ historyServerUrl : "http://sandbox.hortonworks.com:19888" #if not need, then empty
+ }
hbase: {
+ enabled: false,
zkQuorum: "sandbox.hortonworks.com",
zkPropertyClientPort : "2181",
zkZnodeParent: "/hbase-unsecure",
+ zkRetryTimes : "5",
kerberos : {
master.principal : "hadoop/_HOST@EXAMPLE.COM"
eagle.principal: "", #if not need, then empty
eagle.keytab: ""
}
- },
- mr: {
- rmUrl: "http://sandbox.hortonworks.com:8088",
- historyServerUrl : "http://sandbox.hortonworks.com:19888"
}
}