You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2017/10/12 20:27:05 UTC

git commit: updated refs/heads/trunk to 2e7ce47

Repository: giraph
Updated Branches:
  refs/heads/trunk 83d06d95d -> 2e7ce47df


GIRAPH-1163

closes #52


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2e7ce47d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2e7ce47d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2e7ce47d

Branch: refs/heads/trunk
Commit: 2e7ce47dfc59e772a9fcc8577bbc6b14f9311bf3
Parents: 83d06d9
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 12 13:26:18 2017 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Oct 12 13:26:18 2017 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/graph/GraphMapper.java    |  4 +++-
 .../apache/giraph/graph/GraphTaskManager.java   |  6 +++--
 .../graph/JobProgressTrackerClientNoOp.java     |  2 +-
 .../RetryableJobProgressTrackerClient.java      |  5 ++--
 .../job/DefaultJobProgressTrackerService.java   |  3 ++-
 .../apache/giraph/job/JobProgressTracker.java   | 10 +++++---
 .../writable/kryo/KryoWritableWrapper.java      | 25 ++++++++++++++++++++
 7 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index aa4ce7b..86c711c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -19,6 +19,7 @@
 package org.apache.giraph.graph;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -93,11 +94,12 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
       // CHECKSTYLE: stop IllegalCatch
     } catch (RuntimeException e) {
       // CHECKSTYLE: resume IllegalCatch
+      byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
       LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
       graphTaskManager.getJobProgressTracker().logError(
           "Exception occurred on mapper " +
               graphTaskManager.getConf().getTaskPartition() + ": " +
-              ExceptionUtils.getStackTrace(e));
+              ExceptionUtils.getStackTrace(e), exByteArray);
       graphTaskManager.zooKeeperCleanup();
       graphTaskManager.workerFailureCleanup();
       throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 1967b44..b0659bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -65,6 +65,7 @@ import org.apache.giraph.worker.InputSplitsCallable;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.giraph.worker.WorkerProgress;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -1115,8 +1116,9 @@ end[PURE_YARN]*/
         LOG.fatal(
             "uncaughtException: OverrideExceptionHandler on thread " +
                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
-        jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
-
+        byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
+        jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
+                exByteArray);
         zooKeeperCleanup();
         workerFailureCleanup();
       } finally {

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index e699bfb..6f1258d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -39,7 +39,7 @@ public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient {
   }
 
   @Override
-  public void logError(String logLine) {
+  public void logError(String logLine, byte [] exByteArray) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index a7ac055..f51d765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -124,11 +124,12 @@ public class RetryableJobProgressTrackerClient
   }
 
   @Override
-  public synchronized void logError(final String logLine) {
+  public synchronized void logError(final String logLine,
+                                    final byte [] exByteArray) {
     executeWithRetry(new Runnable() {
       @Override
       public void run() {
-        jobProgressTracker.logError(logLine);
+        jobProgressTracker.logError(logLine, exByteArray);
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index bb9390e..d7d03d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -224,7 +224,8 @@ public class DefaultJobProgressTrackerService
   }
 
   @Override
-  public void logError(String logLine) {
+  public void
+  logError(String logLine, byte [] exByteArray) {
     LOG.error(logLine);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 92e35b8..a1ad44d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -43,13 +43,17 @@ public interface JobProgressTracker {
   void logInfo(String logLine);
 
   /**
-   * Call this when you want to log an error line from any mapper to command
-   * line
+   * Call this when you want to log an error line and exception
+   * object from any mapper to command line
+   *
+   * KryoWritableWrapper.convertFromByteArray can be used to
+   * get exception object back
    *
    * @param logLine Line to log
+   * @param exByteArray Exception byte array
    */
   @ThriftMethod
-  void logError(String logLine);
+  void logError(String logLine, byte [] exByteArray);
 
   /**
    * Notify that job is failing

http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
index f17955b..d80a9a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -120,4 +120,29 @@ public class KryoWritableWrapper<T> implements Writable {
   public static <T> T wrapAndCopy(T object) {
     return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get();
   }
+
+  /**
+   * Converting the object to byte array.
+   * @param object Object
+   * @param <T> Type
+   * @return byte array
+   */
+  public static <T> byte [] convertToByteArray(T object) {
+    KryoWritableWrapper<T> wrapper =
+            new KryoWritableWrapper<>(object);
+    return WritableUtils.toByteArray(wrapper);
+  }
+
+  /**
+   * Converting from byte array
+   * @param arr byte array
+   * @param <T> type
+   * @return original object
+   */
+  public static <T> T convertFromByteArray(byte [] arr) {
+    KryoWritableWrapper<T> wrapper =
+            new KryoWritableWrapper<>();
+    WritableUtils.fromByteArray(arr, wrapper);
+    return wrapper.get();
+  }
 }