You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/09 01:54:33 UTC

eagle git commit: [MINOR] Refine hdfs log throughput monitor

Repository: eagle
Updated Branches:
  refs/heads/master 257a3517b -> c9c475e2a


[MINOR] Refine hdfs log throughput monitor

1. refine IEagleServiceClient
2. refine hdfs log throughput monitor
3. add topology.message.timeout.secs in topology health check app config

Author: Zhao, Qingwen <qi...@apache.org>

Closes #764 from qingwen220/minor.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c9c475e2
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c9c475e2
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c9c475e2

Branch: refs/heads/master
Commit: c9c475e2a6b6e3406764c5d7fbfa44a3c5eeead8
Parents: 257a351
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Jan 9 09:54:10 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Jan 9 09:54:10 2017 +0800

----------------------------------------------------------------------
 .../eagle/service/client/IEagleServiceClient.java    |  2 ++
 .../service/client/impl/EagleServiceBaseClient.java  |  7 ++++++-
 .../history/MRHistoryJobApplicationHealthCheck.java  |  3 ++-
 .../JobConfigurationCreationServiceListener.java     |  3 ++-
 .../jpm/mr/history/parser/TaskFailureListener.java   |  3 ++-
 .../running/parser/MRJobEntityCreationHandler.java   |  5 +++--
 .../eagle/jpm/mr/running/parser/MRJobParserTest.java |  2 +-
 ....jpm.spark.history.SparkHistoryJobAppProvider.xml |  2 +-
 .../security/traffic/HadoopLogAccumulatorBolt.java   | 15 ++++++++++++---
 .../security/traffic/HadoopLogTrafficPersist.java    | 12 ++++--------
 .../eagle/security/traffic/SimpleWindowCounter.java  | 12 +++++++++---
 .../auditlog/AbstractHdfsAuditLogApplication.java    |  6 ++++--
 ...gle.security.auditlog.HdfsAuditLogAppProvider.xml | 10 ++++++++--
 ...pache.eagle.topology.TopologyCheckAppProvider.xml | 12 +++++++++---
 14 files changed, 65 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
index ce62eee..01489fe 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
@@ -28,6 +28,8 @@ public interface IEagleServiceClient extends IEagleServiceRequestBuilder, Closea
 
     Client getJerseyClient();
 
+    void setReadTimeout(int timeoutMs);
+
     IEagleServiceClient silence(boolean silence);
 
     /**

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
index 3b717d8..4abf014 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
@@ -122,6 +122,10 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
         return sb;
     }
 
+    public void setReadTimeout(int timeoutMs) {
+        client.setReadTimeout(timeoutMs);
+    }
+
     protected static String marshall(List<?> entities) throws JsonMappingException, JsonGenerationException, IOException {
         final JsonFactory factory = new JsonFactory();
         final ObjectMapper mapper = new ObjectMapper(factory);
@@ -132,7 +136,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
     protected <E extends TaggedLogAPIEntity> Map<String,List<E>> groupEntitiesByService(List<E> entities) throws EagleServiceClientException {
         Map<String,List<E>> serviceEntityMap = new HashMap<String, List<E>>();
         if(LOG.isDebugEnabled()) LOG.debug("Grouping entities by service name");
-        for(E entity: entities){
+        for(E entity: entities) {
             if(entity == null) {
                 LOG.warn("Skip null entity");
                 continue;
@@ -303,6 +307,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
             }
         }
         this.isStopped = true;
+        this.getJerseyClient().destroy();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index 20506c0..6f337e7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -50,7 +50,8 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa
                 eagleServiceConfig.username,
                 eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
 
         String message = "";
         try {

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 96f2b3b..6dc5791 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -63,7 +63,8 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
             eagleServiceConfig.username,
             eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
         List<JobConfigurationAPIEntity> list = new ArrayList<>();
         list.add(jobConfigurationEntity);
 

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 61be66f..40e6432 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -118,7 +118,8 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
             eagleServiceConfig.username,
             eagleServiceConfig.password);
 
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
 
         int tried = 0;
         while (tried <= MAX_RETRY_TIMES) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index abd0594..c2cbbe5 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -77,7 +77,8 @@ public class MRJobEntityCreationHandler {
             eagleServiceConfig.eagleServicePort,
             eagleServiceConfig.username,
             eagleServiceConfig.password);
-        client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+        client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
         try {
             return createEntities(client);
         } catch (Exception e) {
@@ -105,7 +106,7 @@ public class MRJobEntityCreationHandler {
                 entities.clear();
 
             } catch (Exception e) {
-                LOG.warn("exception found when flush entities, {}", e);
+                LOG.warn("exception found when flush entities", e);
                 if (!success && count < MAX_RETRY_COUNT) {
                     LOG.info("Sleep for a while before retrying");
                     Thread.sleep(10 * 1000);

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index f2e581c..a2fb6ca 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -401,7 +401,7 @@ public class MRJobParserTest {
         Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
         Assert.assertTrue(entities.isEmpty());
         verify(client, times(2)).create(any());
-        verify(client, times(2)).getJerseyClient();
+        verify(client, times(1)).getJerseyClient();
         verify(client, times(1)).close();
 
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index ef958cc..4c4d1cd 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -61,7 +61,7 @@
         <property>
             <name>topology.message.timeout.secs</name>
             <displayName>topology message timeout (secs)</displayName>
-            <description>default timeout is 30s</description>
+            <description>default timeout is 300s</description>
             <value>300</value>
         </property>
         <property>

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
index c5cc0df..3377b4a 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
@@ -50,6 +50,7 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
     private int taskId;
     private String site;
     private String appId;
+    private Config config;
     private HadoopLogTrafficPersist client;
     private SimpleWindowCounter accumulator;
     private OutputCollector collector;
@@ -67,15 +68,15 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
         } else {
             this.windowSize = DEFAULT_WINDOW_SIZE;
         }
-        this.accumulator = new SimpleWindowCounter(windowSize);
-        this.client = new HadoopLogTrafficPersist(config);
-
+        this.config = config;
     }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         this.taskId = context.getThisTaskId();
         this.collector = collector;
+        this.client = new HadoopLogTrafficPersist(config);
+        this.accumulator = new SimpleWindowCounter(windowSize);
     }
 
     @Override
@@ -87,6 +88,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
             collector.ack(input);
             if (!isOrdered(timeInMin)) {
                 LOG.warn("data is out of order, the estimated throughput may be incorrect");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("time queue {} with event timestamp={}", accumulator.getTimeQueue().toString(), timeInMin);
+                }
                 return;
             }
             if (accumulator.isFull()) {
@@ -124,4 +128,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
     }
+
+    @Override
+    public void cleanup() {
+        this.client.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
index 29f61ca..59061c1 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
@@ -28,20 +28,20 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 public class HadoopLogTrafficPersist implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class);
     private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize";
-    private final Config config;
     private IEagleServiceClient client;
     private int batchSize;
-    private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+    private List<TaggedLogAPIEntity> entityBucket = new ArrayList<>();
 
     public HadoopLogTrafficPersist(Config config) {
-        this.config = config;
         this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? config.getInt(SINK_BATCH_SIZE) : 1;
+        this.client = new EagleServiceClientImpl(config);
     }
 
     public void emitMetric(GenericMetricEntity metricEntity) {
@@ -51,11 +51,9 @@ public class HadoopLogTrafficPersist implements Serializable {
         }
 
         try {
-            client = new EagleServiceClientImpl(config);
             GenericServiceAPIResponseEntity response = client.create(entityBucket);
             if (response.isSuccess()) {
-                LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
-
+                LOG.info("persist {} entities with the earliest time={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
             } else {
                 LOG.error("Service side error: {}", response.getException());
             }
@@ -63,14 +61,12 @@ public class HadoopLogTrafficPersist implements Serializable {
             LOG.error(e.getMessage(), e);
         } finally {
             entityBucket.clear();
-            close();
         }
     }
 
     public void close() {
         try {
             if (client != null) {
-                this.client.getJerseyClient().destroy();
                 this.client.close();
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
index 5293577..4b5cbc8 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
@@ -23,17 +23,19 @@ import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * non-thread safe
+ */
 public class SimpleWindowCounter implements Serializable {
 
     private int windowSize;
-
     private Map<Long, Long> counter;
     private Queue<Long> timeQueue;
 
     public SimpleWindowCounter(int size) {
         this.windowSize = size;
         counter = new ConcurrentHashMap<>(windowSize);
-        timeQueue = new PriorityQueue<>();
+        timeQueue = new PriorityQueue<>(windowSize, (a,b) -> a.compareTo(b));
     }
 
     public boolean insert(long timestamp, long countVal) {
@@ -63,7 +65,7 @@ public class SimpleWindowCounter implements Serializable {
         return counter.isEmpty();
     }
 
-    public synchronized Tuple2<Long, Long> poll() {
+    public Tuple2<Long, Long> poll() {
         long oldestTimestamp = timeQueue.poll();
         Tuple2<Long, Long> pair = new Tuple2<>(oldestTimestamp, counter.get(oldestTimestamp));
         counter.remove(oldestTimestamp);
@@ -74,4 +76,8 @@ public class SimpleWindowCounter implements Serializable {
         return timeQueue.peek();
     }
 
+    public Queue<Long> getTimeQueue() {
+        return timeQueue;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index bc8ceb1..b21d62d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -47,6 +47,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
     public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
     public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
     public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled";
+    private final static String TRAFFIC_MONITOR_TASK_NUM = "topology.numOfTrafficMonitorTasks";
 
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
@@ -59,6 +60,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
         int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
         int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
         int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+        int numOfTrafficMonitorTasks = config.hasPath(TRAFFIC_MONITOR_TASK_NUM) ? config.getInt(TRAFFIC_MONITOR_TASK_NUM) : numOfParserTasks;
 
         builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
 
@@ -88,8 +90,8 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
 
         if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
             HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config);
-            BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks);
-            auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt");
+            BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfTrafficMonitorTasks);
+            auditLogAccumulatorDeclarer.setNumTasks(numOfTrafficMonitorTasks).shuffleGrouping("parserBolt");
         }
 
         // ------------------------------

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index f82f8b3..49694a5 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -62,6 +62,12 @@
             <description>number of sink tasks</description>
         </property>
         <property>
+            <name>topology.numOfTrafficMonitorTasks</name>
+            <displayName>Topology Traffic Monitor Tasks</displayName>
+            <value>2</value>
+            <description>number of traffic monitor tasks</description>
+        </property>
+        <property>
             <name>topology.message.timeout.secs</name>
             <displayName>topology message timeout (secs)</displayName>
             <description>default timeout is 60s</description>
@@ -172,8 +178,8 @@
         <property>
             <name>dataSinkConfig.metricSinkBatchSize</name>
             <displayName>Batch Size for Flushing Traffic Metrics</displayName>
-            <value>1</value>
-            <description>batch size of flushing metrics </description>
+            <value>10</value>
+            <description>batch size of flushing metrics</description>
         </property>
 
         <!-- web app related configurations -->

http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index c5a0e84..bfe43ed 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -35,6 +35,12 @@
             <value>5</value>
         </property>
         <property>
+            <name>topology.message.timeout.secs</name>
+            <displayName>topology message timeout (secs)</displayName>
+            <description>default timeout is 60s</description>
+            <value>60</value>
+        </property>
+        <property>
             <name>topology.numDataFetcherSpout</name>
             <displayName>Spout Task Number</displayName>
             <description>spout task number</description>
@@ -42,15 +48,15 @@
         </property>
         <property>
             <name>topology.numEntityPersistBolt</name>
-            <displayName>Storage Bolt Task Number</displayName>
-            <description>number of persist tasks to the storage</description>
+            <displayName>Data Storage Task Number</displayName>
+            <description>number of persist tasks writing to the storage</description>
             <value>1</value>
         </property>
         <property>
             <name>topology.numOfKafkaSinkBolt</name>
             <displayName>Kafka Sink Task Number</displayName>
             <value>2</value>
-            <description>number of sinks to alert engine</description>
+            <description>number of sinks connected to alert engine</description>
         </property>
 
         <property>