You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/07/05 18:14:06 UTC

git commit: updated refs/heads/trunk to 819f293

Repository: giraph
Updated Branches:
  refs/heads/trunk faf339206 -> 819f293f4


GIRAPH-1083: Make sure we fail after an exception happens in an ooc-io thread

Summary: Currently, if an exception happens in an ooc-io thread, the job is left running for a long time after the exception. We should make sure we fail early.

Test Plan: Ran a job with out-of-core (ooc) enabled in which I simulated the failure. Without the change, the job hangs for a long time; with the change, it fails right after the exception happens and logs it to the command line.

Differential Revision: https://reviews.facebook.net/D60291


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/819f293f
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/819f293f
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/819f293f

Branch: refs/heads/trunk
Commit: 819f293f4c780fc6833785da27e10f965570f44e
Parents: faf3392
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Jul 1 13:26:50 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jul 1 13:28:02 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/graph/GraphTaskManager.java   | 12 +++++--
 .../org/apache/giraph/ooc/OutOfCoreEngine.java  | 19 ++--------
 .../apache/giraph/ooc/OutOfCoreIOCallable.java  |  5 ++-
 .../giraph/ooc/OutOfCoreIOCallableFactory.java  | 38 +++++---------------
 .../giraph/utils/LogStacktraceCallable.java     | 21 ++++++++++-
 .../org/apache/giraph/utils/ThreadUtils.java    | 24 +++++++++++++
 6 files changed, 67 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index a1d8522..4d97e5f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
 
 import com.sun.management.GarbageCollectionNotificationInfo;
 import com.yammer.metrics.core.Counter;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceMaster;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -1043,7 +1045,7 @@ end[PURE_YARN]*/
   public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
     return new OverrideExceptionHandler(
         CHECKER_IF_WORKER_SHOULD_FAIL_AFTER_EXCEPTION_CLASS.newInstance(
-            getConf()));
+            getConf()), getJobProgressTracker());
   }
 
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -1079,16 +1081,21 @@ end[PURE_YARN]*/
   class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
     /** Checker if worker should fail after a thread gets an exception */
     private final CheckerIfWorkerShouldFailAfterException checker;
+    /** JobProgressTracker to log problems to */
+    private final JobProgressTracker jobProgressTracker;
 
     /**
      * Constructor
      *
      * @param checker Checker if worker should fail after a thread gets an
      *                exception
+     * @param jobProgressTracker JobProgressTracker to log problems to
      */
     public OverrideExceptionHandler(
-        CheckerIfWorkerShouldFailAfterException checker) {
+        CheckerIfWorkerShouldFailAfterException checker,
+        JobProgressTracker jobProgressTracker) {
       this.checker = checker;
+      this.jobProgressTracker = jobProgressTracker;
     }
 
     @Override
@@ -1100,6 +1107,7 @@ end[PURE_YARN]*/
         LOG.fatal(
             "uncaughtException: OverrideExceptionHandler on thread " +
                 t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
+        jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));
 
         zooKeeperCleanup();
         workerFailureCleanup();

http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
index 3187468..d5bfd4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreEngine.java
@@ -77,11 +77,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   private final MetaPartitionManager metaPartitionManager;
   /** Out-of-core oracle (brain of out-of-core mechanism) */
   private final OutOfCoreOracle oracle;
-  /**
-   * Whether the job should fail due to IO threads terminating because of
-   * exceptions
-   */
-  private volatile boolean jobFailed = false;
   /** IO statistics collector */
   private final OutOfCoreIOStatistics statistics;
   /**
@@ -167,7 +162,8 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
     }
     int numIOThreads = dataAccessor.getNumAccessorThreads();
     this.oocIOCallableFactory =
-        new OutOfCoreIOCallableFactory(this, numIOThreads);
+        new OutOfCoreIOCallableFactory(this, numIOThreads,
+            service.getGraphTaskManager().createUncaughtExceptionHandler());
     this.ioScheduler = new OutOfCoreIOScheduler(conf, this, numIOThreads);
     this.metaPartitionManager = new MetaPartitionManager(numIOThreads, this);
     this.statistics = new OutOfCoreIOStatistics(conf, numIOThreads);
@@ -307,10 +303,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
               "InterruptedException while waiting to retrieve a partition to " +
               "process");
         }
-        if (jobFailed) {
-          throw new RuntimeException("Job Failed due to a failure in an " +
-              "out-of-core IO thread!");
-        }
       }
       if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
         partitionAvailable.notifyAll();
@@ -410,13 +402,6 @@ public class OutOfCoreEngine implements ResetSuperstepMetricsObserver {
   }
 
   /**
-   * Set a flag to fail the job.
-   */
-  public void failTheJob() {
-    jobFailed = true;
-  }
-
-  /**
    * Update the fraction of processing threads that should remain active. It is
    * the responsibility of out-of-core oracle to update the number of active
    * threads.

http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
index bea3994..829ad80 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallable.java
@@ -112,9 +112,8 @@ public class OutOfCoreIOCallable implements Callable<Void>,
                           1000 / 1024 / 1024))));
         }
       } catch (Exception e) {
-        oocEngine.failTheJob();
-        LOG.error("call: execution of IO command " + command + " failed!");
-        throw new RuntimeException(e);
+        throw new RuntimeException(
+            "call: execution of IO command " + command + " failed!", e);
       }
       // CHECKSTYLE: resume IllegalCatch
       if (!(command instanceof WaitIOCommand)) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
index 6aeb196..b8b730e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/OutOfCoreIOCallableFactory.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.ooc;
 
 import org.apache.giraph.utils.CallableFactory;
-import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.ThreadUtils;
 import org.apache.log4j.Logger;
 
@@ -46,20 +45,24 @@ public class OutOfCoreIOCallableFactory {
   private final List<Future> results;
   /** Number of threads used for IO operations */
   private final int numIOThreads;
+  /** Thread UncaughtExceptionHandler to use */
+  private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
   /** Executor service for IO threads */
   private ExecutorService outOfCoreIOExecutor;
 
   /**
    * Constructor
-   *
    * @param oocEngine Out-of-core engine
    * @param numIOThreads Number of IO threads used
+   * @param uncaughtExceptionHandler Thread UncaughtExceptionHandler to use
    */
   public OutOfCoreIOCallableFactory(OutOfCoreEngine oocEngine,
-                                    int numIOThreads) {
+      int numIOThreads,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
     this.oocEngine = oocEngine;
     this.numIOThreads = numIOThreads;
     this.results = new ArrayList<>(numIOThreads);
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
   }
 
   /**
@@ -75,34 +78,11 @@ public class OutOfCoreIOCallableFactory {
       };
     outOfCoreIOExecutor = new ThreadPoolExecutor(numIOThreads, numIOThreads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
-        ThreadUtils.createThreadFactory("ooc-io-%d")) {
-      @Override
-      protected void afterExecute(Runnable r, Throwable t) {
-        super.afterExecute(r, t);
-        if (t == null && r instanceof Future<?>) {
-          try {
-            Future<?> future = (Future<?>) r;
-            if (future.isDone()) {
-              future.get();
-            }
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          } catch (ExecutionException e) {
-            t = e;
-          }
-          if (t != null) {
-            LOG.info("afterExecute: an out-of-core thread terminated " +
-                "unexpectedly with " + t);
-            oocEngine.failTheJob();
-          }
-        }
-      }
-    };
+        ThreadUtils.createThreadFactory("ooc-io-%d"));
 
     for (int i = 0; i < numIOThreads; ++i) {
-      Future<Void> future = outOfCoreIOExecutor.submit(
-          new LogStacktraceCallable<>(
-              outOfCoreIOCallableFactory.newCallable(i)));
+      Future<Void> future = ThreadUtils.submitToExecutor(outOfCoreIOExecutor,
+          outOfCoreIOCallableFactory.newCallable(i), uncaughtExceptionHandler);
       results.add(future);
     }
     // Notify executor to not accept any more tasks

http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
index 730825d..3b659aa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
@@ -34,7 +34,9 @@ public class LogStacktraceCallable<V> implements Callable<V> {
       Logger.getLogger(LogStacktraceCallable.class);
 
   /** Pass call() to this callable. */
-  private Callable<V> callable;
+  private final Callable<V> callable;
+  /** Uncaught exception handler, if any */
+  private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
 
   /**
    * Construct an instance that will pass call() to the given callable.
@@ -42,7 +44,21 @@ public class LogStacktraceCallable<V> implements Callable<V> {
    * @param callable Callable
    */
   public LogStacktraceCallable(Callable<V> callable) {
+    this(callable, null);
+  }
+
+  /**
+   * Construct an instance that will pass call() to the given callable.
+   *
+   * @param callable Callable
+   * @param uncaughtExceptionHandler Uncaught exception handler, if any
+   *
+   *
+   */
+  public LogStacktraceCallable(Callable<V> callable,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
     this.callable = callable;
+    this.uncaughtExceptionHandler = uncaughtExceptionHandler;
   }
 
   @Override
@@ -55,6 +71,9 @@ public class LogStacktraceCallable<V> implements Callable<V> {
     } catch (Exception e) {
       // CHECKSTYLE: resume IllegalCatch
       LOG.error("Execution of callable failed", e);
+      if (uncaughtExceptionHandler != null) {
+        uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
+      }
       throw e;
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/819f293f/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index 9518bdc..83eca14 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -19,6 +19,9 @@ package org.apache.giraph.utils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 
 /**
@@ -62,4 +65,25 @@ public class ThreadUtils {
   public static ThreadFactory createThreadFactory(String nameFormat) {
     return createThreadFactory(nameFormat, null);
   }
+
+  /**
+   * Submit a callable to executor service, ensuring any exceptions are
+   * caught with provided exception handler.
+   *
+   * When using submit(), UncaughtExceptionHandler which is set on ThreadFactory
+   * isn't used, so we need this utility.
+   *
+   * @param executorService Executor service to submit callable to
+   * @param callable Callable to submit
+   * @param uncaughtExceptionHandler Handler for uncaught exceptions in callable
+   * @param <T> Type of callable result
+   * @return Future for callable
+   */
+  public static <T> Future<T> submitToExecutor(
+      ExecutorService executorService,
+      Callable<T> callable,
+      Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
+    return executorService.submit(
+        new LogStacktraceCallable<>(callable, uncaughtExceptionHandler));
+  }
 }