You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/01/09 01:54:33 UTC
eagle git commit: [MINOR] Refine hdfs log throughput monitor
Repository: eagle
Updated Branches:
refs/heads/master 257a3517b -> c9c475e2a
[MINOR] Refine hdfs log throughput monitor
1. Refine IEagleServiceClient: add setReadTimeout(int timeoutMs)
2. Refine the HDFS log throughput monitor
3. Add topology.message.timeout.secs to the topology health check app config
Author: Zhao, Qingwen <qi...@apache.org>
Closes #764 from qingwen220/minor.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/c9c475e2
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/c9c475e2
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/c9c475e2
Branch: refs/heads/master
Commit: c9c475e2a6b6e3406764c5d7fbfa44a3c5eeead8
Parents: 257a351
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Mon Jan 9 09:54:10 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Mon Jan 9 09:54:10 2017 +0800
----------------------------------------------------------------------
.../eagle/service/client/IEagleServiceClient.java | 2 ++
.../service/client/impl/EagleServiceBaseClient.java | 7 ++++++-
.../history/MRHistoryJobApplicationHealthCheck.java | 3 ++-
.../JobConfigurationCreationServiceListener.java | 3 ++-
.../jpm/mr/history/parser/TaskFailureListener.java | 3 ++-
.../running/parser/MRJobEntityCreationHandler.java | 5 +++--
.../eagle/jpm/mr/running/parser/MRJobParserTest.java | 2 +-
....jpm.spark.history.SparkHistoryJobAppProvider.xml | 2 +-
.../security/traffic/HadoopLogAccumulatorBolt.java | 15 ++++++++++++---
.../security/traffic/HadoopLogTrafficPersist.java | 12 ++++--------
.../eagle/security/traffic/SimpleWindowCounter.java | 12 +++++++++---
.../auditlog/AbstractHdfsAuditLogApplication.java | 6 ++++--
...gle.security.auditlog.HdfsAuditLogAppProvider.xml | 10 ++++++++--
...pache.eagle.topology.TopologyCheckAppProvider.xml | 12 +++++++++---
14 files changed, 65 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
index ce62eee..01489fe 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/IEagleServiceClient.java
@@ -28,6 +28,8 @@ public interface IEagleServiceClient extends IEagleServiceRequestBuilder, Closea
Client getJerseyClient();
+ void setReadTimeout(int timeoutMs);
+
IEagleServiceClient silence(boolean silence);
/**
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
index 3b717d8..4abf014 100644
--- a/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
+++ b/eagle-core/eagle-query/eagle-client-base/src/main/java/org/apache/eagle/service/client/impl/EagleServiceBaseClient.java
@@ -122,6 +122,10 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
return sb;
}
+ public void setReadTimeout(int timeoutMs) {
+ client.setReadTimeout(timeoutMs);
+ }
+
protected static String marshall(List<?> entities) throws JsonMappingException, JsonGenerationException, IOException {
final JsonFactory factory = new JsonFactory();
final ObjectMapper mapper = new ObjectMapper(factory);
@@ -132,7 +136,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
protected <E extends TaggedLogAPIEntity> Map<String,List<E>> groupEntitiesByService(List<E> entities) throws EagleServiceClientException {
Map<String,List<E>> serviceEntityMap = new HashMap<String, List<E>>();
if(LOG.isDebugEnabled()) LOG.debug("Grouping entities by service name");
- for(E entity: entities){
+ for(E entity: entities) {
if(entity == null) {
LOG.warn("Skip null entity");
continue;
@@ -303,6 +307,7 @@ public abstract class EagleServiceBaseClient implements IEagleServiceClient {
}
}
this.isStopped = true;
+ this.getJerseyClient().destroy();
}
@Override
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
index 20506c0..6f337e7 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java
@@ -50,7 +50,8 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
String message = "";
try {
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
index 96f2b3b..6dc5791 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java
@@ -63,7 +63,8 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
List<JobConfigurationAPIEntity> list = new ArrayList<>();
list.add(jobConfigurationEntity);
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
index 61be66f..40e6432 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java
@@ -118,7 +118,8 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener {
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
int tried = 0;
while (tried <= MAX_RETRY_TIMES) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index abd0594..c2cbbe5 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -77,7 +77,8 @@ public class MRJobEntityCreationHandler {
eagleServiceConfig.eagleServicePort,
eagleServiceConfig.username,
eagleServiceConfig.password);
- client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ //client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
+ client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000);
try {
return createEntities(client);
} catch (Exception e) {
@@ -105,7 +106,7 @@ public class MRJobEntityCreationHandler {
entities.clear();
} catch (Exception e) {
- LOG.warn("exception found when flush entities, {}", e);
+ LOG.warn("exception found when flush entities", e);
if (!success && count < MAX_RETRY_COUNT) {
LOG.info("Sleep for a while before retrying");
Thread.sleep(10 * 1000);
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
index f2e581c..a2fb6ca 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java
@@ -401,7 +401,7 @@ public class MRJobParserTest {
Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) == null);
Assert.assertTrue(entities.isEmpty());
verify(client, times(2)).create(any());
- verify(client, times(2)).getJerseyClient();
+ verify(client, times(1)).getJerseyClient();
verify(client, times(1)).close();
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index ef958cc..4c4d1cd 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -61,7 +61,7 @@
<property>
<name>topology.message.timeout.secs</name>
<displayName>topology message timeout (secs)</displayName>
- <description>default timeout is 30s</description>
+ <description>default timeout is 300s</description>
<value>300</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
index c5cc0df..3377b4a 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogAccumulatorBolt.java
@@ -50,6 +50,7 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
private int taskId;
private String site;
private String appId;
+ private Config config;
private HadoopLogTrafficPersist client;
private SimpleWindowCounter accumulator;
private OutputCollector collector;
@@ -67,15 +68,15 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
} else {
this.windowSize = DEFAULT_WINDOW_SIZE;
}
- this.accumulator = new SimpleWindowCounter(windowSize);
- this.client = new HadoopLogTrafficPersist(config);
-
+ this.config = config;
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.taskId = context.getThisTaskId();
this.collector = collector;
+ this.client = new HadoopLogTrafficPersist(config);
+ this.accumulator = new SimpleWindowCounter(windowSize);
}
@Override
@@ -87,6 +88,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
collector.ack(input);
if (!isOrdered(timeInMin)) {
LOG.warn("data is out of order, the estimated throughput may be incorrect");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("time queue {} with event timestamp={}", accumulator.getTimeQueue().toString(), timeInMin);
+ }
return;
}
if (accumulator.isFull()) {
@@ -124,4 +128,9 @@ public class HadoopLogAccumulatorBolt extends BaseRichBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
+
+ @Override
+ public void cleanup() {
+ this.client.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
index 29f61ca..59061c1 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/HadoopLogTrafficPersist.java
@@ -28,20 +28,20 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class HadoopLogTrafficPersist implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(HadoopLogTrafficPersist.class);
private static final String SINK_BATCH_SIZE = "dataSinkConfig.metricSinkBatchSize";
- private final Config config;
private IEagleServiceClient client;
private int batchSize;
- private List<TaggedLogAPIEntity> entityBucket = new CopyOnWriteArrayList<>();
+ private List<TaggedLogAPIEntity> entityBucket = new ArrayList<>();
public HadoopLogTrafficPersist(Config config) {
- this.config = config;
this.batchSize = config.hasPath(SINK_BATCH_SIZE) ? config.getInt(SINK_BATCH_SIZE) : 1;
+ this.client = new EagleServiceClientImpl(config);
}
public void emitMetric(GenericMetricEntity metricEntity) {
@@ -51,11 +51,9 @@ public class HadoopLogTrafficPersist implements Serializable {
}
try {
- client = new EagleServiceClientImpl(config);
GenericServiceAPIResponseEntity response = client.create(entityBucket);
if (response.isSuccess()) {
- LOG.info("persist {} entities with starttime={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
-
+ LOG.info("persist {} entities with the earliest time={}", entityBucket.size(), entityBucket.get(0).getTimestamp());
} else {
LOG.error("Service side error: {}", response.getException());
}
@@ -63,14 +61,12 @@ public class HadoopLogTrafficPersist implements Serializable {
LOG.error(e.getMessage(), e);
} finally {
entityBucket.clear();
- close();
}
}
public void close() {
try {
if (client != null) {
- this.client.getJerseyClient().destroy();
this.client.close();
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
index 5293577..4b5cbc8 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/traffic/SimpleWindowCounter.java
@@ -23,17 +23,19 @@ import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+/**
+ * non-thread safe
+ */
public class SimpleWindowCounter implements Serializable {
private int windowSize;
-
private Map<Long, Long> counter;
private Queue<Long> timeQueue;
public SimpleWindowCounter(int size) {
this.windowSize = size;
counter = new ConcurrentHashMap<>(windowSize);
- timeQueue = new PriorityQueue<>();
+ timeQueue = new PriorityQueue<>(windowSize, (a,b) -> a.compareTo(b));
}
public boolean insert(long timestamp, long countVal) {
@@ -63,7 +65,7 @@ public class SimpleWindowCounter implements Serializable {
return counter.isEmpty();
}
- public synchronized Tuple2<Long, Long> poll() {
+ public Tuple2<Long, Long> poll() {
long oldestTimestamp = timeQueue.poll();
Tuple2<Long, Long> pair = new Tuple2<>(oldestTimestamp, counter.get(oldestTimestamp));
counter.remove(oldestTimestamp);
@@ -74,4 +76,8 @@ public class SimpleWindowCounter implements Serializable {
return timeQueue.peek();
}
+ public Queue<Long> getTimeQueue() {
+ return timeQueue;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index bc8ceb1..b21d62d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -47,6 +47,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
public final static String TRAFFIC_MONITOR_ENABLED = "dataSinkConfig.trafficMonitorEnabled";
+ private final static String TRAFFIC_MONITOR_TASK_NUM = "topology.numOfTrafficMonitorTasks";
@Override
public StormTopology execute(Config config, StormEnvironment environment) {
@@ -59,6 +60,7 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+ int numOfTrafficMonitorTasks = config.hasPath(TRAFFIC_MONITOR_TASK_NUM) ? config.getInt(TRAFFIC_MONITOR_TASK_NUM) : numOfParserTasks;
builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
@@ -88,8 +90,8 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
HadoopLogAccumulatorBolt auditLogAccumulator = new HadoopLogAccumulatorBolt(config);
- BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfParserTasks);
- auditLogAccumulatorDeclarer.setNumTasks(numOfParserTasks).shuffleGrouping("parserBolt");
+ BoltDeclarer auditLogAccumulatorDeclarer = builder.setBolt("logAccumulator", auditLogAccumulator, numOfTrafficMonitorTasks);
+ auditLogAccumulatorDeclarer.setNumTasks(numOfTrafficMonitorTasks).shuffleGrouping("parserBolt");
}
// ------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index f82f8b3..49694a5 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -62,6 +62,12 @@
<description>number of sink tasks</description>
</property>
<property>
+ <name>topology.numOfTrafficMonitorTasks</name>
+ <displayName>Topology Traffic Monitor Tasks</displayName>
+ <value>2</value>
+ <description>number of traffic monitor tasks</description>
+ </property>
+ <property>
<name>topology.message.timeout.secs</name>
<displayName>topology message timeout (secs)</displayName>
<description>default timeout is 60s</description>
@@ -172,8 +178,8 @@
<property>
<name>dataSinkConfig.metricSinkBatchSize</name>
<displayName>Batch Size for Flushing Traffic Metrics</displayName>
- <value>1</value>
- <description>batch size of flushing metrics </description>
+ <value>10</value>
+ <description>batch size of flushing metrics</description>
</property>
<!-- web app related configurations -->
http://git-wip-us.apache.org/repos/asf/eagle/blob/c9c475e2/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
index c5a0e84..bfe43ed 100644
--- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
+++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml
@@ -35,6 +35,12 @@
<value>5</value>
</property>
<property>
+ <name>topology.message.timeout.secs</name>
+ <displayName>topology message timeout (secs)</displayName>
+ <description>default timeout is 60s</description>
+ <value>60</value>
+ </property>
+ <property>
<name>topology.numDataFetcherSpout</name>
<displayName>Spout Task Number</displayName>
<description>spout task number</description>
@@ -42,15 +48,15 @@
</property>
<property>
<name>topology.numEntityPersistBolt</name>
- <displayName>Storage Bolt Task Number</displayName>
- <description>number of persist tasks to the storage</description>
+ <displayName>Data Storage Task Number</displayName>
+ <description>number of persist tasks writing to the storage</description>
<value>1</value>
</property>
<property>
<name>topology.numOfKafkaSinkBolt</name>
<displayName>Kafka Sink Task Number</displayName>
<value>2</value>
- <description>number of sinks to alert engine</description>
+ <description>number of sinks connected to alert engine</description>
</property>
<property>