You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2015/10/22 05:12:57 UTC

git commit: updated refs/heads/trunk to 81d5bad

Repository: giraph
Updated Branches:
  refs/heads/trunk b735f02bd -> 81d5badf7


GIRAPH-1036: Allow mappers to fail early on exceptions

Summary:
Often when something fails in a mapper, the mapper hangs until its timeout expires. Digging into this issue, I found two root causes:
- Many of the threads we create were not daemon threads, which prevented the process from exiting. Only the main thread should be non-daemon.
- When a task is submitted to an ExecutorService, its exceptions are not propagated back to the caller unless get() is called on the returned future. In ProgressableUtils.getResultsWithNCallables we were calling get() on the futures one at a time, so an exception thrown in a later future was not seen until all earlier futures had finished.

Test Plan: Ran jobs in which I simulated exceptions on some partitions in the loading, compute and storing phases, and verified for each that the job exits quickly with the exception clearly shown. Without this change, we would wait for the timeout and for the other threads from the same ProgressableUtils.getResultsWithNCallables call to finish. Ran a normal job successfully. Ran mvn clean verify.

Differential Revision: https://reviews.facebook.net/D49143


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/81d5badf
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/81d5badf
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/81d5badf

Branch: refs/heads/trunk
Commit: 81d5badf7b76e9f1efde1cebe2150bee70e4cf58
Parents: b735f02
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Oct 20 18:19:36 2015 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Wed Oct 21 20:06:04 2015 -0700

----------------------------------------------------------------------
 .../queue/AsyncMessageStoreWrapper.java         |  5 +-
 .../giraph/ooc/AdaptiveOutOfCoreEngine.java     | 10 ++--
 .../apache/giraph/utils/JMapHistoDumper.java    |  1 +
 .../apache/giraph/utils/ProgressableUtils.java  | 53 +++++++++++++++-----
 .../giraph/utils/ReactiveJMapHistoDumper.java   |  1 +
 .../org/apache/giraph/utils/ThreadUtils.java    | 13 ++++-
 .../giraph/worker/WorkerProgressWriter.java     |  3 +-
 7 files changed, 64 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index e820f26..04afba5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -17,10 +17,10 @@
  */
 package org.apache.giraph.comm.messages.queue;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.utils.VertexIdMessages;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -60,8 +60,7 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
   /** Executor that processes messages in background */
   private static final ExecutorService EXECUTOR_SERVICE =
       Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setDaemon(true)
-              .setNameFormat("AsyncMessageStoreWrapper-%d").build());
+          ThreadUtils.createThreadFactory("AsyncMessageStoreWrapper-%d"));
 
   /** Number of threads that will process messages in background */
   private final int threadsCount;

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
index 749e41e..d5b0e20 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/AdaptiveOutOfCoreEngine.java
@@ -19,11 +19,11 @@
 package org.apache.giraph.ooc;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.LogStacktraceCallable;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
@@ -152,7 +152,7 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
         }
       };
     checkMemoryExecutor = Executors.newSingleThreadExecutor(
-        new ThreadFactoryBuilder().setNameFormat("check-memory").build());
+        ThreadUtils.createThreadFactory("check-memory"));
     checkMemoryResult = checkMemoryExecutor.submit(new LogStacktraceCallable<>(
         checkMemoryCallableFactory.newCallable(0)));
 
@@ -164,9 +164,9 @@ public class AdaptiveOutOfCoreEngine<I extends WritableComparable,
               AdaptiveOutOfCoreEngine.this, serviceWorker);
         }
       };
-    outOfCoreProcessorExecutor = Executors
-        .newFixedThreadPool(numOocThreads,
-            new ThreadFactoryBuilder().setNameFormat("ooc-%d").build());
+    outOfCoreProcessorExecutor =
+        Executors.newFixedThreadPool(numOocThreads,
+            ThreadUtils.createThreadFactory("ooc-%d"));
     oocProcessorResults = Lists.newArrayListWithCapacity(numOocThreads);
     for (int i = 0; i < numOocThreads; ++i) {
       Future<Void> future = outOfCoreProcessorExecutor.submit(

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
index f90337f..4282d35 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/JMapHistoDumper.java
@@ -94,6 +94,7 @@ public class JMapHistoDumper implements MasterObserver, WorkerObserver {
         }
       }
     });
+    thread.setDaemon(true);
     thread.start();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 08a7914..3008248 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -20,14 +20,17 @@ package org.apache.giraph.utils;
 
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
+
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.util.concurrent.EventExecutorGroup;
 
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -43,6 +46,11 @@ public class ProgressableUtils {
       Logger.getLogger(ProgressableUtils.class);
   /** Msecs to refresh the progress meter (one minute) */
   private static final int DEFUALT_MSEC_PERIOD = 60 * 1000;
+  /**
+   * When getting results with many threads, how many milliseconds to wait
+   * on each when looping through them
+   */
+  private static final int MSEC_TO_WAIT_ON_EACH_FUTURE = 10;
 
   /** Do not instantiate. */
   private ProgressableUtils() {
@@ -217,21 +225,42 @@ public class ProgressableUtils {
   public static <R> List<R> getResultsWithNCallables(
       CallableFactory<R> callableFactory, int numThreads,
       String threadNameFormat, Progressable progressable) {
-    ExecutorService executorService =
-        Executors.newFixedThreadPool(numThreads,
-            new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
-    List<Future<R>> futures = Lists.newArrayListWithCapacity(numThreads);
+    ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
+        ThreadUtils.createThreadFactory(threadNameFormat));
+    HashMap<Integer, Future<R>> futures = new HashMap<>(numThreads);
     for (int i = 0; i < numThreads; i++) {
       Callable<R> callable = callableFactory.newCallable(i);
       Future<R> future = executorService.submit(
           new LogStacktraceCallable<R>(callable));
-      futures.add(future);
+      futures.put(i, future);
     }
     executorService.shutdown();
-    List<R> futureResults = Lists.newArrayListWithCapacity(numThreads);
-    for (Future<R> future : futures) {
-      R result = ProgressableUtils.getFutureResult(future, progressable);
-      futureResults.add(result);
+    List<R> futureResults =
+        new ArrayList<>(Collections.<R>nCopies(numThreads, null));
+    // Loop through the futures until all are finished
+    // We do this in order to get any exceptions from the futures early
+    while (!futures.isEmpty()) {
+      Iterator<Map.Entry<Integer, Future<R>>> iterator =
+          futures.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<Integer, Future<R>> entry = iterator.next();
+        R result;
+        try {
+          // Try to get result from the future
+          result = entry.getValue().get(
+              MSEC_TO_WAIT_ON_EACH_FUTURE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException e) {
+          throw new IllegalStateException("Exception occurred", e);
+        } catch (TimeoutException e) {
+          // If result is not ready yet just keep waiting
+          continue;
+        }
+        // Result is ready, put it to final results
+        futureResults.set(entry.getKey(), result);
+        // Remove current future since we are done with it
+        iterator.remove();
+      }
+      progressable.progress();
     }
     return futureResults;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
index 844f929..f3ecfb0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReactiveJMapHistoDumper.java
@@ -107,6 +107,7 @@ public class ReactiveJMapHistoDumper extends
       }
     });
     thread.setName("ReactiveJMapHistoDumperSupervisorThread");
+    thread.setDaemon(true);
     thread.start();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
index a235ff4..9518bdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -45,10 +45,21 @@ public class ThreadUtils {
       String nameFormat,
       Thread.UncaughtExceptionHandler exceptionHandler) {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder().
-        setNameFormat(nameFormat);
+        setNameFormat(nameFormat).setDaemon(true);
     if (exceptionHandler != null) {
       builder.setUncaughtExceptionHandler(exceptionHandler);
     }
     return builder.build();
   }
+
+  /**
+   * Creates new thread factory with specified thread name format.
+   *
+   * @param nameFormat defines naming format for threads created by
+   *                   thread factory
+   * @return new thread factory with specified thread name format
+   */
+  public static ThreadFactory createThreadFactory(String nameFormat) {
+    return createThreadFactory(nameFormat, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/81d5badf/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
index dae9963..f37a48d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java
@@ -62,7 +62,8 @@ public class WorkerProgressWriter {
           }
         }
       }
-    });
+    }, "workerProgressThread");
+    writerThread.setDaemon(true);
     writerThread.start();
   }