You are viewing a plain-text version of this content; its canonical link was a hyperlink that is not preserved in this text copy.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:19 UTC
[23/52] [abbrv] incubator-eagle git commit: [EAGLE-495] Convert spark
history job using application framework.
[EAGLE-495] Convert spark history job using application framework.
Author: pkuwm <ih...@gmail.com>
Closes #393 from pkuwm/EAGLE-495.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4f4fd0c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4f4fd0c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4f4fd0c4
Branch: refs/heads/master
Commit: 4f4fd0c4f606a8bc8ab83c66510aca4226becef8
Parents: a10eeb7
Author: pkuwm <ih...@gmail.com>
Authored: Mon Aug 29 10:19:09 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Aug 29 10:19:09 2016 +0800
----------------------------------------------------------------------
.../jpm/spark/crawl/JHFSparkEventReader.java | 13 +-
eagle-jpm/eagle-jpm-spark-history/pom.xml | 5 +
.../jpm/spark/history/SparkHistoryJobApp.java | 54 ++++
.../spark/history/SparkHistoryJobAppConfig.java | 133 +++++++++
.../history/SparkHistoryJobAppProvider.java | 27 ++
.../jpm/spark/history/SparkHistoryJobMain.java | 25 ++
.../history/config/SparkHistoryCrawlConfig.java | 123 ---------
.../status/JobHistoryZKStateManager.java | 6 +-
.../history/storm/FinishedSparkJobSpout.java | 154 -----------
.../history/storm/SparkHistoryJobParseBolt.java | 201 ++++++++++++++
.../history/storm/SparkHistoryJobSpout.java | 154 +++++++++++
.../history/storm/SparkHistoryTopology.java | 81 ------
.../spark/history/storm/SparkJobParseBolt.java | 201 --------------
.../eagle/jpm/spark/history/storm/TestHDFS.java | 47 ----
...spark.history.SparkHistoryJobAppProvider.xml | 271 +++++++++++++++++++
...org.apache.eagle.app.spi.ApplicationProvider | 16 ++
.../src/main/resources/application.conf | 34 +--
.../java/SparkHistoryJobAppProviderTest.java | 32 +++
.../running/SparkRunningJobAppProvider.java | 2 +-
19 files changed, 947 insertions(+), 632 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index edb3854..1cd5a77 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -17,6 +17,7 @@
package org.apache.eagle.jpm.spark.crawl;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.eagle.jpm.spark.entity.*;
import org.apache.eagle.jpm.util.*;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
@@ -109,12 +110,12 @@ public class JHFSparkEventReader {
app.setConfig(new JobConfig());
JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
- List<String> jobConfs = conf.getStringList("basic.jobConf.additional.info");
+ String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
"spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
"spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
- jobConfs.addAll(Arrays.asList(props));
- for (String prop : jobConfs) {
+ String[] jobConf = (String[])ArrayUtils.addAll(additionalJobConf, props);
+ for (String prop : jobConf) {
if (sparkProps.containsKey(prop)) {
app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
}
@@ -698,10 +699,10 @@ public class JHFSparkEventReader {
private EagleServiceBaseClient initiateClient() {
String host = conf.getString("eagleProps.eagle.service.host");
int port = conf.getInt("eagleProps.eagle.service.port");
- String userName = conf.getString("eagleProps.eagle.service.userName");
- String pwd = conf.getString("eagleProps.eagle.service.pwd");
+ String userName = conf.getString("eagleProps.eagle.service.username");
+ String pwd = conf.getString("eagleProps.eagle.service.password");
client = new EagleServiceClientImpl(host, port, userName, pwd);
- int timeout = conf.getInt("eagleProps.eagle.service.read_timeout");
+ int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
client.getJerseyClient().setReadTimeout(timeout * 1000);
return client;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/pom.xml b/eagle-jpm/eagle-jpm-spark-history/pom.xml
index e144117..1c9c8b4 100644
--- a/eagle-jpm/eagle-jpm-spark-history/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/pom.xml
@@ -100,6 +100,11 @@
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-app-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
new file mode 100644
index 0000000..180b1e8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApp.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.jpm.spark.history.storm.SparkHistoryJobSpout;
+import org.apache.eagle.jpm.spark.history.storm.SparkHistoryJobParseBolt;
+
+/**
+ * Storm application for the Spark history job.
+ *
+ * <p>Builds a topology of one fetch spout feeding one parse bolt through shuffle grouping. The
+ * parallelism and task count of each component are read from
+ * {@code storm.parallelismConfig.<component>} and {@code storm.tasks.<component>}.
+ */
+public class SparkHistoryJobApp extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        SparkHistoryJobAppConfig appConfig = SparkHistoryJobAppConfig.getInstance(config);
+        // getConfig() returns the Config that getInstance() initialized appConfig from.
+        Config settings = appConfig.getConfig();
+
+        String spoutName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_FETCH_SPOUT_NAME;
+        String boltName = SparkHistoryJobAppConfig.SPARK_HISTORY_JOB_PARSE_BOLT_NAME;
+
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(spoutName, new SparkHistoryJobSpout(appConfig), parallelismOf(settings, spoutName))
+            .setNumTasks(tasksOf(settings, spoutName));
+
+        // The parse bolt consumes the fetch spout's stream with shuffle grouping.
+        builder.setBolt(boltName, new SparkHistoryJobParseBolt(appConfig), parallelismOf(settings, boltName))
+            .setNumTasks(tasksOf(settings, boltName))
+            .shuffleGrouping(spoutName);
+
+        return builder.createTopology();
+    }
+
+    /** Reads the parallelism hint for a component from {@code storm.parallelismConfig.<component>}. */
+    private static int parallelismOf(Config settings, String component) {
+        return settings.getInt("storm.parallelismConfig." + component);
+    }
+
+    /** Reads the task count for a component from {@code storm.tasks.<component>}. */
+    private static int tasksOf(Config settings, String component) {
+        return settings.getInt("storm.tasks." + component);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
new file mode 100644
index 0000000..ed499db
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import java.io.Serializable;
+
+public class SparkHistoryJobAppConfig implements Serializable {
+ final static String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = "sparkHistoryJobFetchSpout";
+ final static String SPARK_HISTORY_JOB_PARSE_BOLT_NAME = "sparkHistoryJobParseBolt";
+
+ public ZKStateConfig zkStateConfig;
+ public JobHistoryEndpointConfig jobHistoryConfig;
+ public HDFSConfig hdfsConfig;
+ public BasicInfo info;
+ public EagleInfo eagleInfo;
+ public StormConfig stormConfig;
+
+ private Config config;
+
+ private static SparkHistoryJobAppConfig manager = new SparkHistoryJobAppConfig();
+
+ public Config getConfig() {
+ return config;
+ }
+
+ public SparkHistoryJobAppConfig() {
+ this.zkStateConfig = new ZKStateConfig();
+ this.jobHistoryConfig = new JobHistoryEndpointConfig();
+ this.hdfsConfig = new HDFSConfig();
+ this.info = new BasicInfo();
+ this.eagleInfo = new EagleInfo();
+ this.stormConfig = new StormConfig();
+ }
+
+ public static SparkHistoryJobAppConfig getInstance(Config config) {
+ manager.init(config);
+ return manager;
+ }
+
+ private void init(Config config) {
+ this.config = config;
+
+ this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+ this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+ this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+ this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+ this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
+
+ jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
+ jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
+ jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.password");
+ jobHistoryConfig.rms = config.getString("dataSourceConfig.rm.url").split(",\\s*");
+
+ this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.eventLog");
+ this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint");
+ this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal");
+ this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab");
+
+ info.site = config.getString("basic.cluster") + "-" + config.getString("basic.dataCenter");
+ info.jobConf = config.getString("basic.jobConf.additional.info").split(",\\s*");
+
+ this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
+ this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
+
+ this.stormConfig.mode = config.getString("storm.mode");
+ this.stormConfig.topologyName = config.getString("storm.name");
+ this.stormConfig.workerNo = config.getInt("storm.worker.num");
+ this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
+ this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
+ this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
+ }
+
+ public static class ZKStateConfig implements Serializable {
+ public String zkQuorum;
+ public String zkRoot;
+ public int zkSessionTimeoutMs;
+ public int zkRetryTimes;
+ public int zkRetryInterval;
+ }
+
+ public static class JobHistoryEndpointConfig implements Serializable {
+ public String[] rms;
+ public String historyServerUrl;
+ public String historyServerUserName;
+ public String historyServerUserPwd;
+ }
+
+ public static class HDFSConfig implements Serializable {
+ public String endpoint;
+ public String baseDir;
+ public String principal;
+ public String keytab;
+ }
+
+ public static class BasicInfo implements Serializable {
+ public String site;
+ public String[] jobConf;
+ }
+
+ public static class StormConfig implements Serializable {
+ public String mode;
+ public int workerNo;
+ public int timeoutSec;
+ public String topologyName;
+ public int spoutPending;
+ public int spoutCrawlInterval;
+ }
+
+ public static class EagleInfo implements Serializable {
+ public String host;
+ public int port;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
new file mode 100644
index 0000000..343d9c2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * {@link AbstractApplicationProvider} for the Spark history job application.
+ */
+public class SparkHistoryJobAppProvider extends AbstractApplicationProvider<SparkHistoryJobApp> {
+    /**
+     * Returns a freshly constructed {@link SparkHistoryJobApp}; no instance is cached.
+     */
+    @Override
+    public SparkHistoryJobApp getApplication() {
+        final SparkHistoryJobApp application = new SparkHistoryJobApp();
+        return application;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
new file mode 100644
index 0000000..e47e5b2
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobMain.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.spark.history;
+
+/**
+ * Command-line launcher for {@link SparkHistoryJobApp}.
+ */
+public class SparkHistoryJobMain {
+    /**
+     * Entry point: passes {@code args} unchanged to {@link SparkHistoryJobApp#run}.
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String[] args) {
+        final SparkHistoryJobApp application = new SparkHistoryJobApp();
+        application.run(args);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
deleted file mode 100644
index e6cd2f6..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/config/SparkHistoryCrawlConfig.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.config;
-
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import java.io.Serializable;
-
-public class SparkHistoryCrawlConfig implements Serializable {
- public ZKStateConfig zkStateConfig;
- public JobHistoryEndpointConfig jobHistoryConfig;
- public HDFSConfig hdfsConfig;
- public BasicInfo info;
- public EagleInfo eagleInfo;
- public StormConfig stormConfig;
-
- private Config config;
-
- public Config getConfig() {
- return config;
- }
-
- public SparkHistoryCrawlConfig() {
- this.config = ConfigFactory.load();
-
- this.zkStateConfig = new ZKStateConfig();
- this.zkStateConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
- this.zkStateConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
- this.zkStateConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
- this.zkStateConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
- this.zkStateConfig.zkRoot = config.getString("dataSourceConfig.zkRoot");
-
- this.jobHistoryConfig = new JobHistoryEndpointConfig();
- jobHistoryConfig.historyServerUrl = config.getString("dataSourceConfig.spark.history.server.url");
- jobHistoryConfig.historyServerUserName = config.getString("dataSourceConfig.spark.history.server.username");
- jobHistoryConfig.historyServerUserPwd = config.getString("dataSourceConfig.spark.history.server.pwd");
- jobHistoryConfig.rms = config.getStringList("dataSourceConfig.rm.url").toArray(new String[0]);
-
- this.hdfsConfig = new HDFSConfig();
- this.hdfsConfig.baseDir = config.getString("dataSourceConfig.hdfs.baseDir");
- this.hdfsConfig.endpoint = config.getString("dataSourceConfig.hdfs.endPoint");
- this.hdfsConfig.principal = config.getString("dataSourceConfig.hdfs.principal");
- this.hdfsConfig.keytab = config.getString("dataSourceConfig.hdfs.keytab");
-
- this.info = new BasicInfo();
- info.site = String.format("%s-%s",config.getString("basic.cluster"),config.getString("basic.datacenter"));
- info.jobConf = config.getStringList("basic.jobConf.additional.info").toArray(new String[0]);
-
- this.eagleInfo = new EagleInfo();
- this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
- this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
-
- this.stormConfig = new StormConfig();
- this.stormConfig.mode = config.getString("storm.mode");
- this.stormConfig.topologyName = config.getString("storm.name");
- this.stormConfig.workerNo = config.getInt("storm.workerNo");
- this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
- this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
- this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
- }
-
- public static class ZKStateConfig implements Serializable {
- public String zkQuorum;
- public String zkRoot;
- public int zkSessionTimeoutMs;
- public int zkRetryTimes;
- public int zkRetryInterval;
- }
-
- public static class JobHistoryEndpointConfig implements Serializable {
- public String[] rms;
- public String historyServerUrl;
- public String historyServerUserName;
- public String historyServerUserPwd;
- }
-
- public static class HDFSConfig implements Serializable {
- public String endpoint;
- public String baseDir;
- public String principal;
- public String keytab;
- }
-
- public static class BasicInfo implements Serializable {
- public String site;
- public String[] jobConf;
- }
-
- public static class StormConfig implements Serializable {
- public String mode;
- public int workerNo;
- public int timeoutSec;
- public String topologyName;
- public int spoutPending;
- public int spoutCrawlInterval;
- }
-
- public static class EagleInfo implements Serializable {
- public String host;
- public int port;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 382375f..7a95e56 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -20,7 +20,7 @@
package org.apache.eagle.jpm.spark.history.status;
import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
@@ -40,7 +40,7 @@ public class JobHistoryZKStateManager {
private CuratorFramework curator;
private static String START_TIMESTAMP = "lastAppTime";
- private CuratorFramework newCurator(SparkHistoryCrawlConfig config) throws Exception {
+ private CuratorFramework newCurator(SparkHistoryJobAppConfig config) throws Exception {
return CuratorFrameworkFactory.newClient(
config.zkStateConfig.zkQuorum,
config.zkStateConfig.zkSessionTimeoutMs,
@@ -49,7 +49,7 @@ public class JobHistoryZKStateManager {
);
}
- public JobHistoryZKStateManager(SparkHistoryCrawlConfig config) {
+ public JobHistoryZKStateManager(SparkHistoryJobAppConfig config) {
this.zkRoot = config.zkStateConfig.zkRoot + "/" + config.info.site;
try {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
deleted file mode 100644
index bf04b55..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.Constants;
-import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class FinishedSparkJobSpout extends BaseRichSpout {
- private static final Logger LOG = LoggerFactory.getLogger(FinishedSparkJobSpout.class);
- private SpoutOutputCollector collector;
- private JobHistoryZKStateManager zkState;
- private SparkHistoryCrawlConfig config;
- private ResourceFetcher rmFetch;
- private long lastFinishAppTime = 0;
- private Map<String, Integer> failTimes;
-
- private static final int FAIL_MAX_TIMES = 5;
-
- public FinishedSparkJobSpout(SparkHistoryCrawlConfig config) {
- this.config = config;
- }
-
- @Override
- public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
- rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
- this.failTimes = new HashMap<>();
- this.collector = spoutOutputCollector;
- this.zkState = new JobHistoryZKStateManager(config);
- this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
- zkState.resetApplications();
- }
-
-
- @Override
- public void nextTuple() {
- LOG.info("Start to run tuple");
- try {
- Calendar calendar = Calendar.getInstance();
- long fetchTime = calendar.getTimeInMillis();
- calendar.setTimeInMillis(this.lastFinishAppTime);
- LOG.info("Last finished time = {}", calendar.getTime());
- if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
- List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
- //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
- if (appInfos != null) {
- LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
- for (AppInfo app : appInfos) {
- String appId = app.getId();
- if (!zkState.hasApplication(appId)) {
- zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
- }
- }
- }
- this.lastFinishAppTime = fetchTime;
- zkState.updateLastUpdateTime(fetchTime);
- }
-
- List<String> appIds = zkState.loadApplications(10);
- for (String appId: appIds) {
- collector.emit(new Values(appId), appId);
- LOG.info("emit " + appId);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
- }
- LOG.info("{} apps sent.", appIds.size());
-
- if (appIds.isEmpty()) {
- this.takeRest(60);
- }
- } catch (Exception e) {
- LOG.error("Fail to run next tuple", e);
- }
- }
-
- private void takeRest(int seconds) {
- try {
- Thread.sleep(seconds * 1000);
- } catch (InterruptedException e) {
- LOG.warn("exception found {}", e);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- outputFieldsDeclarer.declare(new Fields("appId"));
- }
-
- @Override
- public void fail(Object msgId) {
- String appId = (String) msgId;
- int failTimes = 0;
- if (this.failTimes.containsKey(appId)) {
- failTimes = this.failTimes.get(appId);
- }
- failTimes++;
- if (failTimes >= FAIL_MAX_TIMES) {
- this.failTimes.remove(appId);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
- LOG.error(String.format("Application %s has failed for over %s times, drop it.", appId, FAIL_MAX_TIMES));
- } else {
- this.failTimes.put(appId, failTimes);
- collector.emit(new Values(appId), appId);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
- }
- }
-
- @Override
- public void ack(Object msgId) {
- String appId = (String) msgId;
- if (this.failTimes.containsKey(appId)) {
- this.failTimes.remove(appId);
- }
-
- }
-
- @Override
- public void close() {
- super.close();
- zkState.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
new file mode 100644
index 0000000..e88c62f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -0,0 +1,197 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Storm bolt that parses the Spark event logs of one application per input tuple.
+ *
+ * <p>The tuple's {@code appId} field names an application previously registered in ZooKeeper
+ * by the spout. The bolt locates each attempt's event log under {@code hdfsConfig.baseDir},
+ * feeds it to a {@link SparkFilesystemInputStreamReaderImpl}, then records FINISHED (and acks)
+ * or FAILED (and fails the tuple) in ZooKeeper.
+ */
+public class SparkHistoryJobParseBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobParseBolt.class);
+
+    /** Extension of an event log that is still in progress. */
+    private static final String INPROGRESS_SUFFIX = ".inprogress";
+
+    private OutputCollector collector;
+    private ResourceFetcher historyServerFetcher;
+    private SparkHistoryJobAppConfig config;
+    private JobHistoryZKStateManager zkState;
+    private Configuration hdfsConf;
+
+    public SparkHistoryJobParseBolt(SparkHistoryJobAppConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.collector = outputCollector;
+        this.hdfsConf = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        // execute() closes the FileSystem it obtains for every tuple, so it must not be a
+        // cached instance shared with the rest of the JVM.
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        // NOTE(review): not consulted yet; attempt discovery below probes HDFS directly.
+        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
+                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
+        this.zkState = new JobHistoryZKStateManager(config);
+    }
+
+    /**
+     * Parses all event logs of the application named by the tuple's {@code appId} field.
+     * On success marks it FINISHED and acks; on any exception marks it FAILED and fails the
+     * tuple so the spout can re-emit it.
+     */
+    @Override
+    public void execute(Tuple tuple) {
+        String appId = tuple.getStringByField("appId");
+        if (!zkState.hasApplication(appId)) {
+            // may already be processed due to some reason
+            collector.ack(tuple);
+            return;
+        }
+
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
+            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
+            // first try to get attempts under the application
+            Set<String> inprogressSet = new HashSet<>();
+            List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
+
+            if (attemptLogNames.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.",
+                        appId, info.getName(), info.getUser(), info.getQueue());
+            } else {
+                for (String attemptLogName : attemptLogNames) {
+                    String extension = inprogressSet.contains(attemptLogName) ? INPROGRESS_SUFFIX : "";
+                    LOG.info("Attempt log name: {}{}", attemptLogName, extension);
+
+                    Path attemptFile = getFilePath(attemptLogName, extension);
+                    JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info);
+                    // Close every event-log stream once it has been read instead of leaving it open.
+                    try (FSDataInputStream in = hdfs.open(attemptFile)) {
+                        reader.read(in);
+                    }
+                }
+            }
+
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.info("Successfully parse application {}", appId);
+            collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+            collector.fail(tuple);
+        }
+    }
+
+    /** This bolt is a sink: it declares no output fields. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    }
+
+    /**
+     * Returns the event-log file name for an attempt: attempt {@code "0"} uses the bare
+     * application id, any other attempt is suffixed with {@code "_" + attemptId}.
+     */
+    private String getAppAttemptLogName(String appId, String attemptId) {
+        if ("0".equals(attemptId)) {
+            return appId;
+        }
+        return appId + "_" + attemptId;
+    }
+
+    /** Resolves {@code appAttemptLogName + extension} under {@code hdfsConfig.baseDir}. */
+    private Path getFilePath(String appAttemptLogName, String extension) {
+        return new Path(this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension);
+    }
+
+    /**
+     * Probes HDFS for the event log of each attempt of {@code appId}, trying attempt ids
+     * 0, 1, 2, ... A completed log is preferred; otherwise an {@code .inprogress} log is
+     * accepted and its name is also added to {@code inprogressSet}. A missing attempt 0 is
+     * skipped; probing stops at the first missing attempt id greater than 0.
+     *
+     * <p>Naming assumes YARN 2.4.x style attempt ids, e.g.
+     * {@code application_1464382345557_269065_1}. YARN 2.7+ logs are named like
+     * {@code application_1468625664674_0003_appattempt_1468625664674_0003_000001} and are not
+     * found by this scheme.
+     *
+     * @return attempt log names (without extension) in attempt-id order
+     */
+    private List<String> getAttemptLogNameList(String appId, FileSystem hdfs, Set<String> inprogressSet)
+            throws IOException {
+        List<String> attempts = new ArrayList<>();
+        for (int attemptId = 0; ; attemptId++) {
+            String attemptIdString = Integer.toString(attemptId);
+            String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
+            LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
+
+            if (hdfs.exists(getFilePath(appAttemptLogName, ""))) {
+                attempts.add(appAttemptLogName);
+            } else if (hdfs.exists(getFilePath(appAttemptLogName, INPROGRESS_SUFFIX))) {
+                inprogressSet.add(appAttemptLogName);
+                attempts.add(appAttemptLogName);
+            } else if (attemptId > 0) {
+                break;
+            }
+        }
+        return attempts;
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        zkState.close();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
new file mode 100644
index 0000000..db60744
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -0,0 +1,174 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm spout that discovers finished Spark applications and emits their ids for parsing.
+ *
+ * <p>Once more than {@code spoutCrawlInterval} ms have passed since the last fetch, it asks the
+ * YARN ResourceManager for completed Spark applications and registers unseen ones in ZooKeeper.
+ * Each call then emits up to {@value #MAX_APPS_PER_BATCH} application ids loaded from ZooKeeper
+ * (field {@code appId}, message id = application id) and marks them SENT_FOR_PARSE. An
+ * application that fails {@value #FAIL_MAX_TIMES} times is marked FINISHED and not retried.
+ */
+public class SparkHistoryJobSpout extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryJobSpout.class);
+
+    /** Number of failures after which an application is given up on. */
+    private static final int FAIL_MAX_TIMES = 5;
+
+    /** Maximum number of application ids loaded from ZooKeeper and emitted per nextTuple call. */
+    private static final int MAX_APPS_PER_BATCH = 10;
+
+    /** How long nextTuple sleeps when there is nothing to emit. */
+    private static final int IDLE_SLEEP_SECONDS = 60;
+
+    private SpoutOutputCollector collector;
+    private JobHistoryZKStateManager zkState;
+    private SparkHistoryJobAppConfig config;
+    private ResourceFetcher rmFetch;
+    private long lastFinishAppTime = 0;
+    private Map<String, Integer> failTimes;
+
+    public SparkHistoryJobSpout(SparkHistoryJobAppConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+        rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
+        this.failTimes = new HashMap<>();
+        this.collector = spoutOutputCollector;
+        this.zkState = new JobHistoryZKStateManager(config);
+        this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
+        zkState.resetApplications();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void nextTuple() {
+        LOG.info("Start to run tuple");
+        try {
+            Calendar calendar = Calendar.getInstance();
+            long fetchTime = calendar.getTimeInMillis();
+            calendar.setTimeInMillis(this.lastFinishAppTime);
+            LOG.info("Last finished time = {}", calendar.getTime());
+            if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
+                List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
+                if (appInfos != null) {
+                    LOG.info("Get {} from yarn resource manager.", appInfos.size());
+                    for (AppInfo app : appInfos) {
+                        String appId = app.getId();
+                        if (!zkState.hasApplication(appId)) {
+                            zkState.addFinishedApplication(appId, app.getQueue(), app.getState(), app.getFinalStatus(), app.getUser(), app.getName());
+                        }
+                    }
+                }
+                this.lastFinishAppTime = fetchTime;
+                zkState.updateLastUpdateTime(fetchTime);
+            }
+
+            List<String> appIds = zkState.loadApplications(MAX_APPS_PER_BATCH);
+            for (String appId : appIds) {
+                collector.emit(new Values(appId), appId);
+                LOG.info("emit {}", appId);
+                zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+            }
+            LOG.info("{} apps sent.", appIds.size());
+
+            if (appIds.isEmpty()) {
+                // NOTE(review): Storm calls ack/fail on this same thread, so this sleep delays them too.
+                this.takeRest(IDLE_SLEEP_SECONDS);
+            }
+        } catch (Exception e) {
+            LOG.error("Fail to run next tuple", e);
+        }
+    }
+
+    /** Sleeps for the given number of seconds, preserving the thread's interrupt status. */
+    private void takeRest(int seconds) {
+        try {
+            Thread.sleep(seconds * 1000L);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while sleeping", e);
+            // Restore the flag so the interruption is not silently lost.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        outputFieldsDeclarer.declare(new Fields("appId"));
+    }
+
+    /**
+     * Re-emits a failed application, or marks it FINISHED in ZooKeeper and stops retrying once
+     * it has failed {@value #FAIL_MAX_TIMES} times.
+     */
+    @Override
+    public void fail(Object msgId) {
+        String appId = (String) msgId;
+        Integer previous = this.failTimes.get(appId);
+        int failCount = (previous == null ? 0 : previous) + 1;
+        if (failCount >= FAIL_MAX_TIMES) {
+            this.failTimes.remove(appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.error("Application {} has failed for over {} times, drop it.", appId, FAIL_MAX_TIMES);
+        } else {
+            this.failTimes.put(appId, failCount);
+            collector.emit(new Values(appId), appId);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
+        }
+    }
+
+    /** Clears the failure count of a successfully processed application. */
+    @Override
+    public void ack(Object msgId) {
+        String appId = (String) msgId;
+        this.failTimes.remove(appId);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        zkState.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
deleted file mode 100644
index 423dbef..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryTopology.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.topology.TopologyBuilder;
-
-/**
- * Builds and submits the Spark history crawling topology: a FinishedSparkJobSpout
- * feeding a SparkJobParseBolt through shuffle grouping.
- */
-public class SparkHistoryTopology {
-
- private SparkHistoryCrawlConfig sparkHistoryCrawlConfig;
-
- public SparkHistoryTopology(SparkHistoryCrawlConfig config) {
- this.sparkHistoryCrawlConfig = config;
- }
-
- /**
- * Wires the spout and bolt; their parallelism and task counts are read from the
- * "storm.parallelismConfig.<name>" and "storm.tasks.<name>" config keys.
- */
- public TopologyBuilder getBuilder() {
- TopologyBuilder builder = new TopologyBuilder();
- String spoutName = "sparkHistoryJobSpout";
- String boltName = "sparkHistoryJobBolt";
- com.typesafe.config.Config config = this.sparkHistoryCrawlConfig.getConfig();
- builder.setSpout(spoutName,
- new FinishedSparkJobSpout(sparkHistoryCrawlConfig),
- config.getInt("storm.parallelismConfig." + spoutName)
- ).setNumTasks(config.getInt("storm.tasks." + spoutName));
-
- builder.setBolt(boltName,
- new SparkJobParseBolt(sparkHistoryCrawlConfig),
- config.getInt("storm.parallelismConfig." + boltName)
- ).setNumTasks(config.getInt("storm.tasks." + boltName)).shuffleGrouping(spoutName);
- return builder;
- }
-
-
- /**
- * Entry point: submits to a LocalCluster when storm mode is "local", otherwise via StormSubmitter.
- */
- public static void main(String[] args) {
- try {
- SparkHistoryCrawlConfig crawlConfig = new SparkHistoryCrawlConfig();
-
- Config conf = new Config();
- conf.setNumWorkers(crawlConfig.stormConfig.workerNo);
- conf.setMessageTimeoutSecs(crawlConfig.stormConfig.timeoutSec);
- //conf.setMaxSpoutPending(crawlConfig.stormConfig.spoutPending);
- //conf.put(Config.TOPOLOGY_DEBUG, true);
-
-
- if (crawlConfig.stormConfig.mode.equals("local")) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(
- crawlConfig.stormConfig.topologyName,
- conf,
- new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
- } else {
- StormSubmitter.submitTopology(
- crawlConfig.stormConfig.topologyName,
- conf,
- new SparkHistoryTopology(crawlConfig).getBuilder().createTopology());
- }
- } catch (Exception e) {
- // NOTE(review): submission failures are only printed to stderr, not rethrown.
- e.printStackTrace();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
deleted file mode 100644
index c515d32..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
-import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.SparkHistoryServerResourceFetcher;
-import org.apache.eagle.jpm.util.resourcefetch.model.SparkApplication;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Storm bolt that parses the Spark event logs of each application id it receives and
- * records FINISHED or FAILED for that application in ZooKeeper.
- */
-public class SparkJobParseBolt extends BaseRichBolt {
-
- private static final Logger LOG = LoggerFactory.getLogger(SparkJobParseBolt.class);
-
- private OutputCollector collector;
- private ResourceFetcher historyServerFetcher;
- private SparkHistoryCrawlConfig config;
- private JobHistoryZKStateManager zkState;
- private Configuration hdfsConf;
-
- public SparkJobParseBolt(SparkHistoryCrawlConfig config) {
- this.config = config;
- }
-
- @Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.collector = outputCollector;
- this.hdfsConf = new Configuration();
- this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
- this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
- this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
- this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
- config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
- this.zkState = new JobHistoryZKStateManager(config);
- }
-
- /**
- * Parses every attempt log of the tuple's appId; acks on success, and on any exception
- * marks the application FAILED and fails the tuple.
- */
- @Override
- public void execute(Tuple tuple) {
- String appId = tuple.getStringByField("appId");
- if (!zkState.hasApplication(appId)) {
- //may already be processed due to some reason
- collector.ack(tuple);
- return;
- }
-
- try (FileSystem hdfs = HDFSUtil.getFileSystem(this.hdfsConf)) {
- SparkApplicationInfo info = zkState.getApplicationInfo(appId);
- //first try to get attempts under the application
-
- Set<String> inprogressSet = new HashSet<String>();
- List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
-
- if (attemptLogNames.isEmpty()) {
- LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.",
- appId, info.getName(), info.getUser(), info.getQueue());
- } else {
- for (String attemptLogName : attemptLogNames) {
- String extension = "";
- if (inprogressSet.contains(attemptLogName)) {
- extension = ".inprogress";
- }
- LOG.info("Attempt log name: " + attemptLogName + extension);
-
- Path attemptFile = getFilePath(attemptLogName, extension);
- JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info);
- // NOTE(review): the stream returned by hdfs.open is never closed explicitly.
- reader.read(hdfs.open(attemptFile));
- }
- }
-
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
- LOG.info("Successfully parse application {}", appId);
- collector.ack(tuple);
- } catch (Exception e) {
- LOG.error("Fail to process application {}", appId, e);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
- collector.fail(tuple);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
- }
-
- /**
- * Attempt "0" maps to the bare appId; any other attempt to appId + "_" + attemptId.
- */
- private String getAppAttemptLogName(String appId, String attemptId) {
- if (attemptId.equals("0")) {
- return appId;
- }
- return appId + "_" + attemptId;
- }
-
- private Path getFilePath(String appAttemptLogName, String extension) {
- String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension;
- return new Path(attemptLogDir);
- }
-
- /**
- * Probes HDFS for attempt logs 0, 1, 2, ...; prefers the completed log, falls back to the
- * ".inprogress" log (recording it in inprogressSet), skips a missing attempt 0 and stops at
- * the first missing attempt id above 0.
- */
- private List<String> getAttemptLogNameList(String appId, FileSystem hdfs, Set<String> inprogressSet)
- throws IOException {
- List<String> attempts = new ArrayList<String>();
- SparkApplication app = null;
- /*try {
- List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
- if (apps != null) {
- app = (SparkApplication) apps.get(0);
- attempts = app.getAttempts();
- }
- } catch (Exception e) {
- LOG.warn("Fail to get application detail from history server for appId " + appId, e);
- }*/
-
-
- // app is never assigned (the history-server lookup above is commented out), so this branch always runs.
- if (null == app) {
- // history server may not have the info, just double check.
- // TODO: if attemptId is not "1, 2, 3,...", we should change the logic.
- // Use getResourceManagerVersion() to compare YARN/RM versions.
- // attemptId might be: "appId_000001"
- int attemptId = 0;
-
- boolean exists = true;
- while (exists) {
- // For Yarn version 2.4.x
- // log name: application_1464382345557_269065_1
- String attemptIdString = Integer.toString(attemptId);
-
- // For Yarn version >= 2.7,
- // log name: "application_1468625664674_0003_appattempt_1468625664674_0003_000001"
-// String attemptIdFormatted = String.format("%06d", attemptId);
-//
-// // remove "application_" to get the number part of appID.
-// String sparkAppIdNum = appId.substring(12);
-// String attemptIdString = "appattempt_" + sparkAppIdNum + "_" + attemptIdFormatted;
-
- String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
- LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
-
- String extension = "";
- Path attemptFile = getFilePath(appAttemptLogName, extension);
- extension = ".inprogress";
- Path inprogressFile = getFilePath(appAttemptLogName, extension);
- Path logFile = null;
- // Check if history log exists.
- if (hdfs.exists(attemptFile)) {
- logFile = attemptFile;
- } else if (hdfs.exists(inprogressFile)) {
- logFile = inprogressFile;
- inprogressSet.add(appAttemptLogName);
- } else if (attemptId > 0) {
- exists = false;
- }
-
- if (logFile != null) {
- attempts.add(appAttemptLogName);
- }
- attemptId++;
- }
- }
- return attempts;
- }
-
- @Override
- public void cleanup() {
- super.cleanup();
- zkState.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
deleted file mode 100644
index f4284e1..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.storm;
-
-import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.util.HDFSUtil;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manual smoke check (lives under src/main, not a unit test): builds an HDFS client from
- * SparkHistoryCrawlConfig and logs whether a hard-coded event-log path exists.
- */
-public class TestHDFS {
- private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
-
- public static void main(String[] args) throws Exception {
- SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
-
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", config.hdfsConfig.endpoint);
- conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
- conf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
-
- FileSystem hdfs = HDFSUtil.getFileSystem(conf);
- // NOTE(review): hard-coded sample path.
- Path path = new Path("/logs/spark-events/local-1463002514438");
- boolean exists = hdfs.exists(path);
- LOG.info("File exists:{}", exists);
- // NOTE(review): the FileSystem obtained above is never closed.
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
new file mode 100644
index 0000000..26842b8
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>SPARK_HISTORY_JOB_APP</type>
+ <name>Spark History Job Monitoring</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.jpm.spark.history.SparkHistoryJobApp</appClass>
+ <viewPath>/apps/jpm</viewPath>
+ <configuration>
+ <!-- org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig -->
+ <property>
+ <name>basic.cluster</name>
+ <displayName>cluster</displayName>
+ <description>Cluster Name</description>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>basic.dataCenter</name>
+ <displayName>dataCenter</displayName>
+ <description>Data Center</description>
+ <value>sandbox</value>
+ </property>
+ <property>
+ <name>basic.jobConf.additional.info</name>
+ <displayName>jobConf.additional.info</displayName>
+ <description>Additional info in Job Configs</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkQuorum</name>
+ <displayName>zkQuorum</displayName>
+ <description>Zookeeper Quorum</description>
+ <value>sandbox.hortonworks.com:2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkRoot</name>
+ <displayName>zkRoot</displayName>
+ <description>Zookeeper Root</description>
+ <value>/sparkHistoryJob</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkPort</name>
+ <displayName>zkPort</displayName>
+ <description>Zookeeper Port</description>
+ <value>2181</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkSessionTimeoutMs</name>
+ <displayName>zkSessionTimeoutMs</displayName>
+ <description>Zookeeper session timeout (in milliseconds)</description>
+ <value>15000</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryTimes</name>
+ <displayName>zkRetryTimes</displayName>
+ <description>Number of Zookeeper retry attempts</description>
+ <value>3</value>
+ </property>
+ <property>
+ <name>zookeeperConfig.zkRetryInterval</name>
+ <displayName>zkRetryInterval</displayName>
+ <description>Interval between Zookeeper retry attempts</description>
+ <value>20000</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.spark.history.server.url</name>
+ <displayName>spark.history.server.url</displayName>
+ <description>Spark History Server URL</description>
+ <value>http://sandbox.hortonworks.com:18080</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.spark.history.server.username</name>
+ <displayName>spark.history.server.username</displayName>
+ <description>Spark History Server Auth Username</description>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.spark.history.server.password</name>
+ <displayName>spark.history.server.password</displayName>
+ <description>Spark History Server Auth Password</description>
+ <value></value>
+ </property>
+ <property>
+ <name>eagleProps.eagle.service.host</name>
+ <description>Eagle Service Host</description>
+ <value>sandbox.hortonworks.com</value>
+ </property>
+ <property>
+ <name>eagleProps.eagle.service.port</name>
+ <description>Eagle Service Port</description>
+ <value>9099</value>
+ </property>
+ <property>
+ <name>eagleProps.eagle.service.username</name>
+ <description>Eagle Service Auth Username</description>
+ <value>admin</value>
+ </property>
+ <property>
+ <name>eagleProps.eagle.service.password</name>
+ <description>Eagle Service Auth Password</description>
+ <value>secret</value>
+ </property>
+ <property>
+ <name>eagleProps.eagle.service.read.timeout</name>
+ <displayName>eagleProps.eagle.service.read.timeout</displayName>
+ <description>Maximum time (in seconds) the app waits when reading from the Eagle service</description>
+ <value>2</value>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.maxFlushNum</name>
+ <displayName>eagleProps.eagleService.maxFlushNum</displayName>
+ <value>500</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hdfs.eventLog</name>
+ <displayName>dataSourceConfig.hdfs.eventLog</displayName>
+ <value>/spark-history</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hdfs.endpoint</name>
+ <displayName>dataSourceConfig.hdfs.endpoint</displayName>
+ <value>hdfs://sandbox.hortonworks.com:8020</value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hdfs.keytab</name>
+ <displayName>dataSourceConfig.hdfs.keytab</displayName>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.hdfs.principal</name>
+ <displayName>dataSourceConfig.hdfs.principal</displayName>
+ <value></value>
+ </property>
+ <property>
+ <name>dataSourceConfig.rmUrl</name>
+ <displayName>dataSourceConfig.rmUrl</displayName>
+ <value>http://sandbox.hortonworks.com:8088</value>
+ </property>
+ <property>
+ <name>storm.mode</name>
+ <displayName>mode</displayName>
+ <description>Storm Mode: local or cluster</description>
+ <value>local</value>
+ </property>
+ <property>
+ <name>storm.worker.num</name>
+ <displayName>worker.num</displayName>
+ <description>The number of workers</description>
+ <value>2</value>
+ </property>
+ <property>
+ <name>name</name>
+ <displayName>name</displayName>
+ <description>Name of the topology</description>
+ <value>sparkHistoryJob</value>
+ </property>
+ <property>
+ <name>storm.messageTimeoutSec</name>
+ <displayName>messageTimeoutSec</displayName>
+ <description>Message timeout (in seconds)</description>
+ <value>3000</value>
+ </property>
+ <property>
+ <name>storm.pendingSpout</name>
+ <displayName>pendingSpout</displayName>
+ <value>1000</value>
+ </property>
+ <property>
+ <name>storm.spoutCrawlInterval</name>
+ <displayName>spoutCrawlInterval</displayName>
+ <description>Spout crawl interval (in milliseconds)</description>
+ <value>10000</value>
+ </property>
+ <property>
+ <name>storm.parallelismConfig.sparkHistoryJobFetchSpout</name>
+ <displayName>parallelismConfig.sparkHistoryJobFetchSpout</displayName>
+ <description>Parallelism of sparkHistoryJobFetchSpout</description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>storm.tasks.sparkHistoryJobFetchSpout</name>
+ <displayName>tasks.sparkHistoryJobFetchSpout</displayName>
+ <description>Number of tasks of sparkHistoryJobFetchSpout</description>
+ <value>4</value>
+ </property>
+ <property>
+ <name>storm.parallelismConfig.sparkHistoryJobParseBolt</name>
+ <displayName>parallelismConfig.sparkHistoryJobParseBolt</displayName>
+ <description>Parallelism of sparkHistoryJobParseBolt</description>
+ <value>1</value>
+ </property>
+ <property>
+ <name>storm.tasks.sparkHistoryJobParseBolt</name>
+ <displayName>tasks.sparkHistoryJobParseBolt</displayName>
+ <description>Number of tasks of sparkHistoryJobParseBolt</description>
+ <value>4</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.executor.memory</name>
+ <displayName>spark.executor.memory</displayName>
+ <value>1g</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.driver.memory</name>
+ <displayName>spark.driver.memory</displayName>
+ <value>1g</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.driver.cores</name>
+ <displayName>spark.driver.cores</displayName>
+ <value>1</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.executor.cores</name>
+ <displayName>spark.executor.cores</displayName>
+ <value>1</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.am.memory</name>
+ <displayName>spark.yarn.am.memory</displayName>
+ <value>512m</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.am.cores</name>
+ <displayName>spark.yarn.am.cores</displayName>
+ <value>1</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.executor.memoryOverhead.factor</name>
+ <displayName>spark.yarn.executor.memoryOverhead.factor</displayName>
+ <value>10</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.driver.memoryOverhead.factor</name>
+ <displayName>spark.yarn.driver.memoryOverhead.factor</displayName>
+ <value>10</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.am.memoryOverhead.factor</name>
+ <displayName>spark.yarn.am.memoryOverhead.factor</displayName>
+ <value>10</value>
+ </property>
+ <property>
+ <name>spark.defaultVal.spark.yarn.overhead.min</name>
+ <displayName>spark.yarn.overhead.min</displayName>
+ <value>384m</value>
+ </property>
+ </configuration>
+ <docs>
+ <install>
+ </install>
+ <uninstall>
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..7adb50c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 65aaa36..289c6f7 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -18,14 +18,14 @@
"basic":{
"cluster":"sandbox",
"datacenter":"sandbox",
- jobConf.additional.info: []
+ jobConf.additional.info: ""
},
"eagleProps":{
eagle.service.host:"sandbox.hortonworks.com",
eagle.service.port: 9099,
- eagle.service.userName: "admin",
- eagle.service.pwd : "secret",
- eagle.service.read_timeout : 2
+ eagle.service.username: "admin",
+ eagle.service.password : "secret",
+ eagle.service.read.timeout : 2
},
"dataSourceConfig":{
"zkQuorum" : "sandbox.hortonworks.com:2181",
@@ -35,29 +35,29 @@
"zkRetryInterval" : 20000,
spark.history.server.url : "http://sandbox.hortonworks.com:18080",
spark.history.server.username : "",
- spark.history.server.pwd : "",
- rm.url:["http://sandbox.hortonworks.com:8088"] ,
+ spark.history.server.password : "",
+ rm.url: "http://sandbox.hortonworks.com:8088",
"hdfs": {
- "baseDir": "/spark-history",
+ "eventLog": "/spark-history",
"endPoint": "hdfs://sandbox.hortonworks.com:8020",
"principal": "",
"keytab" : ""
- }
+ }
},
"storm":{
+ worker.num: 2,
"mode": "local",
- "workerNo": 2,
- "name":"sparkHistory",
- "messageTimeoutSec": 3000,
+ "name":"sparkHistoryJob",
+ "messageTimeoutSec": 3000,
"pendingSpout": 1000,
"spoutCrawlInterval": 10000,#in ms
"parallelismConfig" : {
- "sparkHistoryJobSpout" : 1,
- "sparkHistoryJobBolt" : 6
+ "sparkHistoryJobFetchSpout" : 1,
+ "sparkHistoryJobParseBolt" : 4
},
"tasks" : {
- "sparkHistoryJobSpout" : 1,
- "sparkHistoryJobBolt" : 6
+ "sparkHistoryJobFetchSpout" : 1,
+ "sparkHistoryJobParseBolt" : 4
}
},
"spark":{
@@ -73,5 +73,7 @@
spark.yarn.am.memoryOverhead.factor: 10,
spark.yarn.overhead.min: "384m"
}
- }
+ },
+ "appId": "sparkHistoryJob",
+ "mode": "LOCAL"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java b/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
new file mode 100644
index 0000000..cf6e932
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/test/java/SparkHistoryJobAppProviderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.test.ApplicationSimulator;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider;
+import org.junit.Test;
+
+public class SparkHistoryJobAppProviderTest extends ApplicationTestBase { // smoke test for SparkHistoryJobAppProvider
+ @Inject
+ ApplicationSimulator simulator; // field-injected by Guice; presumably ApplicationTestBase sets up the injector - verify
+
+ @Test
+ public void testRunWithProvider(){ // no assertions: passes as long as simulator.start(...) does not throw
+ simulator.start(SparkHistoryJobAppProvider.class); // NOTE(review): presumably uses application.conf defaults - confirm
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4f4fd0c4/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
index 3d20af7..dc79b15 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppProvider.java
@@ -23,4 +23,4 @@ public class SparkRunningJobAppProvider extends AbstractApplicationProvider<Spar
public SparkRunningJobApp getApplication() {
return new SparkRunningJobApp();
}
-}
\ No newline at end of file
+}