You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/04/08 20:11:56 UTC

git commit: updated refs/heads/trunk to 81b837d

Updated Branches:
  refs/heads/trunk 34abd6f5b -> 81b837d36


GIRAPH-609: More information on runtime exceptions for Callables


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/81b837d3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/81b837d3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/81b837d3

Branch: refs/heads/trunk
Commit: 81b837d36189a1d763bd1747ca0550107c155664
Parents: 34abd6f
Author: Avery Ching <ac...@fb.com>
Authored: Mon Apr 8 10:12:42 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Mon Apr 8 11:11:49 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../java/org/apache/giraph/edge/EdgeStore.java     |   17 ++--
 .../org/apache/giraph/graph/GraphTaskManager.java  |    6 +-
 .../org/apache/giraph/master/BspServiceMaster.java |    5 +-
 .../apache/giraph/utils/LogStacktraceCallable.java |   61 +++++++++++++++
 .../org/apache/giraph/worker/BspServiceWorker.java |    6 +-
 6 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cd21461..f7e0af6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-609: More information on runtime exceptions for Callables (aching)
+
   GIRAPH-607: Hive IO bump (nitay)
 
   GIRAPH-564: Input/output formats and readers/writers should implement

http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index d6653d0..01a67dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -20,24 +20,24 @@ package org.apache.giraph.edge;
 
 import com.google.common.collect.MapMaker;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * Collects incoming edges for vertices owned by this worker.
  *
@@ -218,7 +218,8 @@ public class EdgeStore<I extends WritableComparable,
           return null;
         }
       };
-      movePartitionExecutor.submit(moveCallable);
+      movePartitionExecutor.submit(
+          new LogStacktraceCallable<Void>(moveCallable));
     }
 
     movePartitionExecutor.shutdown();

http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 8ed44e8..9823532 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -37,6 +37,7 @@ import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.utils.ReflectionUtils;
@@ -752,7 +753,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
           computePartitionIdQueue,
           conf,
           serviceWorker);
-      partitionFutures.add(partitionExecutor.submit(computeCallable));
+      LogStacktraceCallable<Collection<PartitionStats>> wrapped =
+          new LogStacktraceCallable<Collection<PartitionStats>>(
+              computeCallable);
+      partitionFutures.add(partitionExecutor.submit(wrapped));
     }
 
     // Wait until all the threads are done to wait on all requests

http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index affe4ff..d01dbb4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -56,6 +56,7 @@ import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.utils.ProgressableUtils;
 import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.giraph.zk.BspEvent;
@@ -652,8 +653,8 @@ public class BspServiceMaster<I extends WritableComparable,
     boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
     for (int i = 0; i < splitList.size(); ++i) {
       InputSplit inputSplit = splitList.get(i);
-      taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
-          writeLocations));
+      taskExecutor.submit(new LogStacktraceCallable<Void>(
+          new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations)));
     }
     taskExecutor.shutdown();
     ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());

http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
new file mode 100644
index 0000000..730825d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.concurrent.Callable;
+import org.apache.log4j.Logger;
+
+/**
+ * A wrapper to improve debugging. It passes the call() invocation to the
+ * provided callable, and upon any exception logs the stacktrace and rethrows
+ * the exception. The logging functionality is missing in FutureTask.
+ *
+ * @param <V> Return type of call()
+ */
+public class LogStacktraceCallable<V> implements Callable<V> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LogStacktraceCallable.class);
+
+  /** Pass call() to this callable. */
+  private Callable<V> callable;
+
+  /**
+   * Construct an instance that will pass call() to the given callable.
+   *
+   * @param callable Callable
+   */
+  public LogStacktraceCallable(Callable<V> callable) {
+    this.callable = callable;
+  }
+
+  @Override
+  public V call() throws Exception {
+    try {
+      return callable.call();
+      // We catch, log stack trace of, and rethrow all exceptions. It's OK to
+      // skip style check.
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("Execution of callable failed", e);
+      throw e;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 061df68..c20d06e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.utils.LogStacktraceCallable;
 import org.apache.giraph.utils.JMapHistoDumper;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.io.VertexOutputFormat;
@@ -283,7 +284,10 @@ public class BspServiceWorker<I extends WritableComparable,
     for (int i = 0; i < numThreads; ++i) {
       Callable<VertexEdgeCount> inputSplitsCallable =
           inputSplitsCallableFactory.newCallable(i);
-      threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+      LogStacktraceCallable<VertexEdgeCount> wrapped =
+          new LogStacktraceCallable<VertexEdgeCount>(
+              inputSplitsCallable);
+      threadsFutures.add(inputSplitsExecutor.submit(wrapped));
     }
 
     // Wait until all the threads are done to wait on all requests