You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/24 06:09:50 UTC
incubator-eagle git commit: [EAGLE-673] add numOfPublishExecutors to alert engine topology
Repository: incubator-eagle
Updated Branches:
refs/heads/master 56e7048ff -> e520e4011
[EAGLE-673] add numOfPublishExecutors to alert engine topology
Author: wujinhu <wu...@126.com>
Closes #555 from wujinhu/EAGLE-673.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e520e401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e520e401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e520e401
Branch: refs/heads/master
Commit: e520e4011796ebdea52f80ca43b9bffddf3aa50a
Parents: 56e7048
Author: wujinhu <wu...@126.com>
Authored: Mon Oct 24 14:09:43 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Mon Oct 24 14:09:43 2016 +0800
----------------------------------------------------------------------
...agle.alert.app.AlertUnitTopologyAppProvider.xml | 7 +++++++
.../src/main/resources/application.conf | 1 +
.../alert/engine/runner/UnitTopologyRunner.java | 17 ++++++++++++-----
3 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
index 6ef96c7..28f7db4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/META-INF/providers/org.apache.eagle.alert.app.AlertUnitTopologyAppProvider.xml
@@ -51,6 +51,13 @@
<required>false</required>
</property>
<property>
+ <name>topology.numOfPublishExecutors</name>
+ <displayName>Publisher Executor Number</displayName>
+ <value>1</value>
+ <description>Number of publish executors</description>
+ <required>false</required>
+ </property>
+ <property>
<name>topology.numOfPublishTasks</name>
<displayName>Publisher Tasks Number</displayName>
<value>1</value>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
index 1a25cfa..46f5b08 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/resources/application.conf
@@ -22,6 +22,7 @@
"numOfSpoutTasks" : 1,
"numOfRouterBolts" : 4,
"numOfAlertBolts" : 10,
+ "numOfPublishExecutors" : 1,
"numOfPublishTasks" : 1,
"messageTimeoutSecs": 3600,
"localMode" : "true"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e520e401/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
index 88cfb9b..287d5db 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java
@@ -63,6 +63,7 @@ public class UnitTopologyRunner {
public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts";
public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts";
public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks";
+ public static final String PUBLISH_EXECUTOR_NUM = "topology.numOfPublishExecutors";
public static final String LOCAL_MODE = "topology.localMode";
public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs";
public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600;
@@ -88,6 +89,7 @@ public class UnitTopologyRunner {
int numOfSpoutTasks,
int numOfRouterBolts,
int numOfAlertBolts,
+ int numOfPublishExecutors,
int numOfPublishTasks,
Config config,
boolean localMode) {
@@ -104,7 +106,7 @@ public class UnitTopologyRunner {
}
stormConfig.setNumWorkers(numOfTotalWorkers);
- StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+ StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
if (localMode) {
LOG.info("Submitting as local mode");
@@ -126,10 +128,11 @@ public class UnitTopologyRunner {
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+ int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
boolean localMode = config.getBoolean(LOCAL_MODE);
int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM);
- run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config, localMode);
+ run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode);
}
public IMetadataChangeNotifyService getMetadataChangeNotifyService() {
@@ -144,6 +147,7 @@ public class UnitTopologyRunner {
int numOfSpoutTasks,
int numOfRouterBolts,
int numOfAlertBolts,
+ int numOfPublishExecutors,
int numOfPublishTasks,
Config config) {
StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts];
@@ -199,7 +203,7 @@ public class UnitTopologyRunner {
}
// connect alert bolt and alert publish bolt, this is the last bolt in the pipeline
- BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt).setNumTasks(numOfPublishTasks);
+ BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks);
for (int i = 0; i < numOfAlertBolts; i++) {
boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0));
}
@@ -211,9 +215,10 @@ public class UnitTopologyRunner {
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+ int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
- return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+ return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
}
// ---------------------------
@@ -224,15 +229,17 @@ public class UnitTopologyRunner {
int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM);
int numOfAlertBolts = config.getInt(ALERT_TASK_NUM);
+ int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM);
int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM);
- return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishTasks, config);
+ return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config);
}
public static Topology buildTopologyMetadata(String topologyId,
int numOfSpoutTasks,
int numOfRouterBolts,
int numOfAlertBolts,
+ int numOfPublishExecutors,
int numOfPublishTasks,
Config config) {
Topology topology = new Topology();