You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/16 06:55:55 UTC
incubator-eagle git commit: [MINOR] add stream data source config for
mr history job
Repository: incubator-eagle
Updated Branches:
refs/heads/master e24de5c7e -> 229d7b907
[MINOR] add stream data source config for mr history job
Author: wujinhu <wu...@126.com>
Closes #748 from wujinhu/EAGLE-796.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/229d7b90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/229d7b90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/229d7b90
Branch: refs/heads/master
Commit: 229d7b9073a430de7dbbb12d39ddc330646eb458
Parents: e24de5c
Author: wujinhu <wu...@126.com>
Authored: Fri Dec 16 14:55:47 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Fri Dec 16 14:55:47 2016 +0800
----------------------------------------------------------------------
.../apache/eagle/jpm/mr/history/storm/JobHistorySpout.java | 9 ++++++---
...eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml | 9 ++++++++-
2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/229d7b90/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 0cd30ae..d7daa5e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -140,7 +140,12 @@ public class JobHistorySpout extends BaseRichSpout {
JobHistoryZKStateManager.instance().init(appConfig.getZkStateConfig());
JobHistoryZKStateManager.instance().ensureJobPartition(partitionId, numTotalPartitions);
interceptor.setSpoutOutputCollector(collector);
-
+ if (streamPublishers != null) {
+ for (StreamPublisher streamPublisher : streamPublishers) {
+ streamPublisher.setCollector(this.interceptor);
+ StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher);
+ }
+ }
try {
jhfLCM = new JobHistoryDAOImpl(jobHistoryEndpointConfig);
driver = new JHFCrawlerDriverImpl(
@@ -189,8 +194,6 @@ public class JobHistorySpout extends BaseRichSpout {
if (streamPublishers != null) {
for (StreamPublisher streamPublisher : streamPublishers) {
declarer.declareStream(streamPublisher.stormStreamId(), new Fields("f1", "message"));
- streamPublisher.setCollector(this.interceptor);
- StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher);
}
} else {
declarer.declare(new Fields("f1", "message"));
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/229d7b90/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index ccf3c6b..1ff9b85 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -96,6 +96,13 @@
<required>true</required>
</property>
<property>
+ <name>dataSourceConfig.zkConnection</name>
+ <displayName>Kafka Zookeeper Quorum</displayName>
+ <value>localhost:2181</value>
+ <description>kafka zookeeper connection</description>
+ <required>true</required>
+ </property>
+ <property>
<name>dataSinkConfig.serializerClass</name>
<displayName>Serializer Class For Kafka Message Value</displayName>
<value>kafka.serializer.StringEncoder</value>
@@ -221,7 +228,7 @@
<type>string</type>
</column>
<column>
- <name>queue</name>
+ <name>hostname</name>
<type>string</type>
</column>
<column>