You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/11/01 06:21:13 UTC
incubator-eagle git commit: [EAGLE-704] Update spark history config to integrate with the new application framework
Repository: incubator-eagle
Updated Branches:
refs/heads/master 9954b4e11 -> 6b0ed3d0d
[EAGLE-704] Update spark history config to integrate with the new application framework
https://issues.apache.org/jira/browse/EAGLE-704
Author: Zhao, Qingwen <qi...@apache.org>
Closes #591 from qingwen220/EAGLE-704.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/6b0ed3d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/6b0ed3d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/6b0ed3d0
Branch: refs/heads/master
Commit: 6b0ed3d0de8fb7302d412f3a190e4793ea0c7977
Parents: 9954b4e
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Tue Nov 1 14:21:06 2016 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Tue Nov 1 14:21:06 2016 +0800
----------------------------------------------------------------------
.../alert/engine/model/AlertStreamEvent.java | 7 +-
.../jpm/spark/history/SparkHistoryJobApp.java | 11 +-
.../spark/history/SparkHistoryJobAppConfig.java | 62 +++---
.../SparkFilesystemInputStreamReaderImpl.java | 2 +-
.../status/JobHistoryZKStateManager.java | 2 +-
.../history/storm/SparkHistoryJobParseBolt.java | 6 -
...spark.history.SparkHistoryJobAppProvider.xml | 205 ++++++-------------
.../src/main/resources/application.conf | 45 ++--
eagle-server/pom.xml | 5 +
...org.apache.eagle.app.spi.ApplicationProvider | 3 +-
pom.xml | 2 +-
11 files changed, 128 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index 442c885..600643b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -108,12 +108,7 @@ public class AlertStreamEvent extends StreamEvent {
event.put(column.getName(), null);
continue;
}
- if (column.getName().equalsIgnoreCase("timestamp") && obj instanceof Long) {
- String eventTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(((Long) obj).longValue());
- event.put(column.getName(), eventTime);
- } else {
- event.put(column.getName(), obj.toString());
- }
+ event.put(column.getName(), obj.toString());
}
return event;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
index 446eb4e..8a3097d 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
@@ -36,18 +36,17 @@ public class SparkHistoryJobApp extends StormApplication {
// 2. Config topology.
TopologyBuilder topologyBuilder = new TopologyBuilder();
- config = sparkHistoryJobAppConfig.getConfig();
+
topologyBuilder.setSpout(
jobFetchSpoutName,
- new SparkHistoryJobSpout(sparkHistoryJobAppConfig),
- config.getInt("storm.parallelismConfig." + jobFetchSpoutName)
- ).setNumTasks(config.getInt("storm.tasks." + jobFetchSpoutName));
+ new SparkHistoryJobSpout(sparkHistoryJobAppConfig), sparkHistoryJobAppConfig.stormConfig.numOfSpoutExecutors
+ ).setNumTasks(sparkHistoryJobAppConfig.stormConfig.numOfSpoutTasks);
topologyBuilder.setBolt(
jobParseBoltName,
new SparkHistoryJobParseBolt(sparkHistoryJobAppConfig),
- config.getInt("storm.parallelismConfig." + jobParseBoltName)
- ).setNumTasks(config.getInt("storm.tasks." + jobParseBoltName)).shuffleGrouping(jobFetchSpoutName);
+ sparkHistoryJobAppConfig.stormConfig.numOfParserBoltExecutors
+ ).setNumTasks(sparkHistoryJobAppConfig.stormConfig.numOfParserBoltTasks).shuffleGrouping(jobFetchSpoutName);
return topologyBuilder.createTopology();
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 393a97c..5049b40 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -31,9 +31,10 @@ public class SparkHistoryJobAppConfig implements Serializable {
static final String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = "sparkHistoryJobFetchSpout";
static final String SPARK_HISTORY_JOB_PARSE_BOLT_NAME = "sparkHistoryJobParseBolt";
+ static final String DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT = "/eagle/sparkJobHistory";
+
public ZKStateConfig zkStateConfig;
public JobHistoryEndpointConfig jobHistoryConfig;
- public BasicInfo info;
public EagleInfo eagleInfo;
public StormConfig stormConfig;
@@ -49,7 +50,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
this.zkStateConfig = new ZKStateConfig();
this.jobHistoryConfig = new JobHistoryEndpointConfig();
this.jobHistoryConfig.hdfs = new HashMap<>();
- this.info = new BasicInfo();
this.eagleInfo = new EagleInfo();
this.stormConfig = new StormConfig();
}
@@ -62,36 +62,40 @@ public class SparkHistoryJobAppConfig implements Serializable {
private void init(Config config) {
this.config = config;
- this.zkStateConfig.zkQuorum = config.getString("zkStateConfig.zkQuorum");
- this.zkStateConfig.zkRetryInterval = config.getInt("zkStateConfig.zkRetryInterval");
- this.zkStateConfig.zkRetryTimes = config.getInt("zkStateConfig.zkRetryTimes");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zkStateConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
+ this.zkStateConfig.zkQuorum = config.getString("zookeeper.zkQuorum");
+ this.zkStateConfig.zkRetryInterval = config.getInt("zookeeper.zkRetryInterval");
+ this.zkStateConfig.zkRetryTimes = config.getInt("zookeeper.zkRetryTimes");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("zookeeper.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRoot = DEFAULT_SPARK_JOB_HISTORY_ZOOKEEPER_ROOT;
+ if (config.hasPath("zookeeper.zkRoot")) {
+ this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
+ }
- jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
- jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
- jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.password");
jobHistoryConfig.rms = config.getString("dataSourceConfig.rm.url").split(",\\s*");
jobHistoryConfig.baseDir = config.getString("dataSourceConfig.baseDir");
for (Map.Entry<String, ConfigValue> entry : config.getConfig("dataSourceConfig.hdfs").entrySet()) {
this.jobHistoryConfig.hdfs.put(entry.getKey(), entry.getValue().unwrapped().toString());
}
- info.site = config.getString("basic.cluster") + "-" + config.getString("basic.dataCenter");
- info.jobConf = config.getString("basic.jobConf.additional.info").split(",\\s*");
-
- this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
- this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
- this.eagleInfo.username = config.getString("eagleProps.eagle.service.username");
- this.eagleInfo.password = config.getString("eagleProps.eagle.service.password");
- this.eagleInfo.timeout = config.getInt("eagleProps.eagle.service.read.timeout");
+ this.eagleInfo.host = config.getString("service.host");
+ this.eagleInfo.port = config.getInt("service.port");
+ this.eagleInfo.username = config.getString("service.username");
+ this.eagleInfo.password = config.getString("service.password");
+ this.eagleInfo.timeout = 2;
+ if (config.hasPath("service.readTimeOutSeconds")) {
+ this.eagleInfo.timeout = config.getInt("service.readTimeOutSeconds");
+ }
this.eagleInfo.basePath = EagleServiceBaseClient.DEFAULT_BASE_PATH;
- if (config.hasPath("eagleProps.eagle.service.basePath")) {
- this.eagleInfo.basePath = config.getString("eagleProps.eagle.service.basePath");
+ if (config.hasPath("service.basePath")) {
+ this.eagleInfo.basePath = config.getString("service.basePath");
}
- this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
- this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
+ this.stormConfig.siteId = config.getString("siteId");
+ this.stormConfig.spoutCrawlInterval = config.getInt("topology.spoutCrawlInterval");
+ this.stormConfig.numOfSpoutExecutors = config.getInt("topology.numOfSpoutExecutors");
+ this.stormConfig.numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks");
+ this.stormConfig.numOfParserBoltExecutors = config.getInt("topology.numOfParseBoltExecutors");
+ this.stormConfig.numOfParserBoltTasks = config.getInt("topology.numOfParserBoltTasks");
}
public static class ZKStateConfig implements Serializable {
@@ -104,21 +108,17 @@ public class SparkHistoryJobAppConfig implements Serializable {
public static class JobHistoryEndpointConfig implements Serializable {
public String[] rms;
- public String historyServerUrl;
- public String historyServerUserName;
- public String historyServerUserPwd;
public String baseDir;
public Map<String, String> hdfs;
}
- public static class BasicInfo implements Serializable {
- public String site;
- public String[] jobConf;
- }
-
public static class StormConfig implements Serializable {
- public int spoutPending;
+ public String siteId;
public int spoutCrawlInterval;
+ public int numOfSpoutExecutors;
+ public int numOfSpoutTasks;
+ public int numOfParserBoltExecutors;
+ public int numOfParserBoltTasks;
}
public static class EagleInfo implements Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
index 57ced63..56998e6 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -34,7 +34,7 @@ public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReade
public SparkFilesystemInputStreamReaderImpl(SparkHistoryJobAppConfig config, SparkApplicationInfo app) {
this.config = config;
- this.site = config.info.site;
+ this.site = config.stormConfig.siteId;
this.app = app;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 2ce8522..28581d5 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -51,7 +51,7 @@ public class JobHistoryZKStateManager {
}
public JobHistoryZKStateManager(SparkHistoryJobAppConfig config) {
- this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
+ this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.stormConfig.siteId;
try {
curator = newCurator(config);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index 9f8adc7..16f1144 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -24,10 +24,7 @@ import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl;
import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
import backtype.storm.task.OutputCollector;
@@ -49,7 +46,6 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobParseBolt.class);
private OutputCollector collector;
- private ResourceFetcher historyServerFetcher;
private SparkHistoryJobAppConfig config;
private JobHistoryZKStateManager zkState;
private Configuration hdfsConf;
@@ -67,8 +63,6 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
this.hdfsConf.set(entry.getKey(), entry.getValue());
LOG.info("conf key {}, conf value {}", entry.getKey(), entry.getValue());
}
- this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
- config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
this.zkState = new JobHistoryZKStateManager(config);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index b0d5987..8159edc 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -18,179 +18,102 @@
<application>
<type>SPARK_HISTORY_JOB_APP</type>
- <name>Spark History Job Monitoring</name>
+ <name>Spark History Job Monitor</name>
<version>0.5.0-incubating</version>
<appClass>org.apache.eagle.jpm.spark.history.SparkHistoryJobApp</appClass>
<configuration>
- <!-- org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig -->
+ <!-- topology config -->
<property>
- <name>basic.cluster</name>
- <displayName>cluster</displayName>
- <description>Cluster Name</description>
- <value>sandbox</value>
- </property>
- <property>
- <name>basic.dataCenter</name>
- <displayName>dataCenter</displayName>
- <description>Data Center</description>
- <value>sandbox</value>
- </property>
- <property>
- <name>basic.jobConf.additional.info</name>
- <displayName>jobConf.additional.info</displayName>
- <description>Additional info in Job Configs</description>
- <value></value>
- </property>
- <property>
- <name>dataSourceConfig.zkQuorum</name>
- <displayName>zkQuorum</displayName>
- <description>Zookeeper Quorum</description>
- <value>sandbox.hortonworks.com:2181</value>
- </property>
- <property>
- <name>dataSourceConfig.zkRoot</name>
- <displayName>zkRoot</displayName>
- <description>Zookeeper Root</description>
- <value>/sparkHistoryJob</value>
- </property>
- <property>
- <name>dataSourceConfig.zkPort</name>
- <displayName>zkPort</displayName>
- <description>Zookeeper Port</description>
- <value>2181</value>
- </property>
- <property>
- <name>dataSourceConfig.zkSessionTimeoutMs</name>
- <displayName>zkSessionTimeoutMs</displayName>
- <description>Zookeeper session timeoutMs</description>
- <value>15000</value>
- </property>
- <property>
- <name>zookeeperConfig.zkRetryTimes</name>
- <displayName>zkRetryTimes</displayName>
- <description>zookeeperConfig.zkRetryTimes</description>
- <value>3</value>
- </property>
- <property>
- <name>zookeeperConfig.zkRetryInterval</name>
- <displayName>zkRetryInterval</displayName>
- <description>zookeeperConfig.zkRetryInterval</description>
- <value>20000</value>
- </property>
- <property>
- <name>dataSourceConfig.spark.history.server.url</name>
- <displayName>spark.history.server.url</displayName>
- <description>Spark History Server URL</description>
- <value>http://sandbox.hortonworks.com:18080</value>
- </property>
- <property>
- <name>dataSourceConfig.spark.history.server.username</name>
- <displayName>spark.history.server.username</displayName>
- <description>Spark History Server Auth Username</description>
- <value></value>
- </property>
- <property>
- <name>dataSourceConfig.spark.history.server.password</name>
- <displayName>spark.history.server.password</displayName>
- <description>Spark History Server Auth Password</description>
- <value></value>
- </property>
- <property>
- <name>eagleProps.eagle.service.host</name>
- <description>eagleProps.eagle.service.host</description>
- <value>sandbox.hortonworks.com</value>
+ <name>workers</name>
+ <displayName>topology workers</displayName>
+ <description>topology workers</description>
+ <value>1</value>
</property>
<property>
- <name>eagleProps.eagle.service.port</name>
- <description>eagleProps.eagle.service.port</description>
- <value>9099</value>
+ <name>topology.numOfSpoutExecutors</name>
+ <displayName>spout executors</displayName>
+ <description>Parallelism of sparkHistoryJobFetchSpout </description>
+ <value>1</value>
</property>
<property>
- <name>eagleProps.eagle.service.username</name>
- <description>eagleProps.eagle.service.username</description>
- <value>admin</value>
+ <name>topology.numOfSpoutTasks</name>
+ <displayName>spout tasks</displayName>
+ <description>Tasks Num of sparkHistoryJobFetchSpout </description>
+ <value>4</value>
</property>
<property>
- <name>eagleProps.eagle.service.password</name>
- <description>eagleProps.eagle.service.password</description>
- <value>secret</value>
+ <name>topology.numOfParseBoltExecutors</name>
+ <displayName>parser bolt parallelism hint</displayName>
+ <description>Parallelism of sparkHistoryJobParseBolt </description>
+ <value>1</value>
</property>
<property>
- <name>eagleProps.eagle.service.basePath</name>
- <description>eagleProps.eagle.service.basePath</description>
- <value>/rest</value>
+ <name>topology.numOfParserBoltTasks</name>
+ <displayName>parser bolt tasks</displayName>
+ <description>Tasks Num of sparkHistoryJobParseBolt</description>
+ <value>4</value>
</property>
<property>
- <name>eagleProps.eagle.service.read.timeout</name>
- <displayName>eagleProps.eagle.service.read.timeout</displayName>
- <description>The maximum amount of time (in seconds) the app is trying to read from eagle service</description>
- <value>2</value>
+ <name>topology.spoutCrawlInterval</name>
+ <displayName>spout crawl interval</displayName>
+ <description>Spout crawl interval (in milliseconds)</description>
+ <value>10000</value>
</property>
<property>
- <name>eagleProps.eagleService.maxFlushNum</name>
- <displayName>eagleProps.eagleService.maxFlushNum</displayName>
- <value>500</value>
+ <name>topology.message.timeout.secs</name>
+ <displayName>topology message timeout (secs)</displayName>
+ <description>default timeout is 30s</description>
+ <value>300</value>
</property>
+ <!-- zookeeper config -->
<property>
- <name>dataSourceConfig.hdfs.eventLog</name>
- <displayName>dataSourceConfig.hdfs.eventLog</displayName>
- <value>/spark-history</value>
+ <name>zkStateConfig.zkQuorum</name>
+ <displayName>zookeeper quorum list</displayName>
+ <description>zookeeper to store topology metadata</description>
+ <value>sandbox.hortonworks.com:2181</value>
</property>
<property>
- <name>dataSourceConfig.hdfs.endpoint</name>
- <displayName>dataSourceConfig.hdfs.endpoint</displayName>
- <value>hdfs://sandbox.hortonworks.com:8020</value>
+ <name>zkStateConfig.zkSessionTimeoutMs</name>
+ <displayName>zookeeper session timeout (ms)</displayName>
+ <description>Zookeeper session timeoutMs</description>
+ <value>15000</value>
</property>
<property>
- <name>dataSourceConfig.hdfs.keytab</name>
- <displayName>dataSourceConfig.hdfs.keytab</displayName>
- <value></value>
+ <name>zkStateConfig.zkRetryTimes</name>
+ <displayName>zookeeper connection retry times</displayName>
+ <description>retry times for zookeeper connection</description>
+ <value>3</value>
</property>
<property>
- <name>dataSourceConfig.hdfs.principal</name>
- <displayName>dataSourceConfig.hdfs.principal</displayName>
- <value></value>
+ <name>zkStateConfig.zkRetryInterval</name>
+ <displayName>zookeeper connection retry interval</displayName>
+ <description>retry interval for zookeeper connection</description>
+ <value>20000</value>
</property>
+
+ <!-- datasource config -->
<property>
- <name>dataSourceConfig.rmUrl</name>
- <displayName>dataSourceConfig.rmUrl</displayName>
+ <name>dataSourceConfig.rm.url</name>
+ <displayName>resource manager url</displayName>
+ <description>url to fetch finished spark job list</description>
<value>http://sandbox.hortonworks.com:8088</value>
+ <required>true</required>
</property>
<property>
- <name>storm.pendingSpout</name>
- <displayName>pendingSpout</displayName>
- <value>1000</value>
- </property>
- <property>
- <name>storm.spoutCrawlInterval</name>
- <displayName>spoutCrawlInterval</displayName>
- <description>Spout crawl interval (in milliseconds)</description>
- <value>10000</value>
- </property>
- <property>
- <name>storm.parallelismConfig.sparkHistoryJobFetchSpout</name>
- <displayName>parallelismConfig.sparkHistoryJobFetchSpout</displayName>
- <description>Parallelism of sparkHistoryJobFetchSpout </description>
- <value>1</value>
- </property>
- <property>
- <name>storm.tasks.sparkHistoryJobFetchSpout</name>
- <displayName>tasks.sparkHistoryJobFetchSpout</displayName>
- <description>Tasks Num of sparkHistoryJobFetchSpout </description>
- <value>4</value>
- </property>
- <property>
- <name>storm.parallelismConfig.sparkHistoryJobParseBolt</name>
- <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
- <description>Parallelism of sparkHistoryJobParseBolt </description>
- <value>1</value>
+ <name>dataSourceConfig.hdfs.fs.defaultFS</name>
+ <displayName>hdfs url</displayName>
+ <description>target hdfs to crawl log data</description>
+ <value>hdfs://sandbox.hortonworks.com:8020</value>
+ <required>true</required>
</property>
<property>
- <name>storm.tasks.sparkHistoryJobParseBolt</name>
- <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
- <description>Tasks Num of sparkHistoryJobParseBolt</description>
- <value>4</value>
+ <name>dataSourceConfig.baseDir</name>
+ <displayName>hdfs base path for spark job data</displayName>
+ <description>hdfs base path for spark job data</description>
+ <value>/spark-history</value>
+ <required>true</required>
</property>
+
<property>
<name>spark.defaultVal.spark.executor.memory</name>
<displayName>spark.executor.memory</displayName>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 10da2d0..5f3fdac 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -15,22 +15,19 @@
{
+ "siteId": "sandbox",
"appId": "sparkHistoryJob",
"mode": "CLUSTER",
"workers" : 3,
topology.message.timeout.secs: 300,
- "basic":{
- "cluster":"sandbox",
- "dataCenter":"sandbox",
- jobConf.additional.info: ""
- },
- "eagleProps":{
- eagle.service.host:"sandbox.hortonworks.com",
- eagle.service.port: 9099,
- eagle.service.username: "admin",
- eagle.service.password : "secret",
- eagle.service.basePath : "/rest",
- eagle.service.read.timeout : 2
+
+ "service":{
+ host:"sandbox.hortonworks.com",
+ port: 9099,
+ username: "admin",
+ password : "secret",
+ basePath : "/rest",
+ readTimeOutSeconds : 2
},
"zkStateConfig" : {
"zkQuorum" : "sandbox.hortonworks.com:2181",
@@ -41,12 +38,9 @@
},
"dataSourceConfig":{
- spark.history.server.url : "http://sandbox.hortonworks.com:18080",
- spark.history.server.username : "",
- spark.history.server.password : "",
rm.url: "http://sandbox.hortonworks.com:8088",
"baseDir" : "/spark-history",
- "hdfs": {
+ hdfs: {
fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
#if not need, then do not set
# hdfs.kerberos.principal = ,
@@ -54,20 +48,15 @@
# ....
}
},
- "storm":{
- "pendingSpout": 1000,
- "spoutCrawlInterval": 10000,#in ms
- "parallelismConfig" : {
- "sparkHistoryJobFetchSpout" : 1,
- "sparkHistoryJobParseBolt" : 4
- },
- "tasks" : {
- "sparkHistoryJobFetchSpout" : 1,
- "sparkHistoryJobParseBolt" : 4
- }
+ "topology": {
+ spoutCrawlInterval: 10000, #in ms
+ numOfSpoutExecutors: 1
+ numOfSpoutTasks: 4
+ numOfParseBoltExecutors: 1
+ numOfParserBoltTasks: 4
},
"spark":{
- "defaultVal":{
+ "defaultVal": {
spark.executor.memory:"1g",
spark.driver.memory: "1g",
spark.driver.cores:1,
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index f58a166..96416f6 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -314,6 +314,11 @@
<artifactId>eagle-jpm-aggregation</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-spark-history</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- App: Hadoop Queue Running Monitoring-->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
index 8b78e66..c5bc978 100644
--- a/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
+++ b/eagle-topology-assembly/src/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -26,4 +26,5 @@ org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider
org.apache.eagle.jpm.aggregation.AggregationApplicationProvider
org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider
org.apache.eagle.topology.TopologyCheckAppProvider
-org.apache.eagle.metric.HadoopMetricMonitorAppProdiver
\ No newline at end of file
+org.apache.eagle.metric.HadoopMetricMonitorAppProdiver
+org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/6b0ed3d0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f2e08ef..8f2ef44 100755
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@
<spark.core.version>1.4.0</spark.core.version>
<!-- Client -->
- <kafka-client.version>0.9.0.0</kafka-client.version>
+ <!--<kafka-client.version>0.9.0.0</kafka-client.version>-->
<!-- Reflection -->
<reflections.version>0.9.8</reflections.version>