You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2017/10/12 20:27:05 UTC
git commit: updated refs/heads/trunk to 2e7ce47
Repository: giraph
Updated Branches:
refs/heads/trunk 83d06d95d -> 2e7ce47df
GIRAPH-1163
closes #52
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/2e7ce47d
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/2e7ce47d
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/2e7ce47d
Branch: refs/heads/trunk
Commit: 2e7ce47dfc59e772a9fcc8577bbc6b14f9311bf3
Parents: 83d06d9
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 12 13:26:18 2017 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Oct 12 13:26:18 2017 -0700
----------------------------------------------------------------------
.../org/apache/giraph/graph/GraphMapper.java | 4 +++-
.../apache/giraph/graph/GraphTaskManager.java | 6 +++--
.../graph/JobProgressTrackerClientNoOp.java | 2 +-
.../RetryableJobProgressTrackerClient.java | 5 ++--
.../job/DefaultJobProgressTrackerService.java | 3 ++-
.../apache/giraph/job/JobProgressTracker.java | 10 +++++---
.../writable/kryo/KryoWritableWrapper.java | 25 ++++++++++++++++++++
7 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index aa4ce7b..86c711c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -19,6 +19,7 @@
package org.apache.giraph.graph;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -93,11 +94,12 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
// CHECKSTYLE: stop IllegalCatch
} catch (RuntimeException e) {
// CHECKSTYLE: resume IllegalCatch
+ byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
graphTaskManager.getJobProgressTracker().logError(
"Exception occurred on mapper " +
graphTaskManager.getConf().getTaskPartition() + ": " +
- ExceptionUtils.getStackTrace(e));
+ ExceptionUtils.getStackTrace(e), exByteArray);
graphTaskManager.zooKeeperCleanup();
graphTaskManager.workerFailureCleanup();
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 1967b44..b0659bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -65,6 +65,7 @@ import org.apache.giraph.worker.InputSplitsCallable;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.worker.WorkerProgress;
+import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -1115,8 +1116,9 @@ end[PURE_YARN]*/
LOG.fatal(
"uncaughtException: OverrideExceptionHandler on thread " +
t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
- jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
-
+ byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
+ jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
+ exByteArray);
zooKeeperCleanup();
workerFailureCleanup();
} finally {
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
index e699bfb..6f1258d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java
@@ -39,7 +39,7 @@ public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient {
}
@Override
- public void logError(String logLine) {
+ public void logError(String logLine, byte [] exByteArray) {
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index a7ac055..f51d765 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -124,11 +124,12 @@ public class RetryableJobProgressTrackerClient
}
@Override
- public synchronized void logError(final String logLine) {
+ public synchronized void logError(final String logLine,
+ final byte [] exByteArray) {
executeWithRetry(new Runnable() {
@Override
public void run() {
- jobProgressTracker.logError(logLine);
+ jobProgressTracker.logError(logLine, exByteArray);
}
});
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index bb9390e..d7d03d2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -224,7 +224,8 @@ public class DefaultJobProgressTrackerService
}
@Override
- public void logError(String logLine) {
+ public void
+ logError(String logLine, byte [] exByteArray) {
LOG.error(logLine);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
index 92e35b8..a1ad44d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java
@@ -43,13 +43,17 @@ public interface JobProgressTracker {
void logInfo(String logLine);
/**
- * Call this when you want to log an error line from any mapper to command
- * line
+ * Call this when you want to log an error line and exception
+ * object from any mapper to command line
+ *
+ * KryoWritableWrapper.convertFromByteArray can be used to
+ * get exception object back
*
* @param logLine Line to log
+ * @param exByteArray Exception byte array
*/
@ThriftMethod
- void logError(String logLine);
+ void logError(String logLine, byte [] exByteArray);
/**
* Notify that job is failing
http://git-wip-us.apache.org/repos/asf/giraph/blob/2e7ce47d/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
index f17955b..d80a9a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -120,4 +120,29 @@ public class KryoWritableWrapper<T> implements Writable {
public static <T> T wrapAndCopy(T object) {
return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get();
}
+
+ /**
+ * Converting the object to byte array.
+ * @param object Object
+ * @param <T> Type
+ * @return byte array
+ */
+ public static <T> byte [] convertToByteArray(T object) {
+ KryoWritableWrapper<T> wrapper =
+ new KryoWritableWrapper<>(object);
+ return WritableUtils.toByteArray(wrapper);
+ }
+
+ /**
+ * Converting from byte array
+ * @param arr byte array
+ * @param <T> type
+ * @return original object
+ */
+ public static <T> T convertFromByteArray(byte [] arr) {
+ KryoWritableWrapper<T> wrapper =
+ new KryoWritableWrapper<>();
+ WritableUtils.fromByteArray(arr, wrapper);
+ return wrapper.get();
+ }
}