You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2013/04/08 20:11:56 UTC
git commit: updated refs/heads/trunk to 81b837d
Updated Branches:
refs/heads/trunk 34abd6f5b -> 81b837d36
GIRAPH-609: More information on runtime exceptions for Callables
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/81b837d3
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/81b837d3
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/81b837d3
Branch: refs/heads/trunk
Commit: 81b837d36189a1d763bd1747ca0550107c155664
Parents: 34abd6f
Author: Avery Ching <ac...@fb.com>
Authored: Mon Apr 8 10:12:42 2013 -0700
Committer: Avery Ching <ac...@fb.com>
Committed: Mon Apr 8 11:11:49 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/edge/EdgeStore.java | 17 ++--
.../org/apache/giraph/graph/GraphTaskManager.java | 6 +-
.../org/apache/giraph/master/BspServiceMaster.java | 5 +-
.../apache/giraph/utils/LogStacktraceCallable.java | 61 +++++++++++++++
.../org/apache/giraph/worker/BspServiceWorker.java | 6 +-
6 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index cd21461..f7e0af6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-609: More information on runtime exceptions for Callables (aching)
+
GIRAPH-607: Hive IO bump (nitay)
GIRAPH-564: Input/output formats and readers/writers should implement
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
index d6653d0..01a67dd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/edge/EdgeStore.java
@@ -20,24 +20,24 @@ package org.apache.giraph.edge;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.ByteArrayVertexIdEdges;
+import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* Collects incoming edges for vertices owned by this worker.
*
@@ -218,7 +218,8 @@ public class EdgeStore<I extends WritableComparable,
return null;
}
};
- movePartitionExecutor.submit(moveCallable);
+ movePartitionExecutor.submit(
+ new LogStacktraceCallable<Void>(moveCallable));
}
movePartitionExecutor.shutdown();
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 8ed44e8..9823532 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -37,6 +37,7 @@ import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReflectionUtils;
@@ -752,7 +753,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
computePartitionIdQueue,
conf,
serviceWorker);
- partitionFutures.add(partitionExecutor.submit(computeCallable));
+ LogStacktraceCallable<Collection<PartitionStats>> wrapped =
+ new LogStacktraceCallable<Collection<PartitionStats>>(
+ computeCallable);
+ partitionFutures.add(partitionExecutor.submit(wrapped));
}
// Wait until all the threads are done to wait on all requests
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index affe4ff..d01dbb4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -56,6 +56,7 @@ import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
+import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
@@ -652,8 +653,8 @@ public class BspServiceMaster<I extends WritableComparable,
boolean writeLocations = USE_INPUT_SPLIT_LOCALITY.get(conf);
for (int i = 0; i < splitList.size(); ++i) {
InputSplit inputSplit = splitList.get(i);
- taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i,
- writeLocations));
+ taskExecutor.submit(new LogStacktraceCallable<Void>(
+ new WriteInputSplit(inputSplit, inputSplitsPath, i, writeLocations)));
}
taskExecutor.shutdown();
ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext());
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
new file mode 100644
index 0000000..730825d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/LogStacktraceCallable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.concurrent.Callable;
+import org.apache.log4j.Logger;
+
+/**
+ * A wrapper to improve debugging. It passes the call() invocation to the
+ * provided callable, and upon any exception logs the stacktrace and rethrows
+ * the exception. The logging functionality is missing in FutureTask.
+ *
+ * @param <V> Return type of call()
+ */
+public class LogStacktraceCallable<V> implements Callable<V> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LogStacktraceCallable.class);
+
+  /** Delegate that call() is forwarded to; never reassigned once set. */
+  private final Callable<V> callable;
+
+  /**
+   * Construct an instance that will pass call() to the given callable.
+   *
+   * @param callable Callable whose call() is invoked by this wrapper
+   */
+  public LogStacktraceCallable(Callable<V> callable) {
+    this.callable = callable;
+  }
+
+  @Override
+  public V call() throws Exception {
+    try {
+      return callable.call();
+      // Any Exception is logged with its stack trace and then rethrown
+      // unchanged, so the broad catch is deliberate; skip the style check.
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("Execution of callable failed", e);
+      throw e;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/81b837d3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 061df68..c20d06e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -42,6 +42,7 @@ import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
+import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
@@ -283,7 +284,10 @@ public class BspServiceWorker<I extends WritableComparable,
for (int i = 0; i < numThreads; ++i) {
Callable<VertexEdgeCount> inputSplitsCallable =
inputSplitsCallableFactory.newCallable(i);
- threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+ LogStacktraceCallable<VertexEdgeCount> wrapped =
+ new LogStacktraceCallable<VertexEdgeCount>(
+ inputSplitsCallable);
+ threadsFutures.add(inputSplitsExecutor.submit(wrapped));
}
// Wait until all the threads are done to wait on all requests