You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:28 UTC
[32/52] [abbrv] incubator-eagle git commit: [EAGLE-511] Fix NullPointerException for spark history job
[EAGLE-511] Fix NullPointerException for spark history job
Author: pkuwm <ih...@gmail.com>
Closes #404 from pkuwm/EAGLE-511.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9488afc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9488afc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9488afc1
Branch: refs/heads/master
Commit: 9488afc15e3d721f55aaaf10da46673d3ebb69a7
Parents: 3f7004f
Author: pkuwm <ih...@gmail.com>
Authored: Fri Sep 2 10:58:45 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Fri Sep 2 10:58:45 2016 +0800
----------------------------------------------------------------------
.../environment/impl/StormExecutionRuntime.java | 4 +--
.../jpm/spark/crawl/JHFSparkEventReader.java | 28 +++++++++++---------
.../eagle/jpm/spark/crawl/JHFSparkParser.java | 2 +-
.../eagle/jpm/spark/entity/SparkTask.java | 6 ++---
.../spark/history/SparkHistoryJobAppConfig.java | 2 --
.../src/main/resources/application.conf | 3 +--
.../org/apache/eagle/jpm/util/JSONUtils.java | 15 ++++++-----
7 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 1b989ac..04cc19b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -98,9 +98,9 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
String topologyName = config.getString("appId");
Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
StormTopology topology = executor.execute(config, environment);
- LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+ LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
Config conf = getStormConfig();
- if(config.getString("mode").equals(ApplicationEntity.Mode.CLUSTER.name())){
+ if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
if(jarFile == null){
jarFile = DynamicJarPathFinder.findPath(executor.getClass());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index 6c68b48..22b715a 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.eagle.jpm.spark.entity.*;
import org.apache.eagle.jpm.util.*;
import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import org.json.simple.JSONArray;
@@ -30,6 +31,7 @@ import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.*;
public class JHFSparkEventReader {
@@ -44,7 +46,7 @@ public class JHFSparkEventReader {
private Map<Integer, SparkJob> jobs;
private Map<String, SparkStage> stages;
private Map<Integer, Set<String>> jobStageMap;
- private Map<Integer, SparkTask> tasks;
+ private Map<Long, SparkTask> tasks;
private EagleServiceClientImpl client;
private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
@@ -61,7 +63,7 @@ public class JHFSparkEventReader {
jobs = new HashMap<Integer, SparkJob>();
stages = new HashMap<String, SparkStage>();
jobStageMap = new HashMap<Integer, Set<String>>();
- tasks = new HashMap<Integer, SparkTask>();
+ tasks = new HashMap<Long, SparkTask>();
executors = new HashMap<String, SparkExecutor>();
stageTaskStatusMap = new HashMap<>();
conf = ConfigFactory.load();
@@ -72,7 +74,7 @@ public class JHFSparkEventReader {
return this.app;
}
- public void read(JSONObject eventObj) throws Exception {
+ public void read(JSONObject eventObj) {
String eventType = (String) eventObj.get("Event");
if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
handleAppStarted(eventObj);
@@ -168,7 +170,7 @@ public class JHFSparkEventReader {
this.lastEventTime = appStartTime;
}
- private void handleExecutorAdd(JSONObject event) throws Exception {
+ private void handleExecutorAdd(JSONObject event) {
String executorID = (String) event.get("Executor ID");
long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
this.lastEventTime = executorAddTime;
@@ -178,7 +180,7 @@ public class JHFSparkEventReader {
}
- private void handleBlockManagerAdd(JSONObject event) throws Exception {
+ private void handleBlockManagerAdd(JSONObject event) {
long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
this.lastEventTime = timestamp;
@@ -197,7 +199,7 @@ public class JHFSparkEventReader {
private void handleTaskEnd(JSONObject event) {
JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- int taskId = JSONUtils.getInt(taskInfo, "Task ID");
+ long taskId = JSONUtils.getLong(taskInfo, "Task ID");
SparkTask task = tasks.get(taskId);
if (task == null) {
return;
@@ -261,10 +263,10 @@ public class JHFSparkEventReader {
task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
- int taskId = JSONUtils.getInt(taskInfo, "Task ID");
+ long taskId = JSONUtils.getLong(taskInfo, "Task ID");
task.setTaskId(taskId);
- task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Index")));
+ task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
this.lastEventTime = launchTime;
@@ -323,7 +325,7 @@ public class JHFSparkEventReader {
String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
- if (!stages.containsKey(this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)))) {
+ if (!stages.containsKey(key)) {
//may be further attempt for one stage
String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
if (stages.containsKey(baseAttempt)) {
@@ -651,7 +653,7 @@ public class JHFSparkEventReader {
}
- private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
+ private SparkExecutor initiateExecutor(String executorID, long startTime) {
if (!executors.containsKey(executorID)) {
SparkExecutor executor = new SparkExecutor();
executor.setTags(new HashMap<>(this.app.getTags()));
@@ -703,9 +705,9 @@ public class JHFSparkEventReader {
return client;
}
- private void doFlush(List entities) throws Exception {
- LOG.info("start flushing entities of total number " + entities.size());
+ private void doFlush(List entities) throws IOException, EagleServiceClientException {
client.create(entities);
- LOG.info("finish flushing entities of total number " + entities.size());
+ int size = (entities == null ? 0 : entities.size());
+ LOG.info("finish flushing entities of total number " + size);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
index 2ba3c73..02fc5cf 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -50,7 +50,7 @@ public class JHFSparkParser implements JHFParserBase {
try {
this.eventReader.read(eventObj);
} catch (Exception e) {
- logger.error("Fail to read eventObj. Exception: " + e);
+ e.printStackTrace();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
index fb2fce5..5d8f1d3 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
@@ -34,7 +34,7 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties;
public class SparkTask extends TaggedLogAPIEntity {
@Column("a")
- private int taskId;
+ private long taskId;
@Column("b")
private long launchTime;
@Column("c")
@@ -80,7 +80,7 @@ public class SparkTask extends TaggedLogAPIEntity {
@Column("v")
private boolean failed;
- public int getTaskId() {
+ public long getTaskId() {
return taskId;
}
@@ -177,7 +177,7 @@ public class SparkTask extends TaggedLogAPIEntity {
valueChanged("failed");
}
- public void setTaskId(int taskId) {
+ public void setTaskId(long taskId) {
this.taskId = taskId;
valueChanged("taskId");
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index ed499db..0fc74d7 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -82,7 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
- this.stormConfig.mode = config.getString("storm.mode");
this.stormConfig.topologyName = config.getString("storm.name");
this.stormConfig.workerNo = config.getInt("storm.worker.num");
this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
@@ -118,7 +117,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
}
public static class StormConfig implements Serializable {
- public String mode;
public int workerNo;
public int timeoutSec;
public String topologyName;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 483e2e9..58dd552 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -46,7 +46,6 @@
},
"storm":{
worker.num: 2,
- "mode": "local",
"name":"sparkHistoryJob",
"messageTimeoutSec": 3000,
"pendingSpout": 1000,
@@ -75,5 +74,5 @@
}
},
"appId": "sparkHistoryJob",
- "mode": "LOCAL"
+ "mode": "CLUSTER"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
index 38500b0..8a12cc5 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
@@ -30,23 +30,24 @@ public class JSONUtils {
}
try {
- return obj.get(field).toString();
+ return (String) obj.get(field);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
}
- public static int getInt(JSONObject obj, String field) {
+ public static int getInt(JSONObject obj, String field) throws JSONException {
if (obj == null || StringUtils.isEmpty(field)) {
return 0;
}
-
+ Object object = obj.get(field);
try {
- return (int) obj.get(field);
- } catch (JSONException e) {
- e.printStackTrace();
- return 0;
+ return object instanceof Number ? ((Number) object).intValue()
+ : Integer.parseInt((String) object);
+ } catch (Exception e) {
+ throw new JSONException("JSONObject[" + field
+ + "] is not an int.");
}
}