Posted to commits@eagle.apache.org by yo...@apache.org on 2016/09/07 17:42:20 UTC

[24/52] [abbrv] incubator-eagle git commit: [EAGLE-504] Fix JSONUtils to avoid null pointer exceptions when getting JSON values.

[EAGLE-504] Fix JSONUtils to avoid null pointer exceptions when getting JSON values.

Author: pkuwm <ih...@gmail.com>

Closes #396 from pkuwm/EAGLE-504.
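For readers skimming the diff: the NPEs in question came from the old JSONUtil helpers, which dereference the JSONObject argument immediately (obj.containsKey(field)) and return boxed types, so a null nested object, or an unboxed null return at a call site like "long appStartTime = JSONUtil.getLong(event, "Timestamp")", blew up. A minimal before/after sketch, assuming the JSONUtils class added below is on the classpath; the jobResult value is illustrative:

    import org.json.simple.JSONObject;

    public class NpeFixDemo {
        public static void main(String[] args) {
            // An absent nested object, e.g. what getJSONObject(event, "Job Result")
            // yields when the event has no "Job Result" key.
            JSONObject jobResult = null;

            // Old API: NPE, because getString() called obj.containsKey(field) on null.
            // String result = JSONUtil.getString(jobResult, "Result");

            // New API: null-checks the object first and returns null instead.
            String result = JSONUtils.getString(jobResult, "Result");   // -> null

            // New overload: primitive return with an explicit fallback for a null object.
            long lastEventTime = 1472447869000L;
            long t = JSONUtils.getLong(jobResult, "Timestamp", lastEventTime);  // -> fallback

            System.out.println(result + " " + t);
        }
    }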


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3cc18301
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3cc18301
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3cc18301

Branch: refs/heads/master
Commit: 3cc183017da19889cff6bb2c57ecfe41dfbc71e8
Parents: 4f4fd0c
Author: pkuwm <ih...@gmail.com>
Authored: Mon Aug 29 13:57:49 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Aug 29 13:57:49 2016 +0800

----------------------------------------------------------------------
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 165 +++++++++----------
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  10 +-
 eagle-jpm/eagle-jpm-util/pom.xml                |   4 +
 .../org/apache/eagle/jpm/util/JSONUtil.java     |  66 --------
 .../org/apache/eagle/jpm/util/JSONUtils.java    | 108 ++++++++++++
 5 files changed, 197 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3cc18301/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
index 1cd5a77..fe02da5 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -149,17 +149,17 @@ public class JHFSparkEventReader {
         entities.addAll(this.executors.values());
         entities.add(this.app);
 
-        long appStartTime = JSONUtil.getLong(event, "Timestamp");
+        long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         for (TaggedLogAPIEntity entity : entities) {
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID"));
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
             // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
-            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtil.getString(event, "App Attempt ID");
+            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
             entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
             // the second argument of getNormalizedName() is changed to null because the original code contains sensitive text;
             // the original second argument looked like: this.app.getConfig().getConfig().get("xxx"), where "xxx" is the sensitive text
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), null));
-            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
+            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
 
             entity.setTimestamp(appStartTime);
         }
@@ -170,25 +170,25 @@ public class JHFSparkEventReader {
 
     private void handleExecutorAdd(JSONObject event) throws Exception {
         String executorID = (String) event.get("Executor ID");
-        long executorAddTime = JSONUtil.getLong(event, "Timestamp");
+        long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         this.lastEventTime = executorAddTime;
         SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
 
-        JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info");
+        JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
 
     }
 
     private void handleBlockManagerAdd(JSONObject event) throws Exception {
-        long maxMemory = JSONUtil.getLong(event, "Maximum Memory");
-        long timestamp = JSONUtil.getLong(event, "Timestamp");
+        long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
+        long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         this.lastEventTime = timestamp;
-        JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID");
-        String executorID = JSONUtil.getString(blockInfo, "Executor ID");
-        String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port"));
+        JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
+        String executorID = JSONUtils.getString(blockInfo, "Executor ID");
+        String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
 
         SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
         executor.setMaxMemory(maxMemory);
-        executor.setHostPort(hostport);
+        executor.setHostPort(hostAndPort);
     }
 
     private void handleTaskStart(JSONObject event) {
@@ -196,47 +196,47 @@ public class JHFSparkEventReader {
     }
 
     private void handleTaskEnd(JSONObject event) {
-        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
-        int taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        int taskId = JSONUtils.getInt(taskInfo, "Task ID");
         SparkTask task = tasks.get(taskId);
         if (task == null) {
             return;
         }
 
-        task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed"));
-        JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics");
+        task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
+        JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
         if (null != taskMetrics) {
-            task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time"));
-            task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time"));
-            task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time"));
-            task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size"));
-            task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time"));
-            task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled"));
-            task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled"));
-
-            JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics");
+            task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
+            task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
+            task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
+            task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
+            task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
+            task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
+            task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+            JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
             if (null != inputMetrics) {
-                task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read"));
-                task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read"));
+                task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
+                task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
             }
 
-            JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics");
+            JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
             if (null != outputMetrics) {
-                task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written"));
-                task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written"));
+                task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
+                task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
             }
 
-            JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+            JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
             if (null != shuffleWriteMetrics) {
-                task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
-                task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+                task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+                task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
             }
 
-            JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+            JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
             if (null != shuffleReadMetrics) {
-                task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read"));
-                task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read"));
-                task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read"));
+                task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
+                task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+                task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
             }
         } else {
             // for tasks that succeed without task metrics, save at the end if no other information arrives
@@ -257,25 +257,25 @@ public class JHFSparkEventReader {
         task.setTags(new HashMap<>(this.app.getTags()));
         task.setTimestamp(app.getTimestamp());
 
-        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtil.getLong(event, "Stage ID")));
-        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtil.getLong(event, "Stage Attempt ID")));
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
 
-        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
-        int taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        int taskId = JSONUtils.getInt(taskInfo, "Task ID");
         task.setTaskId(taskId);
 
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Integer.toString(JSONUtil.getInt(taskInfo, "Index")));
-        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtil.getInt(taskInfo, "Attempt")));
-        long launchTime = JSONUtil.getLong(taskInfo, "Launch Time");
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Index")));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
+        long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
         this.lastEventTime = launchTime;
         if (taskId == 0) {
             this.setFirstTaskLaunchTime(launchTime);
         }
         task.setLaunchTime(launchTime);
-        task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID"));
-        task.setHost(JSONUtil.getString(taskInfo, "Host"));
-        task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality"));
-        task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative"));
+        task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
+        task.setHost(JSONUtils.getString(taskInfo, "Host"));
+        task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
+        task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
 
         tasks.put(task.getTaskId(), task);
         return task;
@@ -285,18 +285,14 @@ public class JHFSparkEventReader {
         this.firstTaskLaunchTime = launchTime;
     }
 
-    private long getFirstTaskLaunchTime() {
-        return this.firstTaskLaunchTime;
-    }
-
     private void handleJobStart(JSONObject event) {
         SparkJob job = new SparkJob();
         job.setTags(new HashMap<>(this.app.getTags()));
         job.setTimestamp(app.getTimestamp());
 
-        int jobId = JSONUtil.getInt(event, "Job ID");
+        int jobId = JSONUtils.getInt(event, "Job ID");
         job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
-        long submissionTime = JSONUtil.getLong(event, "Submission Time");
+        long submissionTime = JSONUtils.getLong(event, "Submission Time", lastEventTime);
         job.setSubmissionTime(submissionTime);
         this.lastEventTime = submissionTime;
 
@@ -307,22 +303,23 @@ public class JHFSparkEventReader {
         this.jobs.put(jobId, job);
         this.jobStageMap.put(jobId, new HashSet<String>());
 
-        JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos");
-        job.setNumStages(stages.size());
-        for (int i = 0; i < stages.size(); i++) {
+        JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
+        int stagesSize = (stages == null ? 0 : stages.size());
+        job.setNumStages(stagesSize);
+        for (int i = 0; i < stagesSize; i++) {
             JSONObject stageInfo = (JSONObject) stages.get(i);
-            int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-            int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
-            String stageName = JSONUtil.getString(stageInfo, "Stage Name");
-            int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+            int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+            int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+            String stageName = JSONUtils.getString(stageInfo, "Stage Name");
+            int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
             this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
         }
     }
 
     private void handleStageSubmit(JSONObject event) {
-        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
         String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
 
@@ -333,28 +330,26 @@ public class JHFSparkEventReader {
                 SparkStage stage = stages.get(baseAttempt);
                 String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
 
-                String stageName = JSONUtil.getString(event, "Stage Name");
-                int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+                String stageName = JSONUtils.getString(event, "Stage Name");
+                int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
                 this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
             }
         }
     }
 
     private void handleStageComplete(JSONObject event) {
-        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        int stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        int stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
         String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
         SparkStage stage = stages.get(key);
 
         // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
-        Long submissionTime = JSONUtil.getLong(stageInfo, "Submission Time");
-        if (submissionTime == null) {
-            submissionTime = this.getFirstTaskLaunchTime();
-        }
+        Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
+
         stage.setSubmitTime(submissionTime);
 
-        long completeTime = JSONUtil.getLong(stageInfo, "Completion Time");
+        long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
         stage.setCompleteTime(completeTime);
         this.lastEventTime = completeTime;
 
@@ -366,23 +361,23 @@ public class JHFSparkEventReader {
     }
 
     private void handleExecutorRemoved(JSONObject event) {
-        String executorID = JSONUtil.getString(event, "Executor ID");
+        String executorID = JSONUtils.getString(event, "Executor ID");
         SparkExecutor executor = executors.get(executorID);
-        long removedTime = JSONUtil.getLong(event, "Timestamp");
+        long removedTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         executor.setEndTime(removedTime);
         this.lastEventTime = removedTime;
     }
 
     private void handleJobEnd(JSONObject event) {
-        int jobId = JSONUtil.getInt(event, "Job ID");
+        int jobId = JSONUtils.getInt(event, "Job ID");
         SparkJob job = jobs.get(jobId);
 
-        long completionTime = JSONUtil.getLong(event, "Completion Time");
+        long completionTime = JSONUtils.getLong(event, "Completion Time", lastEventTime);
         job.setCompletionTime(completionTime);
         this.lastEventTime = completionTime;
 
-        JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
-        String result = JSONUtil.getString(jobResult, "Result");
+        JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
+        String result = JSONUtils.getString(jobResult, "Result");
         if (result.equalsIgnoreCase("JobSucceeded")) {
             job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
         } else {
@@ -391,7 +386,7 @@ public class JHFSparkEventReader {
     }
 
     private void handleAppEnd(JSONObject event) {
-        long endTime = JSONUtil.getLong(event, "Timestamp");
+        long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
         app.setEndTime(endTime);
         this.lastEventTime = endTime;
     }
@@ -634,7 +629,7 @@ public class JHFSparkEventReader {
 
 
     private String generateStageKey(String stageId, String stageAttemptId) {
-        return String.format("%s-%s", stageId, stageAttemptId);
+        return stageId + "-" + stageAttemptId;
     }
 
     private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
@@ -679,7 +674,7 @@ public class JHFSparkEventReader {
     }
 
     private void flushEntities(Object entity, boolean forceFlush) {
-        this.flushEntities(Arrays.asList(entity), forceFlush);
+        this.flushEntities(Collections.singletonList(entity), forceFlush);
     }
 
     private void flushEntities(Collection entities, boolean forceFlush) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3cc18301/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
index 05cdd7e..2ba3c73 100644
--- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -45,13 +45,13 @@ public class JHFSparkParser implements JHFParserBase {
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
             for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                 isValidJson = true;
-                try {
-                    JSONObject eventObj = parseAndValidateJSON(line);
-                    if (isValidJson) {
+                JSONObject eventObj = parseAndValidateJSON(line);
+                if (isValidJson) {
+                    try {
                         this.eventReader.read(eventObj);
+                    } catch (Exception e) {
+                        logger.error("Fail to read eventObj. Exception: " + e);
                     }
-                } catch(Exception e) {
-                    logger.error(String.format("Fail to parse %s.", line), e);
                 }
             }
 
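The restructured loop above separates the two failure modes: parseAndValidateJSON (not shown in this hunk) is assumed to clear the isValidJson flag on a malformed line so that line is skipped, while exceptions from eventReader.read() are caught and logged per event without aborting the file. A self-contained sketch of that control flow, with both collaborators stubbed as assumptions:

    import java.io.BufferedReader;
    import java.io.StringReader;
    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    public class ParserFlowSketch {
        private boolean isValidJson;

        // Stand-in for the real parseAndValidateJSON, assumed to clear the flag
        // on a parse failure instead of throwing.
        private JSONObject parseAndValidateJSON(String line) {
            try {
                return (JSONObject) new JSONParser().parse(line);
            } catch (ParseException | ClassCastException e) {
                isValidJson = false;    // invalid lines are skipped, not fatal
                return null;
            }
        }

        // Stand-in for this.eventReader.read(eventObj).
        private void readEvent(JSONObject eventObj) throws Exception {
        }

        public void parse(String log) throws Exception {
            try (BufferedReader reader = new BufferedReader(new StringReader(log))) {
                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    isValidJson = true;
                    JSONObject eventObj = parseAndValidateJSON(line);
                    if (isValidJson) {
                        try {
                            readEvent(eventObj);    // only reader failures are logged here
                        } catch (Exception e) {
                            System.err.println("Fail to read eventObj. Exception: " + e);
                        }
                    }
                }
            }
        }
    }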

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3cc18301/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
index 8868dbe..e424e49 100644
--- a/eagle-jpm/eagle-jpm-util/pom.xml
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -37,6 +37,10 @@
             <version>1.1.1</version>
         </dependency>
         <dependency>
+            <groupId>org.json</groupId>
+            <artifactId>json</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${storm.version}</version>
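(Note: the new org.json dependency carries no version tag here, so it is presumably managed in the parent pom's dependencyManagement; it supplies the org.json.JSONException type that the new JSONUtils class catches.)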

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3cc18301/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
deleted file mode 100644
index 9804a3b..0000000
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.util;
-
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-
-public class JSONUtil {
-
-    public static String getString(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return (String) obj.get(field);
-        }
-        return null;
-    }
-
-    public static Integer getInt(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return ((Long) obj.get(field)).intValue();
-        }
-        return null;
-    }
-
-    public static Long getLong(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return (Long) obj.get(field);
-        }
-        return null;
-    }
-
-    public static Boolean getBoolean(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return (Boolean) obj.get(field);
-        }
-        return null;
-    }
-
-    public static JSONObject getJSONObject(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return (JSONObject) obj.get(field);
-        }
-        return null;
-    }
-
-    public static JSONArray getJSONArray(JSONObject obj, String field) {
-        if (obj.containsKey(field)) {
-            return (JSONArray) obj.get(field);
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3cc18301/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
new file mode 100644
index 0000000..38500b0
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.json.JSONException;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class JSONUtils {
+
+    public static String getString(JSONObject obj, String field) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return null;
+        }
+
+        try {
+            return obj.get(field).toString();
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    public static int getInt(JSONObject obj, String field) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return 0;
+        }
+
+        try {
+            return (int) obj.get(field);
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return 0;
+        }
+    }
+
+    public static long getLong(JSONObject obj, String field) {
+        return getLong(obj, field, 0L);
+    }
+
+    public static long getLong(JSONObject obj, String field, long defaultValue) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return defaultValue;
+        }
+
+        try {
+            return (long) obj.get(field);
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return defaultValue;
+        }
+    }
+
+    public static Boolean getBoolean(JSONObject obj, String field) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return false;
+        }
+
+        try {
+            return (Boolean) obj.get(field);
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    public static JSONObject getJSONObject(JSONObject obj, String field) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return null;
+        }
+
+        try {
+            return (JSONObject) obj.get(field);
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    public static JSONArray getJSONArray(JSONObject obj, String field) {
+        if (obj == null || StringUtils.isEmpty(field)) {
+            return null;
+        }
+
+        try {
+            return (JSONArray) obj.get(field);
+        } catch (JSONException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+}
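A small usage sketch against the class above, showing how an absent nested object now degrades to null and then to the supplied default instead of throwing; the event content is made up:

    import org.json.simple.JSONObject;

    public class JSONUtilsChainDemo {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            JSONObject event = new JSONObject();
            event.put("App Name", "my-spark-app");   // hypothetical event content

            // An absent nested object comes back as null rather than throwing...
            JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");  // -> null

            // ...and the null propagates safely into the scalar getters.
            long runTime = JSONUtils.getLong(taskMetrics, "Executor Run Time", 0L);   // -> 0
            String name = JSONUtils.getString(event, "App Name");                     // -> "my-spark-app"

            System.out.println(name + " " + runTime);
        }
    }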