You are viewing a plain text version of this content. The canonical link for it is: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3f7004f1
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:27 UTC
[31/52] [abbrv] incubator-eagle git commit: [EAGLE-502] Always retry
to parse spark history job when failure happens.
[EAGLE-502] Always retry to parse spark history job when failure happens.
Author: pkuwm <ih...@gmail.com>
Closes #390 from pkuwm/EAGLE-502.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3f7004f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3f7004f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3f7004f1
Branch: refs/heads/master
Commit: 3f7004f1cc939ba2e633e38248c322d2216e48a9
Parents: e778775
Author: pkuwm <ih...@gmail.com>
Authored: Fri Sep 2 10:51:31 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Fri Sep 2 10:51:31 2016 +0800
----------------------------------------------------------------------
.../jpm/spark/crawl/JHFSparkEventReader.java | 6 +--
.../status/JobHistoryZKStateManager.java | 26 ++++--------
.../history/storm/SparkHistoryJobSpout.java | 44 ++++++--------------
.../src/main/resources/application.conf | 2 +-
.../jpm/spark/running/SparkRunningJobApp.java | 2 +-
.../storm/SparkRunningJobFetchSpout.java | 4 +-
.../running/storm/SparkRunningJobParseBolt.java | 1 +
.../jpm/util/resourcefetch/ResourceFetcher.java | 2 +-
.../SparkHistoryServerResourceFetcher.java | 6 +--
.../hbase/HbaseMetadataBrowseWebResource.java | 16 ++++---
10 files changed, 41 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index fe02da5..6c68b48 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -17,14 +17,14 @@
package org.apache.eagle.jpm.spark.crawl;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.commons.lang.ArrayUtils;
import org.apache.eagle.jpm.spark.entity.*;
import org.apache.eagle.jpm.util.*;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
@@ -353,7 +353,7 @@ public class JHFSparkEventReader {
stage.setCompleteTime(completeTime);
this.lastEventTime = completeTime;
- if (stageInfo.containsKey("Failure Reason")) {
+ if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
} else {
stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 7a95e56..9fafc1f 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -35,10 +35,11 @@ import java.util.Iterator;
import java.util.List;
public class JobHistoryZKStateManager {
- public static final Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+ private final static Logger LOG = LoggerFactory.getLogger(JobHistoryZKStateManager.class);
+
+ private final static String START_TIMESTAMP = "lastAppTime";
private String zkRoot;
private CuratorFramework curator;
- private static String START_TIMESTAMP = "lastAppTime";
private CuratorFramework newCurator(SparkHistoryJobAppConfig config) throws Exception {
return CuratorFrameworkFactory.newClient(
@@ -72,7 +73,7 @@ public class JobHistoryZKStateManager {
InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
try {
lock.acquire();
- Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
+ Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
while (iter.hasNext()) {
String appId = iter.next();
String path = jobPath + "/" + appId;
@@ -104,9 +105,7 @@ public class JobHistoryZKStateManager {
InterProcessLock lock = new InterProcessReadWriteLock(curator,jobPath).writeLock();
try {
lock.acquire();
- Iterator<String> iter = curator.getChildren().forPath(jobPath).iterator();
- while (iter.hasNext()) {
- String appId = iter.next();
+ (curator.getChildren().forPath(jobPath)).forEach(appId -> {
String path = jobPath + "/" + appId;
try {
if (curator.checkExists().forPath(path) != null) {
@@ -119,7 +118,7 @@ public class JobHistoryZKStateManager {
LOG.error("fail to read unprocessed job", e);
throw new RuntimeException(e);
}
- }
+ });
} catch (Exception e) {
LOG.error("fail to read unprocessed jobs", e);
@@ -174,10 +173,7 @@ public class JobHistoryZKStateManager {
public boolean hasApplication(String appId) {
String path = zkRoot + "/jobs/" + appId;
try {
- if (curator.checkExists().forPath(path) != null) {
- return true;
- }
- return false;
+ return curator.checkExists().forPath(path) != null;
} catch (Exception e) {
LOG.error("fail to check whether application exists", e);
throw new RuntimeException(e);
@@ -191,7 +187,7 @@ public class JobHistoryZKStateManager {
curator.delete().deletingChildrenIfNeeded().forPath(path);
}
- name = name.replace("/","_");
+ name = name.replace("/", "_");
if (name.length() > 50) {
name = name.substring(0, 50);
}
@@ -226,7 +222,6 @@ public class JobHistoryZKStateManager {
}
public void updateApplicationStatus(String appId, Enum<ZKStateConstant.AppStatus> status) {
-
String path = zkRoot + "/jobs/" + appId ;
InterProcessLock lock = new InterProcessReadWriteLock(curator,zkRoot + "/jobs").readLock();
try {
@@ -238,8 +233,7 @@ public class JobHistoryZKStateManager {
curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
}
} else {
- String errorMsg = String.format("fail to update for application with path %s", path);
- LOG.error(errorMsg);
+ LOG.error("Failed to update for application with path: " + path);
}
} catch (Exception e) {
LOG.error("fail to update application status", e);
@@ -252,8 +246,6 @@ public class JobHistoryZKStateManager {
} catch (Exception e) {
LOG.error("fail to release lock",e);
}
-
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index db60744..5602b4c 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -19,6 +19,13 @@
package org.apache.eagle.jpm.spark.history.storm;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
@@ -26,16 +33,10 @@ import org.apache.eagle.jpm.util.Constants;
import org.apache.eagle.jpm.util.resourcefetch.RMResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.ResourceFetcher;
import org.apache.eagle.jpm.util.resourcefetch.model.AppInfo;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.Calendar;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,9 +47,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
private SparkHistoryJobAppConfig config;
private ResourceFetcher rmFetch;
private long lastFinishAppTime = 0;
- private Map<String, Integer> failTimes;
-
- private static final int FAIL_MAX_TIMES = 5;
public SparkHistoryJobSpout(SparkHistoryJobAppConfig config) {
this.config = config;
@@ -57,7 +55,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
rmFetch = new RMResourceFetcher(config.jobHistoryConfig.rms);
- this.failTimes = new HashMap<>();
this.collector = spoutOutputCollector;
this.zkState = new JobHistoryZKStateManager(config);
this.lastFinishAppTime = zkState.readLastFinishedTimestamp();
@@ -75,7 +72,6 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
LOG.info("Last finished time = {}", calendar.getTime());
if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
- //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
if (appInfos != null) {
LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
for (AppInfo app : appInfos) {
@@ -120,29 +116,15 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
@Override
public void fail(Object msgId) {
- String appId = (String) msgId;
- int failTimes = 0;
- if (this.failTimes.containsKey(appId)) {
- failTimes = this.failTimes.get(appId);
- }
- failTimes++;
- if (failTimes >= FAIL_MAX_TIMES) {
- this.failTimes.remove(appId);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
- LOG.error(String.format("Application %s has failed for over %s times, drop it.", appId, FAIL_MAX_TIMES));
- } else {
- this.failTimes.put(appId, failTimes);
- collector.emit(new Values(appId), appId);
- zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
- }
+ // Sleep 3 seconds and retry.
+ Utils.sleep(3000);
+
+ collector.emit(new Values(msgId), msgId);
+ zkState.updateApplicationStatus((String)msgId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
}
@Override
public void ack(Object msgId) {
- String appId = (String) msgId;
- if (this.failTimes.containsKey(appId)) {
- this.failTimes.remove(appId);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 289c6f7..483e2e9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -17,7 +17,7 @@
{
"basic":{
"cluster":"sandbox",
- "datacenter":"sandbox",
+ "dataCenter":"sandbox",
jobConf.additional.info: ""
},
"eagleProps":{
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
index 5e21406..2ee2a04 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobApp.java
@@ -65,4 +65,4 @@ public class SparkRunningJobApp extends StormApplication {
return topologyBuilder.createTopology();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
index 7162bac..76c7815 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobFetchSpout.java
@@ -67,6 +67,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
this.sparkRunningJobManager = new SparkRunningJobManager(zkStateConfig);
}
+ @SuppressWarnings("unchecked")
@Override
public void nextTuple() {
LOG.info("Start to fetch spark running jobs");
@@ -154,8 +155,7 @@ public class SparkRunningJobFetchSpout extends BaseRichSpout {
//content of path /apps/spark/running/yarnAppId/appId is SparkAppEntity(current attempt)
//as we know, a yarn application may contains many spark applications
//so, the returned results is a Map, key is yarn appId
- Map<String, Map<String, SparkAppEntity>> result = this.sparkRunningJobManager.recover();
- return result;
+ return this.sparkRunningJobManager.recover();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
index a497e29..9c0ffef 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/storm/SparkRunningJobParseBolt.java
@@ -71,6 +71,7 @@ public class SparkRunningJobParseBolt extends BaseRichBolt {
this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
}
+ @SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
AppInfo appInfo = (AppInfo)tuple.getValue(1);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
index f920ddb..4999315 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ResourceFetcher.java
@@ -22,6 +22,6 @@ import java.util.List;
public interface ResourceFetcher<T> {
//continue to refactor later
- List<T> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception;
+ List<T> getResource(Constants.ResourceType resourceType, Object... parameter) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
index ce2d9b8..d9390c1 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/SparkHistoryServerResourceFetcher.java
@@ -77,12 +77,12 @@ public class SparkHistoryServerResourceFetcher implements ResourceFetcher<SparkA
}
}
- public List<SparkApplication> getResource(Constants.ResourceType resoureType, Object... parameter) throws Exception {
- switch (resoureType) {
+ public List<SparkApplication> getResource(Constants.ResourceType resourceType, Object... parameter) throws Exception {
+ switch (resourceType) {
case SPARK_JOB_DETAIL:
return doFetchSparkApplicationDetail((String) parameter[0]);
default:
- throw new Exception("Not support resourceType :" + resoureType);
+ throw new Exception("Not support resourceType :" + resourceType);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3f7004f1/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
index 68ea552..69be2e5 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
@@ -73,7 +73,7 @@ public class HbaseMetadataBrowseWebResource {
private Configuration convert(Map<String, Object> originalConfig) throws Exception {
Configuration config = new Configuration();
for (Map.Entry<String, Object> entry : originalConfig.entrySet()) {
- config.set(entry.getKey().toString(), entry.getValue().toString());
+ config.set(entry.getKey(), entry.getValue().toString());
}
return config;
}
@@ -125,12 +125,11 @@ public class HbaseMetadataBrowseWebResource {
response.setException(EagleExceptionWrapper.wrap(ex));
}
if(tables != null) {
- String resource = null;
for (String table : tables) {
- resource = String.format("%s:%s", namespace, table);
+ String resource = String.format("%s:%s", namespace, table);
Set<String> childSensitiveTypes = new HashSet<>();
- String senstiveType = checkSensitivity(site, resource, childSensitiveTypes);
- values.add(new HbaseResourceEntity(resource, namespace, table, null, senstiveType, childSensitiveTypes));
+ String sensitiveType = checkSensitivity(site, resource, childSensitiveTypes);
+ values.add(new HbaseResourceEntity(resource, namespace, table, null, sensitiveType, childSensitiveTypes));
}
}
response.setObj(values);
@@ -157,12 +156,11 @@ public class HbaseMetadataBrowseWebResource {
response.setException(EagleExceptionWrapper.wrap(ex));
}
if(columns != null) {
- String resource = null;
for (String col : columns) {
- resource = String.format("%s:%s:%s", namespace, table, col);
+ String resource = String.format("%s:%s:%s", namespace, table, col);
Set<String> childSensitiveTypes = new HashSet<>();
- String senstiveType = checkSensitivity(site, resource, childSensitiveTypes);
- values.add(new HbaseResourceEntity(resource, namespace, table, col, senstiveType, childSensitiveTypes));
+ String sensitiveType = checkSensitivity(site, resource, childSensitiveTypes);
+ values.add(new HbaseResourceEntity(resource, namespace, table, col, sensitiveType, childSensitiveTypes));
}
}
response.setObj(values);