You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/07/08 22:43:12 UTC
[2/2] git commit: updated refs/heads/trunk to 61cb37e
GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61cb37ec
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61cb37ec
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61cb37ec
Branch: refs/heads/trunk
Commit: 61cb37ecd50b0d9400873624e46692c3282e4cfc
Parents: 7f9218a
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Jul 8 12:08:53 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Jul 8 12:11:12 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
findbugs-exclude.xml | 6 +--
.../apache/giraph/comm/netty/NettyClient.java | 38 +++++++++++---
.../giraph/comm/netty/NettyMasterClient.java | 8 ++-
.../giraph/comm/netty/NettyMasterServer.java | 6 ++-
.../apache/giraph/comm/netty/NettyServer.java | 36 ++++++++-----
.../giraph/comm/netty/NettyWorkerClient.java | 8 ++-
.../giraph/comm/netty/NettyWorkerServer.java | 6 ++-
.../handler/MasterRequestServerHandler.java | 11 ++--
.../netty/handler/RequestServerHandler.java | 18 ++++---
.../handler/WorkerRequestServerHandler.java | 11 ++--
.../org/apache/giraph/graph/GraphMapper.java | 23 +++------
.../apache/giraph/graph/GraphTaskManager.java | 30 +++++++++++
.../apache/giraph/master/BspServiceMaster.java | 9 ++--
.../org/apache/giraph/utils/ThreadUtils.java | 54 ++++++++++++++++++++
.../apache/giraph/worker/BspServiceWorker.java | 6 ++-
.../org/apache/giraph/yarn/GiraphYarnTask.java | 13 -----
.../org/apache/giraph/comm/ConnectionTest.java | 26 ++++++----
.../giraph/comm/MockExceptionHandler.java | 26 ++++++++++
.../apache/giraph/comm/RequestFailureTest.java | 5 +-
.../org/apache/giraph/comm/RequestTest.java | 5 +-
.../apache/giraph/comm/SaslConnectionTest.java | 6 ++-
22 files changed, 255 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 834b45f..13dfcd7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
+
GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
GIRAPH-713: Provide an option to do request compression (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index e0466f7..9ac4412 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -39,11 +39,7 @@
<Bug pattern="DM_EXIT"/>
</Match>
<Match>
- <Class name="org.apache.giraph.graph.GraphMapper$OverrideExceptionHandler"/>
- <Bug pattern="DM_EXIT"/>
- </Match>
- <Match>
- <Class name="org.apache.giraph.yarn.GiraphYarnTask$OverrideExceptionHandler"/>
+ <Class name="org.apache.giraph.graph.GraphTaskManager$OverrideExceptionHandler"/>
<Bug pattern="DM_EXIT"/>
</Match>
<Match>
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 5bb5545..97394bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -35,6 +35,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
@@ -42,7 +43,6 @@ import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -175,6 +175,12 @@ public class NettyClient {
/** When was the last time we checked if we should resend some requests */
private final AtomicLong lastTimeCheckedRequestsForProblems =
new AtomicLong(0);
+ /**
+ * Listener attached to write futures; logs the stack trace of every
+ * write that fails in the netty client.
+ */
+ private final LogOnErrorChannelFutureListener logErrorListener =
+ new LogOnErrorChannelFutureListener();
/**
* Only constructor
@@ -182,10 +188,13 @@ public class NettyClient {
* @param context Context for progress
* @param conf Configuration
* @param myTaskInfo Current task info
+ * @param exceptionHandler handler for uncaught exception. Will
+ * terminate job.
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
final ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo) {
+ TaskInfo myTaskInfo,
+ final Thread.UncaughtExceptionHandler exceptionHandler) {
this.context = context;
this.myTaskInfo = myTaskInfo;
this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -226,8 +235,8 @@ public class NettyClient {
if (useExecutionGroup) {
int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
executionGroup = new DefaultEventExecutorGroup(executionThreads,
- new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d")
- .build());
+ ThreadUtils.createThreadFactory(
+ "netty-client-exec-%d", exceptionHandler));
if (LOG.isInfoEnabled()) {
LOG.info("NettyClient: Using execution handler with " +
executionThreads + " threads after " +
@@ -238,8 +247,8 @@ public class NettyClient {
}
workerGroup = new NioEventLoopGroup(maxPoolSize,
- new ThreadFactoryBuilder().setNameFormat(
- "netty-client-worker-%d").build());
+ ThreadUtils.createThreadFactory(
+ "netty-client-worker-%d", exceptionHandler));
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
@@ -696,6 +705,7 @@ public class NettyClient {
}
ChannelFuture writeFuture = channel.write(request);
newRequestInfo.setWriteFuture(writeFuture);
+ writeFuture.addListener(logErrorListener);
if (limitNumberOfOpenRequests &&
clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
@@ -868,6 +878,7 @@ public class NettyClient {
}
ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
requestInfo.setWriteFuture(writeFuture);
+ writeFuture.addListener(logErrorListener);
}
addedRequestIds.clear();
addedRequestInfos.clear();
@@ -906,4 +917,19 @@ public class NettyClient {
}
return address;
}
+
+ /**
+ * This listener logs the failure cause (with stack trace) whenever the
+ * channel operation it is attached to completes unsuccessfully.
+ */
+ private static class LogOnErrorChannelFutureListener
+ implements ChannelFutureListener {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isDone() && !future.isSuccess()) {
+ LOG.error("Request failed", future.cause());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index c982209..1218d29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -54,12 +54,16 @@ public class NettyMasterClient implements MasterClient {
* @param context Context from mapper
* @param configuration Configuration
* @param service Centralized service
+ * @param exceptionHandler handler for uncaught exception. Will
+ * terminate job.
*/
public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration configuration,
- CentralizedServiceMaster<?, ?, ?> service) {
+ CentralizedServiceMaster<?, ?, ?> service,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.nettyClient =
- new NettyClient(context, configuration, service.getMasterInfo());
+ new NettyClient(context, configuration, service.getMasterInfo(),
+ exceptionHandler);
this.service = service;
this.progressable = context;
maxBytesPerAggregatorRequest = configuration.getInt(
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index cb36c3e..1c05910 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -39,13 +39,15 @@ public class NettyMasterServer implements MasterServer {
* @param conf Hadoop configuration
* @param service Centralized service
* @param progressable Progressable for reporting progress
+ * @param exceptionHandler to handle uncaught exceptions
*/
public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
CentralizedServiceMaster<?, ?, ?> service,
- Progressable progressable) {
+ Progressable progressable,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
nettyServer = new NettyServer(conf,
new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
- service.getMasterInfo(), progressable);
+ service.getMasterInfo(), progressable, exceptionHandler);
nettyServer.start();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 8162857..454232a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -33,6 +33,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.utils.PipelineUtils;
import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import io.netty.bootstrap.ServerBootstrap;
@@ -54,8 +55,6 @@ import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@@ -122,6 +121,8 @@ public class NettyServer {
private final EventExecutorGroup executionGroup;
/** Name of the handler before the execution handler (if used) */
private final String handlerToUseExecutionGroup;
+ /** Handles all uncaught exceptions in netty threads */
+ private final Thread.UncaughtExceptionHandler exceptionHandler;
/**
* Constructor for creating the server
@@ -130,10 +131,12 @@ public class NettyServer {
* @param requestServerHandlerFactory Factory for request handlers
* @param myTaskInfo Current task info
* @param progressable Progressable for reporting progress
+ * @param exceptionHandler handle uncaught exceptions
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory,
- TaskInfo myTaskInfo, Progressable progressable) {
+ TaskInfo myTaskInfo, Progressable progressable,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.conf = conf;
this.progressable = progressable;
this.requestServerHandlerFactory = requestServerHandlerFactory;
@@ -141,6 +144,7 @@ public class NettyServer {
this.saslServerHandlerFactory = new SaslServerHandler.Factory();
/*end[HADOOP_NON_SECURE]*/
this.myTaskInfo = myTaskInfo;
+ this.exceptionHandler = exceptionHandler;
sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
@@ -149,12 +153,12 @@ public class NettyServer {
maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
bossGroup = new NioEventLoopGroup(4,
- new ThreadFactoryBuilder().setNameFormat(
- "netty-server-boss-%d").build());
+ ThreadUtils.createThreadFactory(
+ "netty-server-boss-%d", exceptionHandler));
workerGroup = new NioEventLoopGroup(maxPoolSize,
- new ThreadFactoryBuilder().setNameFormat(
- "netty-server-worker-%d").build());
+ ThreadUtils.createThreadFactory(
+ "netty-server-worker-%d", exceptionHandler));
try {
this.localHostname = conf.getLocalHostname();
@@ -173,8 +177,8 @@ public class NettyServer {
if (useExecutionGroup) {
int executionThreads = conf.getNettyServerExecutionThreads();
executionGroup = new DefaultEventExecutorGroup(executionThreads,
- new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d").
- build());
+ ThreadUtils.createThreadFactory(
+ "netty-server-exec-%d", exceptionHandler));
if (LOG.isInfoEnabled()) {
LOG.info("NettyServer: Using execution group with " +
executionThreads + " threads for " +
@@ -194,13 +198,16 @@ public class NettyServer {
* @param myTaskInfo Current task info
* @param progressable Progressable for reporting progress
* @param saslServerHandlerFactory Factory for SASL handlers
+ * @param exceptionHandler handle uncaught exceptions
*/
public NettyServer(ImmutableClassesGiraphConfiguration conf,
RequestServerHandler.Factory requestServerHandlerFactory,
TaskInfo myTaskInfo,
Progressable progressable,
- SaslServerHandler.Factory saslServerHandlerFactory) {
- this(conf, requestServerHandlerFactory, myTaskInfo, progressable);
+ SaslServerHandler.Factory saslServerHandlerFactory,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
+ this(conf, requestServerHandlerFactory, myTaskInfo,
+ progressable, exceptionHandler);
this.saslServerHandlerFactory = saslServerHandlerFactory;
}
/*end[HADOOP_NON_SECURE]*/
@@ -267,8 +274,8 @@ public class NettyServer {
executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
requestServerHandlerFactory.newHandler(workerRequestReservedMap,
- conf, myTaskInfo), handlerToUseExecutionGroup,
- executionGroup, ch);
+ conf, myTaskInfo, exceptionHandler),
+ handlerToUseExecutionGroup, executionGroup, ch);
// Removed after authentication completes:
PipelineUtils.addLastWithExecutorCheck("responseEncoder",
new ResponseEncoder(), handlerToUseExecutionGroup,
@@ -310,7 +317,7 @@ public class NettyServer {
handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
requestServerHandlerFactory.newHandler(
- workerRequestReservedMap, conf, myTaskInfo),
+ workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
handlerToUseExecutionGroup, executionGroup, ch);
/*if_not[HADOOP_NON_SECURE]*/
}
@@ -404,5 +411,6 @@ public class NettyServer {
public InetSocketAddress getMyAddress() {
return myAddress;
}
+
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 7541418..c893a24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -74,13 +74,17 @@ public class NettyWorkerClient<I extends WritableComparable,
* @param context Context from mapper
* @param configuration Configuration
* @param service Used to get partition mapping
+ * @param exceptionHandler handler for uncaught exception. Will
+ * terminate job.
*/
public NettyWorkerClient(
Mapper<?, ?, ?, ?>.Context context,
ImmutableClassesGiraphConfiguration<I, V, E> configuration,
- CentralizedServiceWorker<I, V, E> service) {
+ CentralizedServiceWorker<I, V, E> service,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.nettyClient =
- new NettyClient(context, configuration, service.getWorkerInfo());
+ new NettyClient(context, configuration, service.getWorkerInfo(),
+ exceptionHandler);
this.conf = configuration;
this.service = service;
this.superstepRequestCounters = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index adb96cb..22ecc0e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -77,10 +77,12 @@ public class NettyWorkerServer<I extends WritableComparable,
* @param conf Configuration
* @param service Service to get partition mappings
* @param context Mapper context
+ * @param exceptionHandler handle uncaught exceptions
*/
public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
CentralizedServiceWorker<I, V, E> service,
- Mapper<?, ?, ?, ?>.Context context) {
+ Mapper<?, ?, ?, ?>.Context context,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.conf = conf;
this.service = service;
this.context = context;
@@ -91,7 +93,7 @@ public class NettyWorkerServer<I extends WritableComparable,
nettyServer = new NettyServer(conf,
new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
- service.getWorkerInfo(), context);
+ service.getWorkerInfo(), context, exceptionHandler);
nettyServer.start();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 3e06026..e043314 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -36,13 +36,15 @@ public class MasterRequestServerHandler extends
* @param conf Configuration
* @param myTaskInfo Current task info
* @param aggregatorHandler Master aggregator handler
+ * @param exceptionHandler Handles uncaught exceptions
*/
public MasterRequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
TaskInfo myTaskInfo,
- MasterAggregatorHandler aggregatorHandler) {
- super(workerRequestReservedMap, conf, myTaskInfo);
+ MasterAggregatorHandler aggregatorHandler,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
+ super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
this.aggregatorHandler = aggregatorHandler;
}
@@ -71,9 +73,10 @@ public class MasterRequestServerHandler extends
public RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo) {
+ TaskInfo myTaskInfo,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
return new MasterRequestServerHandler(workerRequestReservedMap, conf,
- myTaskInfo, aggregatorHandler);
+ myTaskInfo, aggregatorHandler, exceptionHandler);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index b6d0533..d75870a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -56,6 +56,8 @@ public abstract class RequestServerHandler<R> extends
private final TaskInfo myTaskInfo;
/** Start nanoseconds for the processing time */
private long startProcessingNanoseconds = -1;
+ /** Handler for uncaught exceptions */
+ private final Thread.UncaughtExceptionHandler exceptionHandler;
/**
* Constructor
@@ -63,14 +65,17 @@ public abstract class RequestServerHandler<R> extends
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
* @param myTaskInfo Current task info
+ * @param exceptionHandler Handles uncaught exceptions
*/
public RequestServerHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo) {
+ TaskInfo myTaskInfo,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
this.workerRequestReservedMap = workerRequestReservedMap;
closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
+ this.exceptionHandler = exceptionHandler;
}
@Override
@@ -159,10 +164,9 @@ public abstract class RequestServerHandler<R> extends
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- LOG.warn("exceptionCaught: Channel failed with " +
- "remote address " + ctx.channel().remoteAddress(), cause);
+ public void exceptionCaught(
+ ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ exceptionHandler.uncaughtException(Thread.currentThread(), cause);
}
/**
@@ -175,11 +179,13 @@ public abstract class RequestServerHandler<R> extends
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration to use
* @param myTaskInfo Current task info
+ * @param exceptionHandler Handles uncaught exceptions
* @return New {@link RequestServerHandler}
*/
RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo);
+ TaskInfo myTaskInfo,
+ Thread.UncaughtExceptionHandler exceptionHandler);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index f64c373..574e413 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -46,12 +46,14 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
* @param workerRequestReservedMap Worker request reservation map
* @param conf Configuration
* @param myTaskInfo Current task info
+ * @param exceptionHandler Handles uncaught exceptions
*/
public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo) {
- super(workerRequestReservedMap, conf, myTaskInfo);
+ TaskInfo myTaskInfo,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
+ super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
this.serverData = serverData;
}
@@ -80,9 +82,10 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
public RequestServerHandler newHandler(
WorkerRequestReservedMap workerRequestReservedMap,
ImmutableClassesGiraphConfiguration conf,
- TaskInfo myTaskInfo) {
+ TaskInfo myTaskInfo,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
- workerRequestReservedMap, conf, myTaskInfo);
+ workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index c86a024..6f748c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -49,13 +49,15 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
@Override
public void setup(Context context)
throws IOException, InterruptedException {
- // Setting the default handler for uncaught exceptions.
- Thread.setDefaultUncaughtExceptionHandler(
- new OverrideExceptionHandler());
-
// Execute all Giraph-related role(s) assigned to this compute node.
// Roles can include "master," "worker," "zookeeper," or . . . ?
graphTaskManager = new GraphTaskManager<I, V, E>(context);
+
+ // Setting the default handler for uncaught exceptions.
+ Thread.setDefaultUncaughtExceptionHandler(
+ graphTaskManager.createUncaughtExceptionHandler());
+
+
graphTaskManager.setup(
DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}
@@ -96,6 +98,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
// CHECKSTYLE: stop IllegalCatch
} catch (RuntimeException e) {
// CHECKSTYLE: resume IllegalCatch
+ LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
graphTaskManager.zooKeeperCleanup();
graphTaskManager.workerFailureCleanup();
throw new IllegalStateException(
@@ -103,16 +106,4 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
}
}
- /**
- * Default handler for uncaught exceptions.
- */
- class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.fatal(
- "uncaughtException: OverrideExceptionHandler on thread " +
- t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
- System.exit(1);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e13eedd..b2a5c84 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -911,7 +911,37 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
}
+ /**
+ * Creates exception handler that will terminate process gracefully in case
+ * of any uncaught exception.
+ * @return new exception handler object.
+ */
+ public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
+ return new OverrideExceptionHandler();
+ }
+
public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
return conf;
}
+
+
+ /**
+ * Default handler for uncaught exceptions.
+ * It will do its best to clean up and then terminate the current Giraph job.
+ */
+ class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e) {
+ try {
+ LOG.fatal(
+ "uncaughtException: OverrideExceptionHandler on thread " +
+ t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
+
+ zooKeeperCleanup();
+ workerFailureCleanup();
+ } finally {
+ System.exit(1);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 02d4f2b..0275395 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -883,11 +883,13 @@ public class BspServiceMaster<I extends WritableComparable,
masterInfo = new MasterInfo();
masterServer =
- new NettyMasterServer(getConfiguration(), this, getContext());
+ new NettyMasterServer(getConfiguration(), this, getContext(),
+ getGraphTaskManager().createUncaughtExceptionHandler());
masterInfo.setInetSocketAddress(masterServer.getMyAddress());
masterInfo.setTaskId(getTaskPartition());
masterClient =
- new NettyMasterClient(getContext(), getConfiguration(), this);
+ new NettyMasterClient(getContext(), getConfiguration(), this,
+ getGraphTaskManager().createUncaughtExceptionHandler());
if (LOG.isInfoEnabled()) {
LOG.info("becomeMaster: I am now the master!");
@@ -1397,8 +1399,7 @@ public class BspServiceMaster<I extends WritableComparable,
// Did a worker die?
try {
- if ((getSuperstep() > 0) &&
- !superstepChosenWorkerAlive(
+ if (!superstepChosenWorkerAlive(
workerInfoHealthyPath,
workerInfoList)) {
return false;
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
new file mode 100644
index 0000000..a235ff4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Utility class for thread related functions.
+ */
+public class ThreadUtils {
+
+ /**
+ * Utility class. Do not inherit or create objects.
+ */
+ private ThreadUtils() { }
+
+ /**
+ * Creates new thread factory with specified thread name format.
+ *
+ * @param nameFormat defines naming format for threads created by
+ * thread factory
+ * @param exceptionHandler handles uncaught exceptions in all threads
+ * produced by the created thread factory
+ * @return new thread factory with specified thread name format and
+ * exception handler.
+ */
+ public static ThreadFactory createThreadFactory(
+ String nameFormat,
+ Thread.UncaughtExceptionHandler exceptionHandler) {
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder().
+ setNameFormat(nameFormat);
+ if (exceptionHandler != null) {
+ builder.setUncaughtExceptionHandler(exceptionHandler);
+ }
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index dbe6a45..de7af28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -197,10 +197,12 @@ public class BspServiceWorker<I extends WritableComparable,
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
workerInfo = new WorkerInfo();
- workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
+ workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
+ graphTaskManager.createUncaughtExceptionHandler());
workerInfo.setInetSocketAddress(workerServer.getMyAddress());
workerInfo.setTaskId(getTaskPartition());
- workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
+ workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
+ graphTaskManager.createUncaughtExceptionHandler());
workerAggregatorRequestProcessor =
new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
index f4719cc..ccfc972 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
@@ -166,19 +166,6 @@ public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
}
/**
- * Default handler for uncaught exceptions.
- */
- class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.fatal(
- "uncaughtException: OverrideExceptionHandler on thread " +
- t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
- System.exit(1);
- }
- }
-
- /**
* Task entry point.
* @param args CLI arguments injected by GiraphApplicationMaster to hand off
* job, task, and attempt ID's to this (and every) Giraph task.
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index e771e36..5bc9ef0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -70,11 +70,12 @@ public class ConnectionTest {
NettyServer server =
new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
- context);
+ context, new MockExceptionHandler());
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
- NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client.connectAllAddresses(
Lists.<WorkerInfo>newArrayList(workerInfo));
@@ -101,7 +102,8 @@ public class ConnectionTest {
WorkerInfo workerInfo1 = new WorkerInfo();
workerInfo1.setTaskId(1);
NettyServer server1 =
- new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context);
+ new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
+ context, new MockExceptionHandler());
server1.start();
workerInfo1.setInetSocketAddress(server1.getMyAddress());
@@ -109,7 +111,7 @@ public class ConnectionTest {
workerInfo1.setTaskId(2);
NettyServer server2 =
new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
- context);
+ context, new MockExceptionHandler());
server2.start();
workerInfo2.setInetSocketAddress(server2.getMyAddress());
@@ -117,11 +119,12 @@ public class ConnectionTest {
workerInfo1.setTaskId(3);
NettyServer server3 =
new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
- context);
+ context, new MockExceptionHandler());
server3.start();
workerInfo3.setInetSocketAddress(server3.getMyAddress());
- NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
workerInfo2, workerInfo3);
client.connectAllAddresses(addresses);
@@ -148,16 +151,19 @@ public class ConnectionTest {
WorkerInfo workerInfo = new WorkerInfo();
NettyServer server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
- context);
+ context, new MockExceptionHandler());
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
- NettyClient client1 = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client1.connectAllAddresses(addresses);
- NettyClient client2 = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client2 = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client2.connectAllAddresses(addresses);
- NettyClient client3 = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client3.connectAllAddresses(addresses);
client1.stop();
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
new file mode 100644
index 0000000..edd3fc0
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+public class MockExceptionHandler implements Thread.UncaughtExceptionHandler{
+ @Override
+ public void uncaughtException(Thread t, Throwable e) { // test-only handler
+ throw new RuntimeException(e); // NOTE(review): JVM ignores exceptions thrown here - confirm intent
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 157a543..572e290 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -160,10 +160,11 @@ public class RequestFailureTest {
WorkerInfo workerInfo = new WorkerInfo();
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
- context);
+ context, new MockExceptionHandler());
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
- client = new NettyClient(context, conf, new WorkerInfo());
+ client = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client.connectAllAddresses(
Lists.<WorkerInfo>newArrayList(workerInfo));
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 32454f4..8037db9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -95,10 +95,11 @@ public class RequestTest {
workerInfo = new WorkerInfo();
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
- context);
+ context, new MockExceptionHandler());
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
- client = new NettyClient(context, conf, new WorkerInfo());
+ client = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client.connectAllAddresses(
Lists.<WorkerInfo>newArrayList(workerInfo));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index c026cf8..96ce062 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -85,11 +85,13 @@ public class SaslConnectionTest {
new WorkerRequestServerHandler.Factory(serverData),
workerInfo,
context,
- mockedSaslServerFactory);
+ mockedSaslServerFactory,
+ new MockExceptionHandler());
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
- NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+ NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+ new MockExceptionHandler());
client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
client.stop();