You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/12/29 04:11:46 UTC

eagle git commit: [EAGLE-846] HDFS audit log traffic monitoring

Repository: eagle
Updated Branches:
  refs/heads/master 62f8c78d3 -> 77fbff720


[EAGLE-846] HDFS audit log traffic monitoring

https://issues.apache.org/jira/browse/EAGLE-846

Author: Zhao, Qingwen <qi...@apache.org>
Author: Qingwen Zhao <qi...@gmail.com>

Closes #756 from qingwen220/EAGLE-846.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/77fbff72
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/77fbff72
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/77fbff72

Branch: refs/heads/master
Commit: 77fbff720145e08e5ea6ad30117317ef5b1e031d
Parents: 62f8c78
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Thu Dec 29 12:11:35 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Thu Dec 29 12:11:35 2016 +0800

----------------------------------------------------------------------
 .../app/environment/impl/StormEnvironment.java  |   4 +
 .../app/messaging/EntityStreamPersist.java      |  98 ++++++++++++++
 .../impl/ApplicationHealthCheckServiceImpl.java |   1 +
 .../org/apache/eagle/common/DateTimeUtil.java   |  22 ++++
 .../apache/eagle/common/TestDateTimeUtil.java   |  15 +++
 .../eagle/security/hdfs/HDFSAuditLogObject.java |   8 ++
 .../eagle/security/hdfs/HDFSAuditLogParser.java |  11 +-
 .../traffic/HadoopLogAccumulatorBolt.java       | 127 +++++++++++++++++++
 .../traffic/HadoopLogTrafficPersist.java        |  81 ++++++++++++
 .../security/traffic/SimpleWindowCounter.java   |  77 +++++++++++
 .../AbstractHdfsAuditLogApplication.java        |  13 +-
 .../auditlog/HdfsAuditLogApplication.java       |   4 +-
 .../auditlog/HdfsAuditLogParserBolt.java        |  33 +++--
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |  26 ++++
 .../auditlog/TestUserCommandReassembler.java    |   3 -
 .../auditlog/MapRFSAuditLogApplication.java     |   2 +-
 ...urity.auditlog.MapRFSAuditLogAppProvider.xml |   9 +-
 .../apache/eagle/server/ServerApplication.java  |  10 +-
 .../eagle/topology/TopologyCheckAppConfig.java  |   4 +-
 ....eagle.topology.TopologyCheckAppProvider.xml |  18 ++-
 .../src/main/resources/application.conf         |   8 +-
 .../src/test/resources/application.conf         |  16 ++-
 22 files changed, 554 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 59c8277..942a0ac 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -48,6 +48,10 @@ public class StormEnvironment extends AbstractEnvironment {
         return new MetricStreamPersist(metricDefinition, config);
     }
 
+    public EntityStreamPersist getEntityPersist(Config config) {
+        return new EntityStreamPersist(config); // a fresh bolt instance per call, built from config
+    }
+
     public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) {
         return new MetricSchemaGenerator(metricDefinition, config);
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
new file mode 100644
index 0000000..e216dc6
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/EntityStreamPersist.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.app.messaging;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Storm bolt that buffers the entities carried by incoming tuples and writes them to the
+ * Eagle service once at least {@code service.batchSize} entities (default 1) are buffered.
+ */
+public class EntityStreamPersist extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityStreamPersist.class);
+
+    private final Config config;
+    private IEagleServiceClient client;
+    private OutputCollector collector;
+    private int batchSize;
+    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+    // Tuples whose entities are currently buffered in entityBucket; settled together on flush.
+    private List<Tuple> pendingTuples = new ArrayList<>();
+
+    public EntityStreamPersist(Config config) {
+        this.config = config;
+        this.batchSize = config.hasPath("service.batchSize") ? config.getInt("service.batchSize") : 1;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.client = new EagleServiceClientImpl(config);
+        this.collector = collector;
+    }
+
+    /**
+     * Buffers the entity list found in field 0 of the tuple and, once the batch is full,
+     * persists it. Every tuple that contributed to the batch is acked when the service reports
+     * success and failed otherwise, so no buffered tuple is left unsettled.
+     */
+    @Override
+    public void execute(Tuple input) {
+        List<? extends TaggedLogAPIEntity> entities = (List<? extends TaggedLogAPIEntity>) input.getValue(0);
+        entityBucket.addAll(entities);
+        pendingTuples.add(input);
+
+        if (entityBucket.size() < batchSize) {
+            return;
+        }
+
+        boolean persisted = false;
+        try {
+            GenericServiceAPIResponseEntity response = client.create(entityBucket);
+            if (response.isSuccess()) {
+                LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
+                persisted = true;
+            } else {
+                LOG.error("Service side error: {}", response.getException());
+                collector.reportError(new IllegalStateException(response.getException()));
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+        // Settle every tuple of the batch, not only the one that triggered the flush.
+        for (Tuple tuple : pendingTuples) {
+            if (persisted) {
+                collector.ack(tuple);
+            } else {
+                collector.fail(tuple);
+            }
+        }
+        pendingTuples.clear();
+        entityBucket.clear();
+    }
+
+    /** Releases the service client if prepare() created one, then delegates to the superclass. */
+    @Override
+    public void cleanup() {
+        try {
+            if (client != null) {
+                client.getJerseyClient().destroy();
+                client.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Close client error: {}", e.getMessage(), e);
+        } finally {
+            super.cleanup();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // no output fields: this bolt never emits tuples
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
index 1607b0f..8403d55 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationHealthCheckServiceImpl.java
@@ -49,6 +49,7 @@ public class ApplicationHealthCheckServiceImpl extends ApplicationHealthCheckSer
     private int initialDelay = 10;
     private int period = 300;
 
+    public static final String HEALTH_CHECK_PATH = "application.healthCheck";
     private static final String HEALTH_INITIAL_DELAY_PATH = "application.healthCheck.initialDelay";
     private static final String HEALTH_PERIOD_PATH = "application.healthCheck.period";
     private static final String HEALTH_PUBLISHER_PATH = "application.healthCheck.publisher";

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
index a69feda..98a5a4b 100644
--- a/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
+++ b/eagle-core/eagle-common/src/main/java/org/apache/eagle/common/DateTimeUtil.java
@@ -61,6 +61,14 @@ public class DateTimeUtil {
         return sdf.format(t);
     }
 
+    /** Formats epoch seconds as "yyyy-MM-dd HH:mm:ss" in the given time zone. */
+    public static String secondsToHumanDate(long seconds, TimeZone timeZone) {
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        formatter.setTimeZone(timeZone);
+        Date moment = new Date(seconds * 1000);
+        return formatter.format(moment);
+    }
+
     public static String millisecondsToHumanDateWithMilliseconds(long milliseconds) {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
@@ -84,6 +92,13 @@ public class DateTimeUtil {
         return d.getTime() / 1000;
     }
 
+    /** Parses a "yyyy-MM-dd HH:mm:ss" date interpreted in the given time zone into epoch seconds. */
+    public static long humanDateToSeconds(String date, TimeZone timeZone) throws ParseException {
+        SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        parser.setTimeZone(timeZone);
+        return parser.parse(date).getTime() / 1000;
+    }
+
     public static long humanDateToMilliseconds(String date) throws ParseException {
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
         sdf.setTimeZone(CURRENT_TIME_ZONE);
@@ -91,6 +106,13 @@ public class DateTimeUtil {
         return d.getTime();
     }
 
+    /** Parses a "yyyy-MM-dd HH:mm:ss,SSS" date interpreted in the given time zone into epoch milliseconds. */
+    public static long humanDateToMilliseconds(String date, TimeZone timeZone) throws ParseException {
+        SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+        parser.setTimeZone(timeZone);
+        return parser.parse(date).getTime();
+    }
+
 
     public static long humanDateToMillisecondsWithoutException(String date) {
         try {

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
index a008723..42dad4a 100755
--- a/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
+++ b/eagle-core/eagle-common/src/test/java/org/apache/eagle/common/TestDateTimeUtil.java
@@ -19,8 +19,10 @@ package org.apache.eagle.common;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.text.ParseException;
 import java.util.Calendar;
 import java.util.GregorianCalendar;
+import java.util.TimeZone;
 
 public class TestDateTimeUtil {
     @Test
@@ -84,4 +86,17 @@ public class TestDateTimeUtil {
         //cal.setTimeInMillis(System.currentTimeMillis());
         System.out.println(cal.get(Calendar.DAY_OF_WEEK));
     }
+
+    /**
+     * A date parsed as GMT+8 wall-clock time must format back eight hours earlier in UTC.
+     */
+    @Test
+    public void testTimeZone() throws ParseException {
+        String date = "2016-12-23 07:35:49";
+        TimeZone timeZone = TimeZone.getTimeZone("GMT+8");
+        long timestamp = DateTimeUtil.humanDateToSeconds(date, timeZone);
+        String dateUTC = "2016-12-22 23:35:49";
+        timeZone = TimeZone.getTimeZone("UTC");
+        Assert.assertEquals(dateUTC, DateTimeUtil.secondsToHumanDate(timestamp, timeZone));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
index a294ce1..1cc1a2a 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogObject.java
@@ -27,4 +27,12 @@ public class HDFSAuditLogObject {
     public String cmd;
     public String src;
     public String dst;
+
+    public static final String HDFS_TIMESTAMP_KEY = "timestamp";
+    public static final String HDFS_HOST_KEY = "host";
+    public static final String HDFS_ALLOWED_KEY = "allowed";
+    public static final String HDFS_USER_KEY = "user";
+    public static final String HDFS_CMD_KEY = "cmd";
+    public static final String HDFS_SRC_KEY = "src";
+    public static final String HDFS_DST_KEY = "dst";
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
index 7257975..b4f4017 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
@@ -22,15 +22,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.TimeZone;
 
 /**
  * e.g. 2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true   ugi=hadoop (auth:KERBEROS)     ip=/x.x.x.x   cmd=getfileinfo src=/tmp   dst=null        perm=null       proto=rpc
  */
 
 public final class HDFSAuditLogParser implements Serializable {
-    private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+    private static final Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+    private TimeZone timeZone;
 
     public HDFSAuditLogParser() {
+        this.timeZone = DateTimeUtil.CURRENT_TIME_ZONE;
+    }
+
+    public HDFSAuditLogParser(TimeZone timeZone) {
+        this.timeZone = timeZone;
     }
 
     public static String parseUser(String ugi) {
@@ -91,7 +98,7 @@ public final class HDFSAuditLogParser implements Serializable {
         entity.dst = dst;
         entity.host = ip;
         entity.allowed = Boolean.valueOf(allowed);
-        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
+        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data, timeZone);
         return entity;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
new file mode 100644
index 0000000..c5cc0df
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.utils.Tuple2;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.eagle.app.utils.ApplicationExecutionConfig.APP_ID_KEY;
+import static org.apache.eagle.app.utils.ApplicationExecutionConfig.SITE_ID_KEY;
+
+/** Counts log events per minute-rounded timestamp in a bounded window; evicted buckets become metrics. */
+public class HadoopLogAccumulatorBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopLogAccumulatorBolt.class);
+
+    private static final int DEFAULT_WINDOW_SIZE = 10;
+    private static final String HADOOP_LOG_METRIC_NAME = "hadoop.log.count.minute";
+    private static final String HDFS_COUNTER_WINDOW_SIZE = "dataSinkConfig.metricWindowSize";
+
+    private int taskId;
+    private String site;
+    private String appId;
+    private HadoopLogTrafficPersist client;
+    private SimpleWindowCounter accumulator;
+    private OutputCollector collector;
+    private int windowSize;
+
+    public HadoopLogAccumulatorBolt(Config config) {
+        if (config.hasPath(SITE_ID_KEY)) {
+            this.site = config.getString(SITE_ID_KEY);
+        }
+        if (config.hasPath(APP_ID_KEY)) {
+            this.appId = config.getString(APP_ID_KEY);
+        }
+        this.windowSize = config.hasPath(HDFS_COUNTER_WINDOW_SIZE)
+            ? config.getInt(HDFS_COUNTER_WINDOW_SIZE) : DEFAULT_WINDOW_SIZE;
+        this.accumulator = new SimpleWindowCounter(windowSize);
+        this.client = new HadoopLogTrafficPersist(config);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.taskId = context.getThisTaskId();
+        this.collector = collector;
+    }
+
+    /** Acks the tuple, evicts and publishes the oldest bucket if the window is full, then counts the event. */
+    @Override
+    public void execute(Tuple input) {
+        Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(0);
+        long timeInMs = (long) toBeCopied.get(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY);
+        long timeInMin = DateTimeUtil.roundDown(Calendar.MINUTE, timeInMs);
+        try {
+            collector.ack(input);
+            if (!isOrdered(timeInMin)) {
+                LOG.warn("data is out of order, the estimated throughput may be incorrect");
+                return;
+            }
+            if (accumulator.isFull()) {
+                Tuple2<Long, Long> pair = accumulator.poll();
+                GenericMetricEntity metric = generateMetric(pair.f0(), pair.f1());
+                client.emitMetric(metric);
+            }
+            // always count the current event, including the one that triggered the eviction
+            accumulator.insert(timeInMin, 1);
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+        }
+    }
+
+    /** Accepts anything until the window is full; then the timestamp may lag the oldest bucket by one window. */
+    private boolean isOrdered(long timestamp) {
+        if (accumulator.isEmpty() || !accumulator.isFull()) {
+            return true;
+        }
+        return accumulator.peek() <= timestamp + windowSize * DateTimeUtil.ONEMINUTE;
+    }
+
+    /** Builds the metric entity for one bucket, tagged with appId, site and taskId. */
+    private GenericMetricEntity generateMetric(long timestamp, long count) {
+        GenericMetricEntity metricEntity = new GenericMetricEntity();
+        Map<String, String> tags = new HashMap<>();
+        tags.put("appId", appId);
+        tags.put("site", site);
+        tags.put("taskId", String.valueOf(taskId));
+        metricEntity.setTimestamp(timestamp);
+        metricEntity.setTags(tags);
+        metricEntity.setPrefix(HADOOP_LOG_METRIC_NAME);
+        metricEntity.setValue(new double[] {count});
+        return metricEntity;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
new file mode 100644
index 0000000..29f61ca
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
@@ -0,0 +1,81 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/** Buffers metric entities and writes them to the Eagle service once metricSinkBatchSize (default 1) is reached. */
+public class HadoopLogTrafficPersist implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class);
+    private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize";
+    private final Config config;
+    private transient IEagleServiceClient client; // created per flush, so never part of the serialized form
+    private int batchSize;
+    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+
+    public HadoopLogTrafficPersist(Config config) {
+        this.config = config;
+        this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? config.getInt(SINK_BATCH_SIZE) : 1;
+    }
+
+    public void emitMetric(GenericMetricEntity metricEntity) {
+        entityBucket.add(metricEntity);
+        if (entityBucket.size() < batchSize) {
+            return;
+        }
+        try {
+            client = new EagleServiceClientImpl(config); // short-lived client, closed in finally
+            GenericServiceAPIResponseEntity response = client.create(entityBucket);
+            if (response.isSuccess()) {
+                LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
+            } else {
+                LOG.error("Service side error: {}", response.getException());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            entityBucket.clear();
+            close();
+        }
+    }
+
+    public void close() {
+        try {
+            if (client != null) {
+                this.client.getJerseyClient().destroy();
+                this.client.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Close client error: {}", e.getMessage(), e);
+        } finally {
+            client = null; // drop the reference so a closed client is never closed twice
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
new file mode 100644
index 0000000..5293577
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
@@ -0,0 +1,77 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import org.apache.eagle.common.utils.Tuple2;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Keeps a running count per timestamp bucket, for at most windowSize distinct buckets at a time. */
+public class SimpleWindowCounter implements Serializable {
+    private int windowSize;
+    private Map<Long, Long> counter; // bucket timestamp -> accumulated count
+    private Queue<Long> timeQueue; // bucket timestamps, smallest first
+
+    /** Creates an empty counter that holds up to {@code size} distinct timestamps. */
+    public SimpleWindowCounter(int size) {
+        this.windowSize = size;
+        counter = new ConcurrentHashMap<>(windowSize);
+        timeQueue = new PriorityQueue<>();
+    }
+
+    /** Adds countVal to the bucket; returns false when the bucket is new and the window is already full. */
+    public boolean insert(long timestamp, long countVal) {
+        Long current = counter.get(timestamp);
+        if (current != null) {
+            counter.put(timestamp, current + countVal);
+            return true;
+        }
+        if (counter.size() >= windowSize) {
+            return false;
+        }
+        counter.put(timestamp, countVal);
+        timeQueue.add(timestamp);
+        return true;
+    }
+
+    public int getSize() {
+        return counter.size();
+    }
+
+    /** Returns true once windowSize distinct buckets are held. */
+    public boolean isFull() {
+        return counter.size() >= windowSize;
+    }
+
+    public boolean isEmpty() {
+        return counter.isEmpty();
+    }
+
+    /** Removes the smallest timestamp and returns (timestamp, count); throws NoSuchElementException if empty. */
+    public synchronized Tuple2<Long, Long> poll() {
+        long oldestTimestamp = timeQueue.remove();
+        return new Tuple2<>(oldestTimestamp, counter.remove(oldestTimestamp));
+    }
+
+    /** Returns the smallest timestamp without removing it; throws NoSuchElementException if empty. */
+    public long peek() {
+        return timeQueue.element();
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index 6d7022b..bc8ceb1 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -28,9 +28,11 @@ import com.typesafe.config.Config;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.EntityStreamPersist;
 import org.apache.eagle.app.messaging.StormStreamSink;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.dataproc.impl.storm.partition.*;
+import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
@@ -44,6 +46,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
     public final static String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
     public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
     public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+    public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled";
 
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
@@ -63,7 +66,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
         // ingest -> parserBolt
         // ---------------------
 
-        BaseRichBolt parserBolt = getParserBolt();
+        BaseRichBolt parserBolt = getParserBolt(config);
         BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest");
         boltDeclarer.shuffleGrouping("ingest");
 
@@ -83,6 +86,12 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
         // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
         sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");
 
+        if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
+            HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config);
+            BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks);
+            auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt");
+        }
+
         // ------------------------------
         // sensitivityJoin -> ipZoneJoin
         // ------------------------------
@@ -101,7 +110,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
         return builder.createTopology();
     }
 
-    public abstract BaseRichBolt getParserBolt();
+    public abstract BaseRichBolt getParserBolt(Config config);
 
     public abstract String getSinkStreamName();
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
index 6d3c58c..5f300f3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -30,8 +30,8 @@ import com.typesafe.config.ConfigFactory;
  */
 public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
     @Override
-    public BaseRichBolt getParserBolt() {
-        return new HdfsAuditLogParserBolt();
+    public BaseRichBolt getParserBolt(Config config) {
+        return new HdfsAuditLogParserBolt(config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
index 4590e8a..7b9d9a5 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -25,13 +25,17 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
 import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.TreeMap;
 
 /**
@@ -39,8 +43,19 @@ import java.util.TreeMap;
  */
 public class HdfsAuditLogParserBolt extends BaseRichBolt {
     private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
+    private static final String DATASOURCE_TIMEZONE_PATH = "dataSourceConfig.timeZone";
+
     private OutputCollector collector;
-    private static final HDFSAuditLogParser parser = new HDFSAuditLogParser();
+    private HDFSAuditLogParser parser;
+
+    public HdfsAuditLogParserBolt(Config config) {
+        if (config.hasPath(DATASOURCE_TIMEZONE_PATH)) {
+            TimeZone timeZone = TimeZone.getTimeZone(config.getString(DATASOURCE_TIMEZONE_PATH));
+            parser = new HDFSAuditLogParser(timeZone);
+        } else {
+            parser = new HDFSAuditLogParser();
+        }
+    }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -54,14 +69,14 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
         try {
             entity = parser.parse(logLine);
             Map<String, Object> map = new TreeMap<>();
-            map.put("src", entity.src);
-            map.put("dst", entity.dst);
-            map.put("host", entity.host);
-            map.put("timestamp", entity.timestamp);
-            map.put("allowed", entity.allowed);
-            map.put("user", entity.user);
-            map.put("cmd", entity.cmd);
-            collector.emit(Collections.singletonList(map));
+            map.put(HDFSAuditLogObject.HDFS_SRC_KEY, entity.src);
+            map.put(HDFSAuditLogObject.HDFS_DST_KEY, entity.dst);
+            map.put(HDFSAuditLogObject.HDFS_HOST_KEY, entity.host);
+            map.put(HDFSAuditLogObject.HDFS_TIMESTAMP_KEY, entity.timestamp);
+            map.put(HDFSAuditLogObject.HDFS_ALLOWED_KEY, entity.allowed);
+            map.put(HDFSAuditLogObject.HDFS_USER_KEY, entity.user);
+            map.put(HDFSAuditLogObject.HDFS_CMD_KEY, entity.cmd);
+            collector.emit(input, Collections.singletonList(map));
         } catch (Exception ex) {
             LOG.error("Failing parse audit log message {}", logLine, ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 1108497..90f9e5b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -90,6 +90,13 @@
             <description>scheme class</description>
             <required>true</required>
         </property>
+        <property>
+            <name>dataSourceConfig.timeZone</name>
+            <displayName>Log Time Zone</displayName>
+            <description>time zone of hdfs audit log </description>
+            <value>GMT</value>
+            <required>true</required>
+        </property>
 
         <!-- data enrich configurations -->
         <property>
@@ -150,6 +157,25 @@
             <value>0</value>
             <description>value controls when a produce request is considered completed</description>
         </property>
+        <property>
+            <name>dataSinkConfig.trafficMonitorEnabled</name>
+            <displayName>Log Traffic Monitor Enabled</displayName>
+            <value>false</value>
+            <description>enable the log throughput calculation</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.metricWindowSize</name>
+            <displayName>Window Size for Traffic Counting</displayName>
+            <value>10</value>
+            <description>window size to calculate the throughput</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.metricSinkBatchSize</name>
+            <displayName>Batch Size for Flushing Traffic Metrics</displayName>
+            <value>1</value>
+            <description>batch size of flushing metrics </description>
+        </property>
 
         <!-- web app related configurations -->
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
index caea37a..f7ddad2 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -30,9 +30,6 @@ import org.junit.Test;
 
 import java.util.*;
 
-/**
- * Created by yonzhang on 11/24/15.
- */
 public class TestUserCommandReassembler {
     private Map parseEvent(String log) throws Exception{
         HDFSAuditLogParser deserializer = new HDFSAuditLogParser();

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
index 3319164..b72cb13 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogApplication.java
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory;
  */
 public class MapRFSAuditLogApplication extends AbstractHdfsAuditLogApplication {
     @Override
-    public BaseRichBolt getParserBolt() {
+    public BaseRichBolt getParserBolt(Config config) {
         return new MapRFSAuditLogParserBolt();
     }
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
index c54dd28..074b828 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.MapRFSAuditLogAppProvider.xml
@@ -21,10 +21,9 @@
   -->
 
 <application>
-    <type>MapRFSAuditLogApplication</type>
+    <type>MAPR_HDFS_AUDIT_LOG_MONITOR_APP</type>
     <name>MapRFS Audit Log Monitoring Application</name>
     <version>0.5.0-incubating</version>
-    <appClass>org.apache.eagle.security.auditlog.MapRFSAuditLogApplication</appClass>
     <viewPath>/apps/example</viewPath>
     <configuration>
         <!-- topology related configurations -->
@@ -52,6 +51,12 @@
             <value>2</value>
             <description>number of sink tasks</description>
         </property>
+        <property>
+            <name>topology.numOfMetricSinkTasks</name>
+            <displayName>Topology Metric Sink Tasks</displayName>
+            <value>1</value>
+            <description>number of metric sink tasks</description>
+        </property>
 
         <!-- data source configurations -->
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
index ccf3c28..4118892 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
@@ -46,6 +46,8 @@ import org.slf4j.LoggerFactory;
 import javax.servlet.DispatcherType;
 import java.util.EnumSet;
 
+import static org.apache.eagle.app.service.impl.ApplicationHealthCheckServiceImpl.HEALTH_CHECK_PATH;
+
 class ServerApplication extends Application<ServerConfig> {
     private static final Logger LOG = LoggerFactory.getLogger(ServerApplication.class);
     @Inject
@@ -121,9 +123,11 @@ class ServerApplication extends Application<ServerConfig> {
         environment.lifecycle().manage(updateAppStatusTask);
 
         // Initialize application extended health checks.
-        LOG.debug("Registering ApplicationHealthCheckService");
-        applicationHealthCheckService.init(environment);
-        environment.lifecycle().manage(new ManagedService(applicationHealthCheckService));
+        if (config.hasPath(HEALTH_CHECK_PATH)) {
+            LOG.debug("Registering ApplicationHealthCheckService");
+            applicationHealthCheckService.init(environment);
+            environment.lifecycle().manage(new ManagedService(applicationHealthCheckService));
+        }
 
         // Load application shared extension services.
         LOG.debug("Registering application shared extension services");

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
index da6cd46..f6d61f6 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
+++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckAppConfig.java
@@ -98,14 +98,14 @@ public class TopologyCheckAppConfig implements Serializable {
             hBaseConfig.hbaseMasterPrincipal = getOptionalConfig("dataSourceConfig.hbase.kerberos.master.principal", null);
         }
 
-        if (config.hasPath("dataSourceConfig.mr")) {
+        if (config.hasPath("dataSourceConfig.mr") && config.getBoolean("dataSourceConfig.mr.enabled")) {
             topologyTypes.add(TopologyConstants.TopologyType.MR);
             mrConfig = new MRConfig();
             mrConfig.rmUrls = config.getString("dataSourceConfig.mr.rmUrl").split(",\\s*");
             mrConfig.historyServerUrl = getOptionalConfig("dataSourceConfig.mr.historyServerUrl", null);
         }
 
-        if (config.hasPath("dataSourceConfig.hdfs")) {
+        if (config.hasPath("dataSourceConfig.hdfs") && config.getBoolean("dataSourceConfig.hdfs.enabled")) {
             topologyTypes.add(TopologyConstants.TopologyType.HDFS);
             hdfsConfig = new HdfsConfig();
             hdfsConfig.namenodeUrls = config.getString("dataSourceConfig.hdfs.namenodeUrl").split(",\\s*");

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index 0b83e9b..2089a2f 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -77,6 +77,19 @@
             <displayName>Hdfs Namenode Web URL</displayName>
             <description>hdfs namenode web url for HDFS monitor</description>
             <value>http://sandbox.hortonworks.com:50070</value>
+        </property>
+        <property>
+            <name>dataSourceConfig.hdfs.enabled</name>
+            <displayName>HDFS Topology Check Enabled</displayName>
+            <description>HDFS topology status check enabled</description>
+            <value>false</value>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.mr.enabled</name>
+            <displayName>MR Topology Check Enabled</displayName>
+            <description>MR topology status check enabled</description>
+            <value>false</value>
             <required>true</required>
         </property>
         <property>
@@ -84,7 +97,6 @@
             <displayName>Resource Manager URL</displayName>
             <description>resource manager url for YARN monitor</description>
             <value>http://sandbox.hortonworks.com:8088</value>
-            <required>true</required>
         </property>
         <property>
             <name>dataSourceConfig.mr.historyServerUrl</name>
@@ -94,8 +106,8 @@
         </property>
         <property>
             <name>dataSourceConfig.hbase.enabled</name>
-            <displayName>HBase Config Enabled</displayName>
-            <description>enabled for HBase monitor</description>
+            <displayName>HBase Topology Check Enabled</displayName>
+            <description>HBase topology status check enabled</description>
             <value>false</value>
             <required>true</required>
         </property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
index 4d22912..5d5f1d3 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/application.conf
@@ -33,8 +33,12 @@
   }
 
   dataSourceConfig : {
-    hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+    hdfs: {
+      enabled: false,
+      namenodeUrl: "http://sandbox.hortonworks.com:50070",
+    }
     mr: {
+      enabled: false,
       rmUrl: "http://sandbox.hortonworks.com:50030",
       historyServerUrl : "http://sandbox.hortonworks.com:19888"  #if not need, then empty
     }
@@ -49,7 +53,7 @@
         eagle.principal: "", #if not need, then empty
         eagle.keytab: ""
       }
-    },
+    }
   }
   
   "dataSinkConfig": {

http://git-wip-us.apache.org/repos/asf/eagle/blob/77fbff72/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
index be84c1d..da52f65 100644
--- a/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
+++ b/eagle-topology-check/eagle-topology-app/src/test/resources/application.conf
@@ -28,20 +28,26 @@
   }
 
   dataSourceConfig : {
-    hdfs.namenodeUrl: "http://sandbox.hortonworks.com:50070",
+    hdfs: {
+      enabled: false,
+      namenodeUrl: "http://sandbox.hortonworks.com:50070",
+    }
+    mr: {
+      enabled: false,
+      rmUrl: "http://sandbox.hortonworks.com:50030",
+      historyServerUrl : "http://sandbox.hortonworks.com:19888"  # leave empty if not needed
+    }
     hbase: {
+      enabled: false,
       zkQuorum: "sandbox.hortonworks.com",
       zkPropertyClientPort : "2181",
       zkZnodeParent: "/hbase-unsecure",
+      zkRetryTimes : "5",
       kerberos : {
         master.principal : "hadoop/_HOST@EXAMPLE.COM"
         eagle.principal: "", #if not need, then empty
         eagle.keytab: ""
       }
-    },
-    mr: {
-      rmUrl: "http://sandbox.hortonworks.com:8088",
-      historyServerUrl : "http://sandbox.hortonworks.com:19888"
     }
   }