You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2016/09/02 02:58:52 UTC

incubator-eagle git commit: [EAGLE-511] Fix NullPointerException for spark history job

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 3f7004f1c -> 9488afc15


[EAGLE-511] Fix NullPointerException for spark history job

Author: pkuwm <ih...@gmail.com>

Closes #404 from pkuwm/EAGLE-511.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9488afc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9488afc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9488afc1

Branch: refs/heads/develop
Commit: 9488afc15e3d721f55aaaf10da46673d3ebb69a7
Parents: 3f7004f
Author: pkuwm <ih...@gmail.com>
Authored: Fri Sep 2 10:58:45 2016 +0800
Committer: Qingwen Zhao <qi...@gmail.com>
Committed: Fri Sep 2 10:58:45 2016 +0800

----------------------------------------------------------------------
 .../environment/impl/StormExecutionRuntime.java |  4 +--
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 28 +++++++++++---------
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  2 +-
 .../eagle/jpm/spark/entity/SparkTask.java       |  6 ++---
 .../spark/history/SparkHistoryJobAppConfig.java |  2 --
 .../src/main/resources/application.conf         |  3 +--
 .../org/apache/eagle/jpm/util/JSONUtils.java    | 15 ++++++-----
 7 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 1b989ac..04cc19b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -98,9 +98,9 @@ public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,
         String topologyName = config.getString("appId");
         Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
         StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+        LOG.info("Starting {} ({}), mode: {}",topologyName,executor.getClass().getCanonicalName(), config.getString("mode"));
         Config conf = getStormConfig();
-        if(config.getString("mode").equals(ApplicationEntity.Mode.CLUSTER.name())){
+        if(ApplicationEntity.Mode.CLUSTER.name().equalsIgnoreCase(config.getString("mode"))){
             String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : null;
             if(jarFile == null){
                 jarFile = DynamicJarPathFinder.findPath(executor.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index 6c68b48..22b715a 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.ArrayUtils;
 import org.apache.eagle.jpm.spark.entity.*;
 import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
 import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.json.simple.JSONArray;
@@ -30,6 +31,7 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.*;
 
 public class JHFSparkEventReader {
@@ -44,7 +46,7 @@ public class JHFSparkEventReader {
     private Map<Integer, SparkJob> jobs;
     private Map<String, SparkStage> stages;
     private Map<Integer, Set<String>> jobStageMap;
-    private Map<Integer, SparkTask> tasks;
+    private Map<Long, SparkTask> tasks;
     private EagleServiceClientImpl client;
     private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
 
@@ -61,7 +63,7 @@ public class JHFSparkEventReader {
         jobs = new HashMap<Integer, SparkJob>();
         stages = new HashMap<String, SparkStage>();
         jobStageMap = new HashMap<Integer, Set<String>>();
-        tasks = new HashMap<Integer, SparkTask>();
+        tasks = new HashMap<Long, SparkTask>();
         executors = new HashMap<String, SparkExecutor>();
         stageTaskStatusMap = new HashMap<>();
         conf = ConfigFactory.load();
@@ -72,7 +74,7 @@ public class JHFSparkEventReader {
         return this.app;
     }
 
-    public void read(JSONObject eventObj) throws Exception {
+    public void read(JSONObject eventObj) {
         String eventType = (String) eventObj.get("Event");
         if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
             handleAppStarted(eventObj);
@@ -168,7 +170,7 @@ public class JHFSparkEventReader {
         this.lastEventTime = appStartTime;
     }
 
-    private void handleExecutorAdd(JSONObject event) throws Exception {
+    private void handleExecutorAdd(JSONObject event) {
         String executorID = (String) event.get("Executor ID");
         long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         this.lastEventTime = executorAddTime;
@@ -178,7 +180,7 @@ public class JHFSparkEventReader {
 
     }
 
-    private void handleBlockManagerAdd(JSONObject event) throws Exception {
+    private void handleBlockManagerAdd(JSONObject event) {
         long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
         long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         this.lastEventTime = timestamp;
@@ -197,7 +199,7 @@ public class JHFSparkEventReader {
 
     private void handleTaskEnd(JSONObject event) {
         JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        int taskId = JSONUtils.getInt(taskInfo, "Task ID");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
         SparkTask task = tasks.get(taskId);
         if (task == null) {
             return;
@@ -261,10 +263,10 @@ public class JHFSparkEventReader {
         task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
 
         JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
-        int taskId = JSONUtils.getInt(taskInfo, "Task ID");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
         task.setTaskId(taskId);
 
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Index")));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
         task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
         long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
         this.lastEventTime = launchTime;
@@ -323,7 +325,7 @@ public class JHFSparkEventReader {
         String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
 
-        if (!stages.containsKey(this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId)))) {
+        if (!stages.containsKey(key)) {
             //may be further attempt for one stage
             String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
             if (stages.containsKey(baseAttempt)) {
@@ -651,7 +653,7 @@ public class JHFSparkEventReader {
     }
 
 
-    private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
+    private SparkExecutor initiateExecutor(String executorID, long startTime) {
         if (!executors.containsKey(executorID)) {
             SparkExecutor executor = new SparkExecutor();
             executor.setTags(new HashMap<>(this.app.getTags()));
@@ -703,9 +705,9 @@ public class JHFSparkEventReader {
         return client;
     }
 
-    private void doFlush(List entities) throws Exception {
-        LOG.info("start flushing entities of total number " + entities.size());
+    private void doFlush(List entities) throws IOException, EagleServiceClientException {
         client.create(entities);
-        LOG.info("finish flushing entities of total number " + entities.size());
+        int size = (entities == null ? 0 : entities.size());
+        LOG.info("finish flushing entities of total number " + size);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
index 2ba3c73..02fc5cf 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -50,7 +50,7 @@ public class JHFSparkParser implements JHFParserBase {
                     try {
                         this.eventReader.read(eventObj);
                     } catch (Exception e) {
-                        logger.error("Fail to read eventObj. Exception: " + e);
+                        e.printStackTrace();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
index fb2fce5..5d8f1d3 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/entity/SparkTask.java
@@ -34,7 +34,7 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 public class SparkTask extends TaggedLogAPIEntity {
 
     @Column("a")
-    private int taskId;
+    private long taskId;
     @Column("b")
     private long launchTime;
     @Column("c")
@@ -80,7 +80,7 @@ public class SparkTask extends TaggedLogAPIEntity {
     @Column("v")
     private boolean failed;
 
-    public int getTaskId() {
+    public long getTaskId() {
         return taskId;
     }
 
@@ -177,7 +177,7 @@ public class SparkTask extends TaggedLogAPIEntity {
         valueChanged("failed");
     }
 
-    public void setTaskId(int taskId) {
+    public void setTaskId(long taskId) {
         this.taskId = taskId;
         valueChanged("taskId");
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index ed499db..0fc74d7 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -82,7 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
         this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
         this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
 
-        this.stormConfig.mode = config.getString("storm.mode");
         this.stormConfig.topologyName = config.getString("storm.name");
         this.stormConfig.workerNo = config.getInt("storm.worker.num");
         this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
@@ -118,7 +117,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
     }
 
     public static class StormConfig implements Serializable {
-        public String mode;
         public int workerNo;
         public int timeoutSec;
         public String topologyName;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 483e2e9..58dd552 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -46,7 +46,6 @@
   },
   "storm":{
     worker.num: 2,
-    "mode": "local",
     "name":"sparkHistoryJob",
     "messageTimeoutSec": 3000,
     "pendingSpout": 1000,
@@ -75,5 +74,5 @@
     }
   },
   "appId": "sparkHistoryJob",
-  "mode": "LOCAL"
+  "mode": "CLUSTER"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9488afc1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
index 38500b0..8a12cc5 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
@@ -30,23 +30,24 @@ public class JSONUtils {
         }
 
         try {
-            return obj.get(field).toString();
+            return (String) obj.get(field);
         } catch (JSONException e) {
             e.printStackTrace();
             return null;
         }
     }
 
-    public static int getInt(JSONObject obj, String field) {
+    public static int getInt(JSONObject obj, String field) throws JSONException {
         if (obj == null || StringUtils.isEmpty(field)) {
             return 0;
         }
-
+        Object object = obj.get(field);
         try {
-            return (int) obj.get(field);
-        } catch (JSONException e) {
-            e.printStackTrace();
-            return 0;
+            return object instanceof Number ? ((Number) object).intValue()
+                    : Integer.parseInt((String) object);
+        } catch (Exception e) {
+            throw new JSONException("JSONObject[" + field
+                    + "] is not an int.");
         }
     }