You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/07/05 18:07:41 UTC
[2/8] incubator-eagle git commit: EAGLE-276 eagle support for mr &
spark history job monitoring mr & spark job history monitoring
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
new file mode 100644
index 0000000..4e6bf03
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -0,0 +1,178 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm bolt that parses the Spark event-log files of one application per tuple.
+ *
+ * <p>Each tuple carries an {@code appId} field. The bolt looks the application up in the
+ * ZooKeeper-backed state manager, discovers its attempt log files under the configured
+ * HDFS base directory, feeds every attempt file to a history-file reader and finally
+ * records the outcome (FINISHED or FAILED) in the state manager before acking or failing
+ * the tuple.
+ */
+public class SparkJobParseBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkJobParseBolt.class);
+
+    private OutputCollector collector;
+    // Created in prepare(); only referenced by the currently disabled lookup in getAttemptList().
+    private ResourceFetcher historyServerFetcher;
+    private SparkHistoryCrawlConfig config;
+    private JobHistoryZKStateManager zkState;
+    private Configuration hdfsConf;
+
+    /**
+     * @param config crawl configuration; read in {@link #prepare} and while building log paths
+     */
+    public SparkJobParseBolt(SparkHistoryCrawlConfig config) {
+        this.config = config;
+    }
+
+    /**
+     * Builds the HDFS configuration, the history-server fetcher and the state manager.
+     */
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+        this.collector = outputCollector;
+        this.hdfsConf = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        // The FileSystem cache is disabled, so every handle obtained below is closed by its owner.
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        this.historyServerFetcher = new SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
+                config.jobHistoryConfig.historyServerUserName, config.jobHistoryConfig.historyServerUserPwd);
+        this.zkState = new JobHistoryZKStateManager(config);
+    }
+
+    /**
+     * Parses all attempt logs of the application named by the tuple's {@code appId} field.
+     *
+     * <p>The tuple is acked when the application is unknown to the state manager or when
+     * processing completes; it is failed (and the application marked FAILED) on any exception.
+     */
+    @Override
+    public void execute(Tuple tuple) {
+        String appId = tuple.getStringByField("appId");
+        FileSystem hdfs = null;
+        try {
+            if (!zkState.hasApplication(appId)) {
+                //may already be processed due to some reason
+                collector.ack(tuple);
+                return;
+            }
+
+            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
+            //first try to get attempts under the application
+            List<SparkApplicationAttempt> attempts = this.getAttemptList(appId);
+
+            if (attempts.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", appId, info.getName(), info.getUser(), info.getQueue());
+            } else {
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                for (SparkApplicationAttempt attempt : attempts) {
+                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId()));
+                    JHFInputStreamReader reader = new SparkHistoryFileInputStreamReaderImpl(config.info.site, info);
+                    // Close the stream here so one handle is not leaked per attempt.
+                    // NOTE(review): if reader.read() also closes it, this second close should be a no-op - confirm.
+                    try (org.apache.hadoop.fs.FSDataInputStream in = hdfs.open(attemptFile)) {
+                        reader.read(in);
+                    }
+                }
+            }
+
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            LOG.info("Successfully parse application {}", appId);
+            collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FAILED);
+            collector.fail(tuple);
+        } finally {
+            if (null != hdfs) {
+                try {
+                    hdfs.close();
+                } catch (Exception e) {
+                    // Best effort: keep going, but record why the close failed.
+                    LOG.error("Fail to close hdfs", e);
+                }
+            }
+        }
+    }
+
+    /** This bolt emits nothing, so no output fields are declared. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+
+    /** Builds the attempt log file name, {@code <appId>_<attemptId>}. */
+    private String getAppAttemptLogName(String appId, String attemptId) {
+        return String.format("%s_%s", appId, attemptId);
+    }
+
+    /**
+     * Lists the attempts of an application by probing HDFS for {@code <appId>_1},
+     * {@code <appId>_2}, ... until the first missing file.
+     *
+     * @param appId application id used to build the attempt file names
+     * @return the attempts found, in ascending attempt order; empty when none exist
+     * @throws IOException if HDFS cannot be reached or closed
+     */
+    private List<SparkApplicationAttempt> getAttemptList(String appId) throws IOException {
+        FileSystem hdfs = null;
+        List<SparkApplicationAttempt> attempts = new ArrayList<>();
+        try {
+
+            // The history-server lookup below is disabled, so app stays null and HDFS is always probed.
+            SparkApplication app = null;
+            /*try {
+                List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
+                if (apps != null) {
+                    app = (SparkApplication) apps.get(0);
+                    attempts = app.getAttempts();
+                }
+            } catch (Exception e) {
+                LOG.warn("Fail to get application detail from history server for appId " + appId, e);
+            }*/
+
+            if (null == app) {
+                //history server may not have the info, just double check
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                int attemptId = 1;
+                while (hdfs.exists(new Path(this.config.hdfsConfig.baseDir + "/"
+                        + this.getAppAttemptLogName(appId, String.valueOf(attemptId))))) {
+                    SparkApplicationAttempt attempt = new SparkApplicationAttempt();
+                    attempt.setAttemptId(String.valueOf(attemptId));
+                    attempts.add(attempt);
+                    attemptId++;
+                }
+            }
+            return attempts;
+        } finally {
+            if (null != hdfs) {
+                hdfs.close();
+            }
+        }
+    }
+
+    /** Releases the state manager when the bolt is shut down. */
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        zkState.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
new file mode 100644
index 0000000..eb30f5e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manual connectivity check: builds an HDFS configuration from the default
+ * {@link SparkHistoryCrawlConfig}, opens the file system and logs whether one
+ * hard-coded event-log path exists.
+ */
+public class TestHDFS {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
+
+    /**
+     * @param args unused
+     * @throws Exception if the configuration cannot be loaded or HDFS cannot be reached
+     */
+    public static void main(String[] args) throws Exception {
+        SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
+
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        conf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+
+        // try-with-resources releases the file system handle even when exists() throws.
+        try (FileSystem hdfs = HDFSUtil.getFileSystem(conf)) {
+            Path path = new Path("/logs/spark-events/local-1463002514438");
+            boolean exists = hdfs.exists(path);
+            LOG.info("File exists:{}", exists);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
new file mode 100644
index 0000000..36f0836
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+ "basic":{
+ "cluster":"sandbox",
+ "datacenter":"sandbox",
+ jobConf.additional.info: []
+ },
+ "eagleProps":{
+ eagle.service.host:"sandbox.hortonworks.com",
+ eagle.service.port: 9099,
+ eagle.service.userName: "admin",
+ eagle.service.pwd : "secret",
+ eagle.service.read_timeout : 2
+ },
+ "dataSourceConfig":{
+ "zkQuorum" : "sandbox.hortonworks.com:2181",
+ "zkRoot" : "/sparkJobHistory",
+ "zkSessionTimeoutMs" : 15000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 20000,
+ spark.history.server.url : "http://sandbox.hortonworks.com:18080/",
+ spark.history.server.username : "",
+ spark.history.server.pwd : "",
+ rm.url:["http://sandbox.hortonworks.com:8088"] ,
+ "hdfs": {
+ "baseDir": "/logs/spark-events",
+ "endPoint": "hdfs://sandbox.hortonworks.com:8020",
+ "principal": "",
+ "keytab" : ""
+ }
+ },
+ "storm":{
+ "mode": "local",
+ "workerNo": 2,
+ "name":"sparkHistory",
+ "messageTimeoutSec": 3000,
+ "pendingSpout": 1000,
+ "spoutCrawlInterval": 10000,#in ms
+ "parallelismConfig" : {
+ "sparkHistoryJobSpout" : 1,
+ "sparkHistoryJobBolt" : 6
+ },
+ "tasks" : {
+ "sparkHistoryJobSpout" : 1,
+ "sparkHistoryJobBolt" : 6
+ }
+ },
+ "spark":{
+ "defaultVal":{
+ spark.executor.memory:"1g",
+ spark.driver.memory: "1g",
+ spark.driver.cores:1,
+ spark.executor.cores:1,
+ spark.yarn.am.memory:"512m",
+ spark.yarn.am.cores:1,
+ spark.yarn.executor.memoryOverhead.factor: 10,
+ spark.yarn.driver.memoryOverhead.factor: 10,
+ spark.yarn.am.memoryOverhead.factor: 10,
+ spark.yarn.overhead.min: "384m"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
new file mode 100644
index 0000000..7bf90d4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>eagle-jpm-spark-running</artifactId>
+ <name>eagle-jpm-spark-running</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-job-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jsoup</groupId>
+ <artifactId>jsoup</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
new file mode 100644
index 0000000..ed7f658
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-jpm-parent</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>eagle-jpm-util</artifactId>
+ <name>eagle-jpm-util</name>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.9</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
new file mode 100644
index 0000000..0792f15
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public class Constants { // Shared string constants and enums for the eagle-jpm modules.
+
+ public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = "SparkAppService"; // service endpoint names, one per Spark entity kind
+ public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = "SparkJobService";
+ public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = "SparkStageService";
+ public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = "SparkTaskService";
+ public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = "SparkExecutorService";
+
+ public static final String APPLICATION_PREFIX = "application"; // NOTE(review): presumably the leading token of an application id - confirm
+ public static final String JOB_PREFIX = "job"; // NOTE(review): presumably the leading token of a job id - confirm
+ public static final String V2_APPS_URL = "ws/v1/cluster/apps"; // relative REST path; NOTE(review): looks like the YARN ResourceManager apps API - confirm
+ public static final String ANONYMOUS_PARAMETER = "anonymous=true"; // query-string fragment
+
+ public static final String V2_APPS_RUNNING_URL = "ws/v1/cluster/apps?state=RUNNING"; // V2_APPS_URL filtered by state=RUNNING
+ public static final String V2_APPS_COMPLETED_URL = "ws/v1/cluster/apps?state=FINISHED"; // V2_APPS_URL filtered by state=FINISHED
+
+ public static final String SPARK_APPS_URL ="api/v1/applications"; // relative REST path; NOTE(review): presumably the Spark history server applications API - confirm
+
+ public enum CompressionType { // compression choices: gzip or none
+ GZIP, NONE
+ }
+ public enum JobState { // job state selector; ALL presumably means no state filter - confirm with callers
+ RUNNING, COMPLETED, ALL
+ }
+
+ public enum ResourceType { // kinds of resource a fetcher can be asked for
+ COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
new file mode 100644
index 0000000..8adb001
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+
+public class HDFSUtil {
+
+ public static FileSystem getFileSystem(Configuration conf) throws IOException {
+ HDFSUtil.login(conf);
+ return FileSystem.get(conf);
+ }
+
+ public static void login(Configuration kConfig) throws IOException {
+ if(kConfig.get("hdfs.kerberos.principal") == null || kConfig.get("hdfs.kerberos.principal").isEmpty()){
+ return;
+ }
+ kConfig.setBoolean("hadoop.security.authorization", true);
+ kConfig.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(kConfig);
+ UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"), kConfig.get("hdfs.keytab.file"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
new file mode 100644
index 0000000..8080147
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class JSONUtil {
+
+ public static String getString(JSONObject obj, String field) {
+ if (obj.containsKey(field)) {
+ return (String) obj.get(field);
+ }
+ return null;
+ }
+
+ public static Integer getInt(JSONObject obj, String field){
+ if(obj.containsKey(field)){
+ return ((Long)obj.get(field)).intValue();
+ }
+ return null;
+ }
+
+ public static Long getLong(JSONObject obj, String field){
+ if(obj.containsKey(field)){
+ return (Long)obj.get(field);
+ }
+ return null;
+ }
+
+ public static Boolean getBoolean(JSONObject obj, String field){
+ if(obj.containsKey(field)){
+ return (Boolean)obj.get(field);
+ }
+ return null;
+ }
+
+ public static JSONObject getJSONObject(JSONObject obj, String field){
+ if(obj.containsKey(field)){
+ return (JSONObject)obj.get(field);
+ }
+ return null;
+ }
+
+ public static JSONArray getJSONArray(JSONObject obj, String field){
+ if(obj.containsKey(field)){
+ return (JSONArray)obj.get(field);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
new file mode 100644
index 0000000..c5cc82f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class JobNameNormalization {
+ private static Logger logger = LoggerFactory.getLogger(JobNameNormalization.class);
+ private static JobNameNormalization instance = new JobNameNormalization();
+ private static final String JOB_NAME_NORMALIZATION_RULES_KEY = "job.name.normalization.rules.key";
+ private static final String PARAMETERIZED_PREFIX = "\\$";
+ private static final String MULTIPLE_RULE_DILIMITER = ";";
+ /**
+ * map from source string to target string
+ * source string is regular expression, for example ^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$
+ * target string is parameterized string, for example $1, $2
+ */
+ private List<JobNameNormalizationRule> _rules = new ArrayList<JobNameNormalizationRule>();
+
+ private enum NormalizationOp{
+ REPLACE("=>");
+ private String value;
+ private NormalizationOp(String value){
+ this.value = value;
+ }
+ public String toString(){
+ return value;
+ }
+ }
+
+ static class JobNameNormalizationRule{
+ Pattern pattern;
+ NormalizationOp op;
+ String target;
+ }
+
+ private JobNameNormalization(){
+ try{
+ // load normalization rules
+ Config conf = ConfigFactory.load();
+ String key = JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase();
+ String value = conf.getString(key);
+ if(value == null){
+ logger.info("no job name normalization rules are loaded");
+ return;
+ }
+ // multiple rules are concatenated with semicolon, i.e. ;
+ String rules[] = value.split(MULTIPLE_RULE_DILIMITER);
+ for(String rule : rules){
+ rule = rule.trim();
+ logger.info("jobNormalizationRule is loaded " + rule);
+ addRule(rule);
+ }
+ }catch(Exception ex){
+ logger.error("fail loading job name normalization rules", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public static JobNameNormalization getInstance(){
+ return instance;
+ }
+
+ private void addRule(String rule){
+ for(NormalizationOp op : NormalizationOp.values()){
+ // split the rule to be source and target string
+ String elements[] = rule.split(op.toString());
+ if(elements == null || elements.length != 2) return;
+ JobNameNormalizationRule r = new JobNameNormalizationRule();
+ r.pattern = Pattern.compile(elements[0].trim());
+ r.op = op;
+ r.target = elements[1].trim();
+ _rules.add(r);
+ break; //once one Op is matched, exit
+ }
+
+ }
+
+ public String normalize(String jobName){
+ String normalizedJobName = jobName;
+ // go through each rules and do actions
+ for(JobNameNormalizationRule rule : _rules){
+ Pattern p = rule.pattern;
+ Matcher m = p.matcher(jobName);
+ if(m.find()){
+ normalizedJobName = rule.target;
+ int c = m.groupCount();
+ for(int i=1; i<c+1; i++){
+ normalizedJobName = normalizedJobName.replaceAll(PARAMETERIZED_PREFIX+String.valueOf(i), m.group(i));
+ }
+ }
+ }
+ return normalizedJobName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
new file mode 100644
index 0000000..35014b1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+/** Container for the Spark stage and job status enumerations. */
+public class SparkEntityConstant {
+
+    /** Status values for a Spark stage. */
+    public enum SPARK_STAGE_STATUS {
+        COMPLETE,
+        FAILED
+    }
+
+    /** Status values for a Spark job. */
+    public enum SPARK_JOB_STATUS {
+        SUCCEEDED,
+        FAILED
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
new file mode 100644
index 0000000..1d38eea
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+/**
+ * Tag names attached to Spark job entities. {@link #toString()} yields the tag string
+ * associated with each constant.
+ */
+public enum SparkJobTagName {
+    SITE("site"),
+    SPARK_APP_ID("sprkAppId"),
+    SPARK_APP_ATTEMPT_ID("sprkAppAttemptId"),
+    SPARK_APP_NAME("sprkAppName"),
+    SPARK_APP_NORM_NAME("normSprkAppName"),
+    SPARK_JOB_ID("jobId"),
+    // NOTE(review): "SATGE" looks like a misspelling of "STAGE"; the constant name is public,
+    // so it is left untouched here.
+    SPARK_SATGE_ID("stageId"),
+    SPARK_STAGE_ATTEMPT_ID("stageAttemptId"),
+    SPARK_TASK_INDEX("taskIndex"),
+    SPARK_TASK_ATTEMPT_ID("taskAttemptId"),
+    SPARK_USER("user"),
+    SPARK_QUEUE("queue"),
+    SPARK_EXECUTOR_ID("executorId");
+
+    /** Tag string associated with the constant. */
+    private final String tagName;
+
+    SparkJobTagName(String name) {
+        tagName = name;
+    }
+
+    /** @return the tag string of this constant, not the constant's identifier */
+    @Override
+    public String toString() {
+        return tagName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
new file mode 100644
index 0000000..d5147b6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelectorImpl;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppsWrapper;
+import org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * {@link ResourceFetcher} that reads the list of finished Spark applications from one of
+ * several configured base URLs, using an {@link HAURLSelector} to pick the URL to call.
+ */
+public class RMResourceFetcher implements ResourceFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class);
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    private final HAURLSelector selector;
+    private final ServiceURLBuilder jobListServiceURLBuilder;
+    private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
+
+    /**
+     * @param RMBasePaths candidate base URLs handed to the HA selector
+     */
+    public RMResourceFetcher(String[] RMBasePaths) {
+        this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
+        this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl();
+
+        this.selector = new HAURLSelectorImpl(RMBasePaths, jobListServiceURLBuilder, Constants.CompressionType.GZIP);
+    }
+
+    /** Probes the currently selected URL and asks the selector to re-select when the probe fails. */
+    private void checkUrl() throws IOException {
+        if (!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), Constants.JobState.RUNNING.name()))) {
+            selector.reSelectUrl();
+        }
+    }
+
+    /**
+     * Fetches and deserializes the finished-application list.
+     *
+     * @param lastFinishTime value passed to the URL builder together with the selected base URL
+     * @return a single-element list whose only element is the {@code List<AppInfo>} from the
+     *         response, or {@code null} when the response carries no application list
+     */
+    private List<Object> doFetchSparkFinishApplicationsList(String lastFinishTime) throws Exception {
+        InputStream is = null;
+        try {
+            checkUrl();
+            final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), lastFinishTime);
+            LOG.info("Going to call yarn api to fetch finished spark job list: " + urlString);
+            is = InputStreamUtils.getInputStream(urlString, null, Constants.CompressionType.GZIP);
+            final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, AppsWrapper.class);
+            if (appWrapper == null || appWrapper.getApps() == null
+                    || appWrapper.getApps().getApp() == null) {
+                return null;
+            }
+            final List<AppInfo> apps = appWrapper.getApps().getApp();
+            // The cast to Object makes asList wrap the whole list as ONE element; this shape is
+            // kept unchanged because it is the method's existing return contract.
+            return Arrays.asList((Object) apps);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (Exception e) {
+                    // best effort: a failure to close must not mask the fetch result
+                    LOG.debug("Failed to close response stream", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Only {@code COMPLETE_SPARK_JOB} is supported; its first parameter is cast to {@code String}
+     * and used as the last finish time.
+     *
+     * @throws IllegalArgumentException if no parameter is supplied for a supported type
+     * @throws Exception                if the resource type is not supported or the fetch fails
+     */
+    @Override
+    public List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
+        switch (resoureType) {
+            case COMPLETE_SPARK_JOB:
+                if (parameter == null || parameter.length == 0) {
+                    throw new IllegalArgumentException("Missing lastFinishTime parameter for " + resoureType);
+                }
+                return doFetchSparkFinishApplicationsList((String) parameter[0]);
+
+            default:
+                throw new Exception("Not support resourceType :" + resoureType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
new file mode 100644
index 0000000..b21d030
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+/**
+ * Contract for components that fetch a resource identified by a
+ * {@link Constants.ResourceType}.
+ */
+public interface ResourceFetcher {
+
+ /**
+ * Fetches the resource of the given type.
+ *
+ * @param resoureType kind of resource to fetch
+ * @param parameter type-specific arguments; RMResourceFetcher and
+ * SparkHistoryServerResourceFetcher both cast the first element to String
+ * @return the fetched objects; RMResourceFetcher and SparkHistoryServerResourceFetcher
+ * may return null when nothing is found
+ * @throws Exception on failure; those two implementations also throw it for a resource
+ * type they do not support
+ */
+ List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
new file mode 100644
index 0000000..c13bee0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.eagle.jpm.util.resourceFetch.url.SparkJobServiceURLBuilderImpl;
+import org.apache.commons.codec.binary.Base64;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * {@link ResourceFetcher} that reads the detail of a single Spark application from a history
+ * server URL, sending an HTTP Basic {@code Authorization} header with every request.
+ */
+public class SparkHistoryServerResourceFetcher implements ResourceFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SparkHistoryServerResourceFetcher.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    private final String historyServerURL;
+    private final ServiceURLBuilder sparkDetailJobServiceURLBuilder;
+    /** Complete value of the Authorization header: {@code "Basic " + base64(user:password)}. */
+    private final String auth;
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+    }
+
+    /**
+     * @param historyServerURL base URL passed to the URL builder
+     * @param userName         user name for Basic authentication
+     * @param pwd              password for Basic authentication
+     */
+    public SparkHistoryServerResourceFetcher(String historyServerURL, String userName, String pwd) {
+        this.historyServerURL = historyServerURL;
+        this.sparkDetailJobServiceURLBuilder = new SparkJobServiceURLBuilderImpl();
+        // Use an explicit charset so the header does not depend on the platform default encoding.
+        final java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
+        final byte[] credentials = String.format("%s:%s", userName, pwd).getBytes(utf8);
+        this.auth = "Basic " + new String(new Base64().encode(credentials), utf8);
+    }
+
+    /**
+     * Fetches and deserializes one application.
+     *
+     * @param appId application id passed to the URL builder
+     * @return a single-element list holding the {@link SparkApplication}, or {@code null} when
+     *         opening or reading the URL raises {@link FileNotFoundException}
+     */
+    private List<Object> doFetchSparkApplicationDetail(String appId) throws Exception {
+        InputStream is = null;
+        try {
+            final String urlString = sparkDetailJobServiceURLBuilder.build(this.historyServerURL, appId);
+            LOG.info("Going to call spark history server api to fetch spark job: " + urlString);
+            is = InputStreamUtils.getInputStream(urlString, auth, Constants.CompressionType.NONE);
+            SparkApplication app = OBJ_MAPPER.readValue(is, SparkApplication.class);
+            return Arrays.asList((Object) app);
+        } catch (FileNotFoundException e) {
+            // the resource does not exist; reported to the caller as "nothing found"
+            return null;
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (Exception e) {
+                    // best effort: a failure to close must not mask the fetch result
+                    LOG.debug("Failed to close response stream", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Only {@code SPARK_JOB_DETAIL} is supported; its first parameter is cast to {@code String}
+     * and used as the application id.
+     *
+     * @throws Exception if the resource type is not supported or the fetch fails
+     */
+    @Override
+    public List<Object> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
+        switch (resoureType) {
+            case SPARK_JOB_DETAIL:
+                return doFetchSparkApplicationDetail((String) parameter[0]);
+            default:
+                throw new Exception("Not support resourceType :" + resoureType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
new file mode 100644
index 0000000..6d3fa45
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Opens HTTP(S) response streams with connect/read timeouts, an optional
+ * {@code Authorization} header and optional GZIP decoding.
+ */
+public class InputStreamUtils {
+
+    /** Connect timeout in milliseconds. */
+    private static final int CONNECTION_TIMEOUT = 10 * 1000;
+    /** Default read timeout in milliseconds. */
+    private static final int READ_TIMEOUT = 5 * 60 * 1000;
+    private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+    private static final String GZIP_COMPRESSION = "gzip";
+
+    /**
+     * Requests the URL with {@code Accept-Encoding: gzip} and wraps the response in a
+     * {@link GZIPInputStream} unconditionally.
+     *
+     * @param timeout read timeout in milliseconds
+     */
+    private static InputStream openGZIPInputStream(URL url, String auth, int timeout) throws IOException {
+        final URLConnection connection = url.openConnection();
+        connection.setConnectTimeout(CONNECTION_TIMEOUT);
+        connection.setReadTimeout(timeout);
+        connection.addRequestProperty(GZIP_HTTP_HEADER, GZIP_COMPRESSION);
+        if (null != auth) {
+            connection.setRequestProperty("Authorization", auth);
+        }
+        return new GZIPInputStream(connection.getInputStream());
+    }
+
+    /**
+     * Requests the URL and returns the raw response stream.
+     *
+     * @param timeout read timeout in milliseconds
+     */
+    private static InputStream openInputStream(URL url, String auth, int timeout) throws IOException {
+        final URLConnection connection = url.openConnection();
+        // 'timeout' is the read timeout (same as in openGZIPInputStream); the connect timeout
+        // is the fixed CONNECTION_TIMEOUT. Without setReadTimeout a stalled server would block
+        // the read indefinitely.
+        connection.setConnectTimeout(CONNECTION_TIMEOUT);
+        connection.setReadTimeout(timeout);
+        if (null != auth) {
+            connection.setRequestProperty("Authorization", auth);
+        }
+        return connection.getInputStream();
+    }
+
+    /**
+     * Opens the response stream of a URL.
+     *
+     * @param urlString       URL to open
+     * @param auth            value of the {@code Authorization} header, or {@code null} for none
+     * @param compressionType {@code GZIP} to request and decode a gzip response; any other value
+     *                        returns the raw stream
+     * @param timeout         read timeout in milliseconds
+     * @return the response stream; the caller is responsible for closing it
+     * @throws Exception if the URL is rejected or the connection fails
+     */
+    public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType, int timeout) throws Exception {
+        final URL url = URLConnectionUtils.getUrl(urlString);
+        if (compressionType.equals(Constants.CompressionType.GZIP)) {
+            return openGZIPInputStream(url, auth, timeout);
+        }
+        // CompressionType.NONE
+        return openInputStream(url, auth, timeout);
+    }
+
+    /**
+     * Same as {@link #getInputStream(String, String, Constants.CompressionType, int)} with the
+     * default read timeout of five minutes.
+     */
+    public static InputStream getInputStream(String urlString, String auth, Constants.CompressionType compressionType) throws Exception {
+        return getInputStream(urlString, auth, compressionType, READ_TIMEOUT);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
new file mode 100644
index 0000000..2e7b248
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+/** String helpers for service URLs and for converting between application ids and job ids. */
+public class JobUtils {
+
+    /**
+     * @param urlBase base URL
+     * @return {@code urlBase} itself when it already ends with {@code /}, otherwise
+     *         {@code urlBase} with a {@code /} appended
+     */
+    public static String checkAndAddLastSlash(String urlBase) {
+        if (!urlBase.endsWith("/")) {
+            return urlBase + "/";
+        }
+        return urlBase;
+    }
+
+    /**
+     * Swaps the leading {@code Constants.APPLICATION_PREFIX} for {@code Constants.JOB_PREFIX}.
+     *
+     * @param appID application id
+     * @return the corresponding job id, or {@code null} when {@code appID} does not start with
+     *         the application prefix
+     */
+    public static String getJobIDByAppID(String appID) {
+        if (appID.startsWith(Constants.APPLICATION_PREFIX)) {
+            // Rewrite only the verified leading prefix; String.replace would also rewrite any
+            // later occurrence of the prefix text inside the id.
+            return Constants.JOB_PREFIX + appID.substring(Constants.APPLICATION_PREFIX.length());
+        }
+        return null;
+    }
+
+    /**
+     * Swaps the leading {@code Constants.JOB_PREFIX} for {@code Constants.APPLICATION_PREFIX}.
+     *
+     * @param jobID job id
+     * @return the corresponding application id, or {@code null} when {@code jobID} does not
+     *         start with the job prefix
+     */
+    public static String getAppIDByJobID(String jobID) {
+        if (jobID.startsWith(Constants.JOB_PREFIX)) {
+            return Constants.APPLICATION_PREFIX + jobID.substring(Constants.JOB_PREFIX.length());
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
new file mode 100644
index 0000000..d340d7b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+
+/**
+ * Factory helpers for {@link URL} and {@link URLConnection} objects over HTTP and HTTPS.
+ *
+ * <p>SECURITY NOTE(review): {@link #getHTTPSUrl(String)} installs a trust-all
+ * {@link TrustManager} and an accept-all {@link HostnameVerifier} as JVM-wide defaults of
+ * {@link HttpsURLConnection}. This disables certificate and host name validation for every
+ * HTTPS connection opened afterwards through {@code HttpsURLConnection} in the process. The
+ * behavior is kept unchanged here; confirm whether it is acceptable for the deployment.
+ */
+public final class URLConnectionUtils {
+    //TODO: change some public method to private
+    private static final Logger LOG = LoggerFactory.getLogger(URLConnectionUtils.class);
+
+    /**
+     * Opens a connection for an {@code http://} or {@code https://} URL.
+     *
+     * @throws Exception if the URL starts with neither scheme or cannot be opened
+     */
+    public static URLConnection getConnection(String url) throws Exception {
+        if (url.startsWith("https://")) {
+            return getHTTPSConnection(url);
+        } else if (url.startsWith("http://")) {
+            return getHTTPConnection(url);
+        }
+        throw new Exception("Invalid input argument url: " + url);
+    }
+
+    /** Opens a connection to the given URL without touching any SSL settings. */
+    public static URLConnection getHTTPConnection(String urlString) throws Exception {
+        final URL url = new URL(urlString);
+        return url.openConnection();
+    }
+
+    /**
+     * Builds a {@link URL} for an HTTP or HTTPS address; the scheme is matched
+     * case-insensitively. For HTTPS the trust-all defaults are installed first.
+     *
+     * @throws Exception if the URL starts with neither {@code http://} nor {@code https://}
+     */
+    public static URL getUrl(String urlString) throws Exception {
+        // Decide on the scheme prefix, as getConnection does; a substring test would also match
+        // "http"/"https" appearing in the host, path or query of an unrelated URL.
+        final String normalized = urlString.trim().toLowerCase(java.util.Locale.ROOT);
+        if (normalized.startsWith("https://")) {
+            return getHTTPSUrl(urlString);
+        } else if (normalized.startsWith("http://")) {
+            return getURL(urlString);
+        }
+        throw new Exception("Invalid input argument url: " + urlString);
+    }
+
+    /** Plain wrapper around {@link URL#URL(String)}. */
+    public static URL getURL(String urlString) throws MalformedURLException {
+        return new URL(urlString);
+    }
+
+    /**
+     * Installs the trust-all socket factory and host name verifier as
+     * {@link HttpsURLConnection} defaults, then returns a {@link URL} for {@code urlString}.
+     */
+    public static URL getHTTPSUrl(String urlString) throws MalformedURLException, NoSuchAlgorithmException, KeyManagementException {
+        // Create a trust manager that does not validate certificate chains
+        final TrustManager[] trustAllCerts = new TrustManager[] {new TrustAllX509TrustManager()};
+        // Install the all-trusting trust manager
+        final SSLContext sc = SSLContext.getInstance("SSL");
+        sc.init(null, trustAllCerts, new java.security.SecureRandom());
+        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+        // Create all-trusting host name verifier
+        final HostnameVerifier allHostsValid = new HostnameVerifier() {
+            @Override
+            public boolean verify(String hostname, SSLSession session) {
+                return true;
+            }
+        };
+        HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
+        return new URL(urlString);
+    }
+
+    /** Opens a connection to an HTTPS URL after installing the trust-all defaults. */
+    public static URLConnection getHTTPSConnection(String urlString) throws IOException, KeyManagementException, NoSuchAlgorithmException {
+        final URL url = getHTTPSUrl(urlString);
+        return url.openConnection();
+    }
+
+    /** {@link X509TrustManager} that accepts every client and server certificate chain. */
+    public static class TrustAllX509TrustManager implements X509TrustManager {
+        @Override
+        public void checkClientTrusted(
+                java.security.cert.X509Certificate[] chain, String authType)
+                throws CertificateException {
+            // intentionally empty: every chain is accepted
+        }
+
+        @Override
+        public void checkServerTrusted(
+                java.security.cert.X509Certificate[] chain, String authType)
+                throws CertificateException {
+            // intentionally empty: every chain is accepted
+        }
+
+        @Override
+        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+            // the X509TrustManager contract requires a non-null (possibly empty) array
+            return new java.security.cert.X509Certificate[0];
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
new file mode 100644
index 0000000..6eea7e3
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import java.io.IOException;
+
+/**
+ * Selector over a set of alternative service URLs.
+ * RMResourceFetcher uses it by probing the selected URL with checkUrl and calling
+ * reSelectUrl when the probe returns false.
+ */
+public interface HAURLSelector {
+
+ /**
+ * Probes the given URL.
+ *
+ * @param url full URL to probe
+ * @return presumably true when the URL is usable - verify against the implementation;
+ * RMResourceFetcher treats false as the signal to re-select
+ */
+ boolean checkUrl(String url);
+
+ /**
+ * Chooses the selected URL again.
+ *
+ * @throws IOException presumably when no usable URL can be selected - TODO confirm
+ */
+ void reSelectUrl() throws IOException;
+
+ /**
+ * @return the currently selected URL
+ */
+ String getSelectedUrl();
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
new file mode 100644
index 0000000..6518ca1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * {@link HAURLSelector} backed by a fixed array of candidate base URLs.
+ *
+ * <p>{@link #getSelectedUrl()} starts out with the first candidate. {@link #reSelectUrl()}
+ * probes the candidates in array order and switches to the first one whose probe succeeds.
+ */
+public class HAURLSelectorImpl implements HAURLSelector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HAURLSelectorImpl.class);
+
+    /** Number of probe attempts made against each candidate during a reselect. */
+    private static final long MAX_RETRY_TIME = 3;
+
+    /** Pause after a failed probe attempt, in milliseconds. */
+    private static final long RETRY_INTERVAL_MS = 5 * 1000;
+
+    private final String[] urls;
+    private final ServiceURLBuilder builder;
+    private final Constants.CompressionType compressionType;
+
+    private volatile String selectedUrl;
+    private volatile boolean reselectInProgress;
+
+    /**
+     * @param urls candidate base URLs, tried in this order
+     * @param builder turns a candidate base URL into the concrete URL that gets probed
+     * @param compressionType passed through to {@code InputStreamUtils.getInputStream}
+     */
+    public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, Constants.CompressionType compressionType) {
+        this.urls = urls;
+        this.compressionType = compressionType;
+        this.builder = builder;
+    }
+
+    /**
+     * Probes a URL by opening an input stream on it and closing the stream again.
+     *
+     * @param urlString the full URL to probe
+     * @return {@code true} if the stream could be opened, {@code false} if opening threw
+     */
+    @Override
+    public boolean checkUrl(String urlString) {
+        InputStream is = null;
+        try {
+            is = InputStreamUtils.getInputStream(urlString, null, compressionType);
+        } catch (Exception ex) {
+            // Report why the probe failed; the stack trace is deliberately left out to keep retries quiet.
+            LOG.info("get inputstream from url: {} failed: {}", urlString, ex.toString());
+            return false;
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // Closing is best effort: the probe result does not depend on it.
+                    LOG.debug("failed to close probe stream for url: {}", urlString, e);
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns the selected URL, defaulting to the first candidate when nothing has been
+     * selected yet.
+     *
+     * @return the currently selected base URL
+     * @throws ArrayIndexOutOfBoundsException if nothing is selected and the candidate array is empty
+     */
+    @Override
+    public String getSelectedUrl() {
+        if (selectedUrl == null) {
+            selectedUrl = urls[0];
+        }
+        return selectedUrl;
+    }
+
+    /**
+     * Probes every candidate (up to {@code MAX_RETRY_TIME} attempts each, pausing after each
+     * failed attempt) and selects the first one that passes {@link #checkUrl(String)}.
+     *
+     * <p>Returns immediately without probing when the in-progress flag is already set by
+     * another call.
+     *
+     * @throws IOException if no candidate passes, or if the thread is interrupted while
+     *     waiting between attempts (the interrupt status is restored before throwing)
+     */
+    @Override
+    public void reSelectUrl() throws IOException {
+        // Unsynchronized fast path: a reselect is already running, so do not queue behind it.
+        if (reselectInProgress) {
+            return;
+        }
+        synchronized (this) {
+            if (reselectInProgress) {
+                return;
+            }
+            reselectInProgress = true;
+            try {
+                LOG.info("Going to reselect url");
+                for (String urlToCheck : urls) {
+                    LOG.info("Going to try url :" + urlToCheck);
+                    for (int time = 0; time < MAX_RETRY_TIME; time++) {
+                        if (checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) {
+                            selectedUrl = urlToCheck;
+                            LOG.info("Successfully switch to new url : " + selectedUrl);
+                            return;
+                        }
+                        LOG.info("try url " + urlToCheck + " fail for " + (time + 1) + " times, sleep 5 seconds before try again. ");
+                        try {
+                            Thread.sleep(RETRY_INTERVAL_MS);
+                        } catch (InterruptedException ex) {
+                            // Keep the interrupt visible to the caller and stop retrying;
+                            // otherwise every later sleep would fail at once.
+                            Thread.currentThread().interrupt();
+                            throw new IOException("Interrupted while reselecting url", ex);
+                        }
+                    }
+                }
+                throw new IOException("No alive url found: " + StringUtils.join(";", Arrays.asList(this.urls)));
+            } finally {
+                reselectInProgress = false;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
new file mode 100644
index 0000000..463ce1e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Mutable bean describing one application. Per the Jackson annotations above, unknown
+ * JSON properties are ignored on read and null-valued properties are left out on write.
+ * Member order is kept as declared.
+ */
+public class AppInfo {
+    String id;
+    String user;
+    String name;
+    String queue;
+    String state;
+    String finalStatus;
+    double progress;
+    String trackingUI;
+    String trackingUrl;
+    String diagnostics;
+    String clusterId;
+    String applicationType;
+    long startedTime;
+    long finishedTime;
+    long elapsedTime;
+    String amContainerLogs;
+    String amHostHttpAddress;
+
+    // ---- id ----
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    // ---- user ----
+    public String getUser() {
+        return this.user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    // ---- name ----
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    // ---- queue ----
+    public String getQueue() {
+        return this.queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    // ---- state ----
+    public String getState() {
+        return this.state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    // ---- finalStatus ----
+    public String getFinalStatus() {
+        return this.finalStatus;
+    }
+
+    public void setFinalStatus(String finalStatus) {
+        this.finalStatus = finalStatus;
+    }
+
+    // ---- progress ----
+    public double getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+
+    // ---- trackingUI ----
+    public String getTrackingUI() {
+        return this.trackingUI;
+    }
+
+    public void setTrackingUI(String trackingUI) {
+        this.trackingUI = trackingUI;
+    }
+
+    // ---- trackingUrl ----
+    public String getTrackingUrl() {
+        return this.trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+    }
+
+    // ---- diagnostics ----
+    public String getDiagnostics() {
+        return this.diagnostics;
+    }
+
+    public void setDiagnostics(String diagnostics) {
+        this.diagnostics = diagnostics;
+    }
+
+    // ---- clusterId ----
+    public String getClusterId() {
+        return this.clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    // ---- applicationType ----
+    public String getApplicationType() {
+        return this.applicationType;
+    }
+
+    public void setApplicationType(String applicationType) {
+        this.applicationType = applicationType;
+    }
+
+    // ---- startedTime ----
+    public long getStartedTime() {
+        return this.startedTime;
+    }
+
+    public void setStartedTime(long startedTime) {
+        this.startedTime = startedTime;
+    }
+
+    // ---- finishedTime ----
+    public long getFinishedTime() {
+        return this.finishedTime;
+    }
+
+    public void setFinishedTime(long finishedTime) {
+        this.finishedTime = finishedTime;
+    }
+
+    // ---- elapsedTime ----
+    public long getElapsedTime() {
+        return this.elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+    }
+
+    // ---- amContainerLogs ----
+    public String getAmContainerLogs() {
+        return this.amContainerLogs;
+    }
+
+    public void setAmContainerLogs(String amContainerLogs) {
+        this.amContainerLogs = amContainerLogs;
+    }
+
+    // ---- amHostHttpAddress ----
+    public String getAmHostHttpAddress() {
+        return this.amHostHttpAddress;
+    }
+
+    public void setAmHostHttpAddress(String amHostHttpAddress) {
+        this.amHostHttpAddress = amHostHttpAddress;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
new file mode 100644
index 0000000..741fa1d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Bean holding a list of AppInfo entries under the "app" property. Unknown JSON
+ * properties are ignored and a null list is omitted on write, per the annotations above.
+ */
+public class Applications {
+
+    private List<AppInfo> app;
+
+    /** Returns the held list as stored; no copy is made and it may be null. */
+    public List<AppInfo> getApp() {
+        return this.app;
+    }
+
+    /** Replaces the held list with the given reference. */
+    public void setApp(List<AppInfo> app) {
+        this.app = app;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
new file mode 100644
index 0000000..d791685
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Outer bean with a single "apps" property of type Applications. Unknown JSON properties
+ * are ignored and a null value is omitted on write, per the annotations above.
+ */
+public class AppsWrapper {
+
+    private Applications apps;
+
+    /** Returns the wrapped Applications reference, which may be null. */
+    public Applications getApps() {
+        return this.apps;
+    }
+
+    /** Stores the given Applications reference. */
+    public void setApps(Applications apps) {
+        this.apps = apps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
new file mode 100644
index 0000000..5d25d84
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Bean for one Spark application: an id, a name and its list of attempts. Unknown JSON
+ * properties are ignored and null values are omitted on write, per the annotations above.
+ */
+public class SparkApplication {
+    String id;
+    String name;
+    List<SparkApplicationAttempt> attempts;
+
+    // ---- id ----
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    // ---- name ----
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    // ---- attempts (stored and returned by reference, no copy) ----
+    public List<SparkApplicationAttempt> getAttempts() {
+        return this.attempts;
+    }
+
+    public void setAttempts(List<SparkApplicationAttempt> attempts) {
+        this.attempts = attempts;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
new file mode 100644
index 0000000..6e91c03
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Bean for a single attempt of a Spark application. Start and end times are kept as the
+ * strings they were set with; this class does not parse them. Unknown JSON properties are
+ * ignored and null values are omitted on write, per the annotations above.
+ */
+public class SparkApplicationAttempt {
+    String attemptId;
+    String sparkUser;
+    String startTime;
+    String endTime;
+    boolean completed;
+
+    // ---- attemptId ----
+    public String getAttemptId() {
+        return this.attemptId;
+    }
+
+    public void setAttemptId(String attemptId) {
+        this.attemptId = attemptId;
+    }
+
+    // ---- sparkUser ----
+    public String getSparkUser() {
+        return this.sparkUser;
+    }
+
+    public void setSparkUser(String sparkUser) {
+        this.sparkUser = sparkUser;
+    }
+
+    // ---- startTime ----
+    public String getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(String startTime) {
+        this.startTime = startTime;
+    }
+
+    // ---- endTime ----
+    public String getEndTime() {
+        return this.endTime;
+    }
+
+    public void setEndTime(String endTime) {
+        this.endTime = endTime;
+    }
+
+    // ---- completed ----
+    public boolean isCompleted() {
+        return this.completed;
+    }
+
+    public void setCompleted(boolean completed) {
+        this.completed = completed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
new file mode 100644
index 0000000..5508863
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+/*
+ * Outer bean with a single "app" property of type SparkApplication. Unknown JSON
+ * properties are ignored and a null value is omitted on write, per the annotations above.
+ */
+public class SparkApplicationWrapper {
+
+    SparkApplication app;
+
+    /** Returns the wrapped SparkApplication reference, which may be null. */
+    public SparkApplication getApp() {
+        return this.app;
+    }
+
+    /** Stores the given SparkApplication reference. */
+    public void setApp(SparkApplication app) {
+        this.app = app;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
new file mode 100644
index 0000000..3e21e8d
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * {@link ServiceURLBuilder} for application-list URLs, selected by job state.
+ */
+public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
+
+    /**
+     * Builds the list URL for the requested job state.
+     *
+     * @param parameters {@code parameters[0]} is the base URL; {@code parameters[1]} is the
+     *     name of a {@code Constants.JobState} constant (RUNNING, COMPLETED or ALL)
+     * @return the URL with {@code Constants.ANONYMOUS_PARAMETER} appended as a query
+     *     parameter, or {@code null} when the state is null or not one of the three above
+     * @throws ArrayIndexOutOfBoundsException if fewer than two parameters are given
+     */
+    @Override
+    public String build(String... parameters) {
+        // {rmUrl}/ws/v1/cluster/apps?state=RUNNING
+        String jobState = parameters[1];
+        String path;
+        // Constant-first equals so a null state falls through to the null return below.
+        if (Constants.JobState.RUNNING.name().equals(jobState)) {
+            path = Constants.V2_APPS_RUNNING_URL;
+        } else if (Constants.JobState.COMPLETED.name().equals(jobState)) {
+            path = Constants.V2_APPS_COMPLETED_URL;
+        } else if (Constants.JobState.ALL.name().equals(jobState)) {
+            path = Constants.V2_APPS_URL;
+        } else {
+            return null;
+        }
+        // Start the query string with '?' when the path has none yet; a bare '&' after a
+        // path without a query would not be parsed as a parameter.
+        String separator = path.indexOf('?') >= 0 ? "&" : "?";
+        return parameters[0] + "/" + path + separator + Constants.ANONYMOUS_PARAMETER;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
new file mode 100644
index 0000000..597e359
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+/** Builds a URL string from positional string parameters; what each position means is defined by the implementing class. */
+public interface ServiceURLBuilder {
+ String build(String... parameters); // returns the assembled URL string
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
new file mode 100644
index 0000000..d4d235f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+/**
+ * {@link ServiceURLBuilder} producing the URL that lists finished SPARK applications.
+ */
+public class SparkCompleteJobServiceURLBuilderImpl implements ServiceURLBuilder {
+
+    /**
+     * @param parameters {@code parameters[0]} is the base URL; {@code parameters[1]} is the
+     *     value placed in the {@code finishedTimeBegin} query parameter
+     * @return the formatted URL
+     */
+    public String build(String... parameters) {
+        final String baseUrl = parameters[0];
+        final String finishedTimeBegin = parameters[1];
+        return String.format(
+                "%s/%s?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=%s&%s",
+                baseUrl,
+                Constants.V2_APPS_URL,
+                finishedTimeBegin,
+                Constants.ANONYMOUS_PARAMETER);
+    }
+}