You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/07/05 18:14:06 UTC
git commit: updated refs/heads/trunk to 819f293
Repository: giraph
Updated Branches:
refs/heads/trunk faf339206 -> 819f293f4
GIRAPH-1083: Make sure we fail after exception in ooc-io thread happens
Summary: Currently, if an exception happens in an ooc-io thread, the job is left running for a long time after the exception. We should make sure we fail early.
Test Plan: Ran a job with out-of-core (ooc) enabled in which I simulated the failure. Without the change, the job hangs for a long time; with the change, it fails right after the exception happens and logs it to the command line.
Differential Revision: https://reviews.facebook.net/D60291
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/819f293f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/819f293f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/819f293f
Branch: refs/heads/trunk
Commit: 819f293f4c780fc6833785da27e10f965570f44e
Parents: faf3392
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Jul 1 13:26:50 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jul 1 13:28:02 2016 -0700
----------------------------------------------------------------------
.../apache/giraph/graph/GraphTaskManager.java | 12 +++++--
.../org/apache/giraph/ooc/OutOfCoreEngine.java | 19 ++--------
.../apache/giraph/ooc/OutOfCoreIOCallable.java | 5 ++-
.../giraph/ooc/OutOfCoreIOCallableFactory.java | 38 +++++---------------
.../giraph/utils/LogStacktraceCallable.java | 21 ++++++++++-
.../org/apache/giraph/utils/ThreadUtils.java | 24 +++++++++++++
6 files changed, 67 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index a1d8522..4d97e5f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
import com.sun.management.GarbageCollectionNotificationInfo;
import com.yammer.metrics.core.Counter;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -1043,7 +1045,7 @@ end[PURE_YARN]*/
public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
return new OverrideExceptionHandler(
CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
- getConf()));
+ getConf()), getJobProgressTracker());
}
public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -1079,16 +1081,21 @@ end[PURE_YARN]*/
class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
/** Checker if worker should fail after a thread gets an exception */
private final CheckerIfWorkerShouldFailAfterException checker;
+ /** JobProgressTracker to log problems to */
+ private final JobProgressTracker jobProgressTracker;
/**
* Constructor
*
* @param checker Checker if worker should fail after a thread gets an
* exception
+ * @param jobProgressTracker JobProgressTracker to log problems to
*/
public OverrideExceptionHandler(
- CheckerIfWorkerShouldFailAfterException checker) {
+ CheckerIfWorkerShouldFailAfterException checker,
+ JobProgressTracker jobProgressTracker) {
this.checker = checker;
+ this.jobProgressTracker = jobProgressTracker;
}
@Override
@@ -1100,6 +1107,7 @@ end[PURE_YARN]*/
LOG.fatal(
"uncaughtException: OverrideExceptionHandler on thread " +
t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
+ jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
zooKeeperCleanup();
workerFailureCleanup();
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 3187468..d5bfd4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -77,11 +77,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
private final MetaPartitionManager metaPartitionManager;
/** Out-of-core oracle (brain of out-of-core mechanism) */
private final OutOfCoreOracle oracle;
- /**
- * Whether the job should fail due to IO threads terminating because of
- * exceptions
- */
- private volatile boolean jobFailed = false;
/** IO statistics collector */
private final OutOfCoreIOStatistics statistics;
/**
@@ -167,7 +162,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
}
int numIOThreads = dataAccessor.getNumAccessorThreads();
this.oocIOCallableFactory =
- new OutOfCoreIOCallableFactory(this, numIOThreads);
+ new OutOfCoreIOCallableFactory(this, numIOThreads,
+ service.getGraphTaskManager().createUncaughtExceptionHandler());
this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
@@ -307,10 +303,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
"InterruptedException while waiting to retrieve a partition to " +
"process");
}
- if (jobFailed) {
- throw new RuntimeException("Job Failed due to a failure in an " +
- "out-of-core IO thread!");
- }
}
if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
partitionAvailable.notifyAll();
@@ -410,13 +402,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
}
/**
- * Set a flag to fail the job.
- */
- public void failTheJob() {
- jobFailed = true;
- }
-
- /**
* Update the fraction of processing threads that should remain active. It is
* the responsibility of out-of-core oracle to update the number of active
* threads.
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index bea3994..829ad80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -112,9 +112,8 @@ public class OutOfCoreIOCallable implements Callable<Void>,
1000 / 1024 / 1024))));
}
} catch (Exception e) {
- oocEngine.failTheJob();
- LOG.error("call: execution of IO command " + command + " failed!");
- throw new RuntimeException(e);
+ throw new RuntimeException(
+ "call: execution of IO command " + command + " failed!", e);
}
// CHECKSTYLE: resume IllegalCatch
if (!(command instanceof WaitIOCommand)) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
index 6aeb196..b8b730e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
@@ -19,7 +19,6 @@
package org.apache.giraph.ooc;
import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;
@@ -46,20 +45,24 @@ public class OutOfCoreIOCallableFactory {
private final List<Future> results;
/** Number of threads used for IO operations */
private final int numIOThreads;
+ /** Thread UncaughtExceptionHandler to use */
+ private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/** Executor service for IO threads */
private ExecutorService outOfCoreIOExecutor;
/**
* Constructor
- *
* @param oocEngine Out-of-core engine
* @param numIOThreads Number of IO threads used
+ * @param uncaughtExceptionHandler Thread UncaughtExceptionHandler to use
*/
public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
- int numIOThreads) {
+ int numIOThreads,
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
this.oocEngine = oocEngine;
this.numIOThreads = numIOThreads;
this.results = new ArrayList<>(numIOThreads);
+ this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}
/**
@@ -75,34 +78,11 @@ public class OutOfCoreIOCallableFactory {
};
outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
- ThreadUtils.createThreadFactory("ooc-io-%d")) {
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- if (t == null && r instanceof Future<?>) {
- try {
- Future<?> future = (Future<?>) r;
- if (future.isDone()) {
- future.get();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- t = e;
- }
- if (t != null) {
- LOG.info("afterExecute: an out-of-core thread terminated " +
- "unexpectedly with " + t);
- oocEngine.failTheJob();
- }
- }
- }
- };
+ ThreadUtils.createThreadFactory("ooc-io-%d"));
for (int i = 0; i < numIOThreads; ++i) {
- Future<Void> future = outOfCoreIOExecutor.submit(
- new LogStacktraceCallable<>(
- outOfCoreIOCallableFactory.newCallable(i)));
+ Future<Void> future = ThreadUtils.submitToExecutor(outOfCoreIOExecutor,
+ outOfCoreIOCallableFactory.newCallable(i), uncaughtExceptionHandler);
results.add(future);
}
// Notify executor to not accept any more tasks
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
index 730825d..3b659aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
@@ -34,7 +34,9 @@ public class LogStacktraceCallable<V> implements Callable<V> {
Logger.getLogger(LogStacktraceCallable.class);
/** Pass call() to this callable. */
- private Callable<V> callable;
+ private final Callable<V> callable;
+ /** Uncaught exception handler, if any */
+ private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
/**
* Construct an instance that will pass call() to the given callable.
@@ -42,7 +44,21 @@ public class LogStacktraceCallable<V> implements Callable<V> {
* @param callable Callable
*/
public LogStacktraceCallable(Callable<V> callable) {
+ this(callable, null);
+ }
+
+ /**
+ * Construct an instance that will pass call() to the given callable.
+ *
+ * @param callable Callable
+ * @param uncaughtExceptionHandler Uncaught exception handler, if any
+ *
+ *
+ */
+ public LogStacktraceCallable(Callable<V> callable,
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
this.callable = callable;
+ this.uncaughtExceptionHandler = uncaughtExceptionHandler;
}
@Override
@@ -55,6 +71,9 @@ public class LogStacktraceCallable<V> implements Callable<V> {
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
LOG.error("Execution of callable failed", e);
+ if (uncaughtExceptionHandler != null) {
+ uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+ }
throw e;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index 9518bdc..83eca14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -19,6 +19,9 @@ package org.apache.giraph.utils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
/**
@@ -62,4 +65,25 @@ public class ThreadUtils {
public static ThreadFactory createThreadFactory(String nameFormat) {
return createThreadFactory(nameFormat, null);
}
+
+ /**
+ * Submit a callable to executor service, ensuring any exceptions are
+ * caught with provided exception handler.
+ *
+ * When using submit(), UncaughtExceptionHandler which is set on ThreadFactory
+ * isn't used, so we need this utility.
+ *
+ * @param executorService Executor service to submit callable to
+ * @param callable Callable to submit
+ * @param uncaughtExceptionHandler Handler for uncaught exceptions in callable
+ * @param <T> Type of callable result
+ * @return Future for callable
+ */
+ public static <T> Future<T> submitToExecutor(
+ ExecutorService executorService,
+ Callable<T> callable,
+ Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+ return executorService.submit(
+ new LogStacktraceCallable<>(callable, uncaughtExceptionHandler));
+ }
}