You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by pa...@apache.org on 2014/07/08 22:43:12 UTC

[2/2] git commit: updated refs/heads/trunk to 61cb37e

GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61cb37ec
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61cb37ec
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61cb37ec

Branch: refs/heads/trunk
Commit: 61cb37ecd50b0d9400873624e46692c3282e4cfc
Parents: 7f9218a
Author: Pavan Kumar <pa...@fb.com>
Authored: Tue Jul 8 12:08:53 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Tue Jul 8 12:11:12 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 findbugs-exclude.xml                            |  6 +--
 .../apache/giraph/comm/netty/NettyClient.java   | 38 +++++++++++---
 .../giraph/comm/netty/NettyMasterClient.java    |  8 ++-
 .../giraph/comm/netty/NettyMasterServer.java    |  6 ++-
 .../apache/giraph/comm/netty/NettyServer.java   | 36 ++++++++-----
 .../giraph/comm/netty/NettyWorkerClient.java    |  8 ++-
 .../giraph/comm/netty/NettyWorkerServer.java    |  6 ++-
 .../handler/MasterRequestServerHandler.java     | 11 ++--
 .../netty/handler/RequestServerHandler.java     | 18 ++++---
 .../handler/WorkerRequestServerHandler.java     | 11 ++--
 .../org/apache/giraph/graph/GraphMapper.java    | 23 +++------
 .../apache/giraph/graph/GraphTaskManager.java   | 30 +++++++++++
 .../apache/giraph/master/BspServiceMaster.java  |  9 ++--
 .../org/apache/giraph/utils/ThreadUtils.java    | 54 ++++++++++++++++++++
 .../apache/giraph/worker/BspServiceWorker.java  |  6 ++-
 .../org/apache/giraph/yarn/GiraphYarnTask.java  | 13 -----
 .../org/apache/giraph/comm/ConnectionTest.java  | 26 ++++++----
 .../giraph/comm/MockExceptionHandler.java       | 26 ++++++++++
 .../apache/giraph/comm/RequestFailureTest.java  |  5 +-
 .../org/apache/giraph/comm/RequestTest.java     |  5 +-
 .../apache/giraph/comm/SaslConnectionTest.java  |  6 ++-
 22 files changed, 255 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 834b45f..13dfcd7 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
+
   GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
 
   GIRAPH-713: Provide an option to do request compression (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index e0466f7..9ac4412 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -39,11 +39,7 @@
     <Bug pattern="DM_EXIT"/>
   </Match>
   <Match>
-    <Class name="org.apache.giraph.graph.GraphMapper$OverrideExceptionHandler"/>
-    <Bug pattern="DM_EXIT"/>
-  </Match>
-  <Match>
-    <Class name="org.apache.giraph.yarn.GiraphYarnTask$OverrideExceptionHandler"/>
+    <Class name="org.apache.giraph.graph.GraphTaskManager$OverrideExceptionHandler"/>
     <Bug pattern="DM_EXIT"/>
   </Match>
   <Match>

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 5bb5545..97394bf 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -35,6 +35,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.utils.PipelineUtils;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.utils.TimedLogger;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
@@ -42,7 +43,6 @@ import org.apache.log4j.Logger;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -175,6 +175,12 @@ public class NettyClient {
   /** When was the last time we checked if we should resend some requests */
   private final AtomicLong lastTimeCheckedRequestsForProblems =
       new AtomicLong(0);
+  /**
+   * Logger used to dump stack traces for every exception that happens
+   * in netty client threads.
+   */
+  private final LogOnErrorChannelFutureListener logErrorListener =
+      new LogOnErrorChannelFutureListener();
 
   /**
    * Only constructor
@@ -182,10 +188,13 @@ public class NettyClient {
    * @param context Context for progress
    * @param conf Configuration
    * @param myTaskInfo Current task info
+   * @param exceptionHandler handler for uncaught exception. Will
+   *                         terminate job.
    */
   public NettyClient(Mapper<?, ?, ?, ?>.Context context,
                      final ImmutableClassesGiraphConfiguration conf,
-                     TaskInfo myTaskInfo) {
+                     TaskInfo myTaskInfo,
+                     final Thread.UncaughtExceptionHandler exceptionHandler) {
     this.context = context;
     this.myTaskInfo = myTaskInfo;
     this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -226,8 +235,8 @@ public class NettyClient {
     if (useExecutionGroup) {
       int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);
       executionGroup = new DefaultEventExecutorGroup(executionThreads,
-          new ThreadFactoryBuilder().setNameFormat("netty-client-exec-%d")
-              .build());
+          ThreadUtils.createThreadFactory(
+              "netty-client-exec-%d", exceptionHandler));
       if (LOG.isInfoEnabled()) {
         LOG.info("NettyClient: Using execution handler with " +
             executionThreads + " threads after " +
@@ -238,8 +247,8 @@ public class NettyClient {
     }
 
     workerGroup = new NioEventLoopGroup(maxPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(
-            "netty-client-worker-%d").build());
+        ThreadUtils.createThreadFactory(
+            "netty-client-worker-%d", exceptionHandler));
 
     bootstrap = new Bootstrap();
     bootstrap.group(workerGroup)
@@ -696,6 +705,7 @@ public class NettyClient {
     }
     ChannelFuture writeFuture = channel.write(request);
     newRequestInfo.setWriteFuture(writeFuture);
+    writeFuture.addListener(logErrorListener);
 
     if (limitNumberOfOpenRequests &&
         clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
@@ -868,6 +878,7 @@ public class NettyClient {
       }
       ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
       requestInfo.setWriteFuture(writeFuture);
+      writeFuture.addListener(logErrorListener);
     }
     addedRequestIds.clear();
     addedRequestInfos.clear();
@@ -906,4 +917,19 @@ public class NettyClient {
     }
     return address;
   }
+
+  /**
+   * This listener class just dumps exception stack traces if
+   * something happens.
+   */
+  private static class LogOnErrorChannelFutureListener
+      implements ChannelFutureListener {
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (future.isDone() && !future.isSuccess()) {
+        LOG.error("Request failed", future.cause());
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
index c982209..1218d29 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
@@ -54,12 +54,16 @@ public class NettyMasterClient implements MasterClient {
    * @param context Context from mapper
    * @param configuration Configuration
    * @param service Centralized service
+   * @param exceptionHandler handler for uncaught exception. Will
+   *                         terminate job.
    */
   public NettyMasterClient(Mapper<?, ?, ?, ?>.Context context,
                            ImmutableClassesGiraphConfiguration configuration,
-                           CentralizedServiceMaster<?, ?, ?> service) {
+                           CentralizedServiceMaster<?, ?, ?> service,
+                           Thread.UncaughtExceptionHandler exceptionHandler) {
     this.nettyClient =
-        new NettyClient(context, configuration, service.getMasterInfo());
+        new NettyClient(context, configuration, service.getMasterInfo(),
+            exceptionHandler);
     this.service = service;
     this.progressable = context;
     maxBytesPerAggregatorRequest = configuration.getInt(

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
index cb36c3e..1c05910 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
@@ -39,13 +39,15 @@ public class NettyMasterServer implements MasterServer {
    * @param conf Hadoop configuration
    * @param service Centralized service
    * @param progressable Progressable for reporting progress
+   * @param exceptionHandler to handle uncaught exceptions
    */
   public NettyMasterServer(ImmutableClassesGiraphConfiguration conf,
       CentralizedServiceMaster<?, ?, ?> service,
-      Progressable progressable) {
+      Progressable progressable,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
     nettyServer = new NettyServer(conf,
         new MasterRequestServerHandler.Factory(service.getAggregatorHandler()),
-        service.getMasterInfo(), progressable);
+        service.getMasterInfo(), progressable, exceptionHandler);
     nettyServer.start();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
index 8162857..454232a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
@@ -33,6 +33,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.utils.PipelineUtils;
 import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 import io.netty.bootstrap.ServerBootstrap;
@@ -54,8 +55,6 @@ import io.netty.util.concurrent.EventExecutorGroup;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.channel.AdaptiveRecvByteBufAllocator;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
@@ -122,6 +121,8 @@ public class NettyServer {
   private final EventExecutorGroup executionGroup;
   /** Name of the handler before the execution handler (if used) */
   private final String handlerToUseExecutionGroup;
+  /** Handles all uncaught exceptions in netty threads */
+  private final Thread.UncaughtExceptionHandler exceptionHandler;
 
   /**
    * Constructor for creating the server
@@ -130,10 +131,12 @@ public class NettyServer {
    * @param requestServerHandlerFactory Factory for request handlers
    * @param myTaskInfo Current task info
    * @param progressable Progressable for reporting progress
+   * @param exceptionHandler handle uncaught exceptions
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
       RequestServerHandler.Factory requestServerHandlerFactory,
-      TaskInfo myTaskInfo, Progressable progressable) {
+      TaskInfo myTaskInfo, Progressable progressable,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
     this.conf = conf;
     this.progressable = progressable;
     this.requestServerHandlerFactory = requestServerHandlerFactory;
@@ -141,6 +144,7 @@ public class NettyServer {
     this.saslServerHandlerFactory = new SaslServerHandler.Factory();
     /*end[HADOOP_NON_SECURE]*/
     this.myTaskInfo = myTaskInfo;
+    this.exceptionHandler = exceptionHandler;
     sendBufferSize = GiraphConstants.SERVER_SEND_BUFFER_SIZE.get(conf);
     receiveBufferSize = GiraphConstants.SERVER_RECEIVE_BUFFER_SIZE.get(conf);
 
@@ -149,12 +153,12 @@ public class NettyServer {
     maxPoolSize = GiraphConstants.NETTY_SERVER_THREADS.get(conf);
 
     bossGroup = new NioEventLoopGroup(4,
-        new ThreadFactoryBuilder().setNameFormat(
-          "netty-server-boss-%d").build());
+        ThreadUtils.createThreadFactory(
+            "netty-server-boss-%d", exceptionHandler));
 
     workerGroup = new NioEventLoopGroup(maxPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(
-          "netty-server-worker-%d").build());
+        ThreadUtils.createThreadFactory(
+            "netty-server-worker-%d", exceptionHandler));
 
     try {
       this.localHostname = conf.getLocalHostname();
@@ -173,8 +177,8 @@ public class NettyServer {
     if (useExecutionGroup) {
       int executionThreads = conf.getNettyServerExecutionThreads();
       executionGroup = new DefaultEventExecutorGroup(executionThreads,
-          new ThreadFactoryBuilder().setNameFormat("netty-server-exec-%d").
-              build());
+          ThreadUtils.createThreadFactory(
+              "netty-server-exec-%d", exceptionHandler));
       if (LOG.isInfoEnabled()) {
         LOG.info("NettyServer: Using execution group with " +
             executionThreads + " threads for " +
@@ -194,13 +198,16 @@ public class NettyServer {
    * @param myTaskInfo Current task info
    * @param progressable Progressable for reporting progress
    * @param saslServerHandlerFactory  Factory for SASL handlers
+   * @param exceptionHandler handle uncaught exceptions
    */
   public NettyServer(ImmutableClassesGiraphConfiguration conf,
                      RequestServerHandler.Factory requestServerHandlerFactory,
                      TaskInfo myTaskInfo,
                      Progressable progressable,
-                     SaslServerHandler.Factory saslServerHandlerFactory) {
-    this(conf, requestServerHandlerFactory, myTaskInfo, progressable);
+                     SaslServerHandler.Factory saslServerHandlerFactory,
+                     Thread.UncaughtExceptionHandler exceptionHandler) {
+    this(conf, requestServerHandlerFactory, myTaskInfo,
+        progressable, exceptionHandler);
     this.saslServerHandlerFactory = saslServerHandlerFactory;
   }
 /*end[HADOOP_NON_SECURE]*/
@@ -267,8 +274,8 @@ public class NettyServer {
               executionGroup, ch);
           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
               requestServerHandlerFactory.newHandler(workerRequestReservedMap,
-                  conf, myTaskInfo), handlerToUseExecutionGroup,
-              executionGroup, ch);
+                  conf, myTaskInfo, exceptionHandler),
+              handlerToUseExecutionGroup, executionGroup, ch);
           // Removed after authentication completes:
           PipelineUtils.addLastWithExecutorCheck("responseEncoder",
               new ResponseEncoder(), handlerToUseExecutionGroup,
@@ -310,7 +317,7 @@ public class NettyServer {
               handlerToUseExecutionGroup, executionGroup, ch);
           PipelineUtils.addLastWithExecutorCheck("requestServerHandler",
               requestServerHandlerFactory.newHandler(
-                  workerRequestReservedMap, conf, myTaskInfo),
+                  workerRequestReservedMap, conf, myTaskInfo, exceptionHandler),
               handlerToUseExecutionGroup, executionGroup, ch);
 /*if_not[HADOOP_NON_SECURE]*/
         }
@@ -404,5 +411,6 @@ public class NettyServer {
   public InetSocketAddress getMyAddress() {
     return myAddress;
   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
index 7541418..c893a24 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
@@ -74,13 +74,17 @@ public class NettyWorkerClient<I extends WritableComparable,
    * @param context Context from mapper
    * @param configuration Configuration
    * @param service Used to get partition mapping
+   * @param exceptionHandler handler for uncaught exception. Will
+   *                         terminate job.
    */
   public NettyWorkerClient(
       Mapper<?, ?, ?, ?>.Context context,
       ImmutableClassesGiraphConfiguration<I, V, E> configuration,
-      CentralizedServiceWorker<I, V, E> service) {
+      CentralizedServiceWorker<I, V, E> service,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
     this.nettyClient =
-        new NettyClient(context, configuration, service.getWorkerInfo());
+        new NettyClient(context, configuration, service.getWorkerInfo(),
+            exceptionHandler);
     this.conf = configuration;
     this.service = service;
     this.superstepRequestCounters = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index adb96cb..22ecc0e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -77,10 +77,12 @@ public class NettyWorkerServer<I extends WritableComparable,
    * @param conf Configuration
    * @param service Service to get partition mappings
    * @param context Mapper context
+   * @param exceptionHandler handle uncaught exceptions
    */
   public NettyWorkerServer(ImmutableClassesGiraphConfiguration<I, V, E> conf,
       CentralizedServiceWorker<I, V, E> service,
-      Mapper<?, ?, ?, ?>.Context context) {
+      Mapper<?, ?, ?, ?>.Context context,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
     this.conf = conf;
     this.service = service;
     this.context = context;
@@ -91,7 +93,7 @@ public class NettyWorkerServer<I extends WritableComparable,
 
     nettyServer = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory<I, V, E>(serverData),
-        service.getWorkerInfo(), context);
+        service.getWorkerInfo(), context, exceptionHandler);
     nettyServer.start();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
index 3e06026..e043314 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
@@ -36,13 +36,15 @@ public class MasterRequestServerHandler extends
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
    * @param aggregatorHandler        Master aggregator handler
+   * @param exceptionHandler         Handles uncaught exceptions
    */
   public MasterRequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo,
-      MasterAggregatorHandler aggregatorHandler) {
-    super(workerRequestReservedMap, conf, myTaskInfo);
+      MasterAggregatorHandler aggregatorHandler,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
+    super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
     this.aggregatorHandler = aggregatorHandler;
   }
 
@@ -71,9 +73,10 @@ public class MasterRequestServerHandler extends
     public RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
         ImmutableClassesGiraphConfiguration conf,
-        TaskInfo myTaskInfo) {
+        TaskInfo myTaskInfo,
+        Thread.UncaughtExceptionHandler exceptionHandler) {
       return new MasterRequestServerHandler(workerRequestReservedMap, conf,
-          myTaskInfo, aggregatorHandler);
+          myTaskInfo, aggregatorHandler, exceptionHandler);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index b6d0533..d75870a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -56,6 +56,8 @@ public abstract class RequestServerHandler<R> extends
   private final TaskInfo myTaskInfo;
   /** Start nanoseconds for the processing time */
   private long startProcessingNanoseconds = -1;
+  /** Handler for uncaught exceptions */
+  private final Thread.UncaughtExceptionHandler exceptionHandler;
 
   /**
    * Constructor
@@ -63,14 +65,17 @@ public abstract class RequestServerHandler<R> extends
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf Configuration
    * @param myTaskInfo Current task info
+   * @param exceptionHandler Handles uncaught exceptions
    */
   public RequestServerHandler(
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
-      TaskInfo myTaskInfo) {
+      TaskInfo myTaskInfo,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
     this.workerRequestReservedMap = workerRequestReservedMap;
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
     this.myTaskInfo = myTaskInfo;
+    this.exceptionHandler = exceptionHandler;
   }
 
   @Override
@@ -159,10 +164,9 @@ public abstract class RequestServerHandler<R> extends
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-    throws Exception {
-    LOG.warn("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.channel().remoteAddress(), cause);
+  public void exceptionCaught(
+      ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    exceptionHandler.uncaughtException(Thread.currentThread(), cause);
   }
 
   /**
@@ -175,11 +179,13 @@ public abstract class RequestServerHandler<R> extends
      * @param workerRequestReservedMap Worker request reservation map
      * @param conf Configuration to use
      * @param myTaskInfo Current task info
+     * @param exceptionHandler Handles uncaught exceptions
      * @return New {@link RequestServerHandler}
      */
     RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
         ImmutableClassesGiraphConfiguration conf,
-        TaskInfo myTaskInfo);
+        TaskInfo myTaskInfo,
+        Thread.UncaughtExceptionHandler exceptionHandler);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index f64c373..574e413 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -46,12 +46,14 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
    * @param workerRequestReservedMap Worker request reservation map
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
+   * @param exceptionHandler         Handles uncaught exceptions
    */
   public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
-      TaskInfo myTaskInfo) {
-    super(workerRequestReservedMap, conf, myTaskInfo);
+      TaskInfo myTaskInfo,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
+    super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
     this.serverData = serverData;
   }
 
@@ -80,9 +82,10 @@ public class WorkerRequestServerHandler<I extends WritableComparable,
     public RequestServerHandler newHandler(
         WorkerRequestReservedMap workerRequestReservedMap,
         ImmutableClassesGiraphConfiguration conf,
-        TaskInfo myTaskInfo) {
+        TaskInfo myTaskInfo,
+        Thread.UncaughtExceptionHandler exceptionHandler) {
       return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
-          workerRequestReservedMap, conf, myTaskInfo);
+          workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
index c86a024..6f748c5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphMapper.java
@@ -49,13 +49,15 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
   @Override
   public void setup(Context context)
     throws IOException, InterruptedException {
-    // Setting the default handler for uncaught exceptions.
-    Thread.setDefaultUncaughtExceptionHandler(
-        new OverrideExceptionHandler());
-
     // Execute all Giraph-related role(s) assigned to this compute node.
     // Roles can include "master," "worker," "zookeeper," or . . . ?
     graphTaskManager = new GraphTaskManager<I, V, E>(context);
+
+    // Setting the default handler for uncaught exceptions.
+    Thread.setDefaultUncaughtExceptionHandler(
+        graphTaskManager.createUncaughtExceptionHandler());
+
+
     graphTaskManager.setup(
       DistributedCache.getLocalCacheArchives(context.getConfiguration()));
   }
@@ -96,6 +98,7 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
       // CHECKSTYLE: stop IllegalCatch
     } catch (RuntimeException e) {
       // CHECKSTYLE: resume IllegalCatch
+      LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
       graphTaskManager.zooKeeperCleanup();
       graphTaskManager.workerFailureCleanup();
       throw new IllegalStateException(
@@ -103,16 +106,4 @@ public class GraphMapper<I extends WritableComparable, V extends Writable,
     }
   }
 
-  /**
-    * Default handler for uncaught exceptions.
-    */
-  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
-    @Override
-    public void uncaughtException(final Thread t, final Throwable e) {
-      LOG.fatal(
-        "uncaughtException: OverrideExceptionHandler on thread " +
-         t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
-      System.exit(1);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index e13eedd..b2a5c84 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -911,7 +911,37 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     }
   }
 
+  /**
+   * Creates exception handler that will terminate process gracefully in case
+   * of any uncaught exception.
+   * @return new exception handler object.
+   */
+  public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
+    return new OverrideExceptionHandler();
+  }
+
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
     return conf;
   }
+
+
+  /**
+   * Default handler for uncaught exceptions.
+   * It will do the best to clean up and then will terminate current giraph job.
+   */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      try {
+        LOG.fatal(
+            "uncaughtException: OverrideExceptionHandler on thread " +
+                t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
+
+        zooKeeperCleanup();
+        workerFailureCleanup();
+      } finally {
+        System.exit(1);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 02d4f2b..0275395 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -883,11 +883,13 @@ public class BspServiceMaster<I extends WritableComparable,
 
           masterInfo = new MasterInfo();
           masterServer =
-              new NettyMasterServer(getConfiguration(), this, getContext());
+              new NettyMasterServer(getConfiguration(), this, getContext(),
+                  getGraphTaskManager().createUncaughtExceptionHandler());
           masterInfo.setInetSocketAddress(masterServer.getMyAddress());
           masterInfo.setTaskId(getTaskPartition());
           masterClient =
-              new NettyMasterClient(getContext(), getConfiguration(), this);
+              new NettyMasterClient(getContext(), getConfiguration(), this,
+                  getGraphTaskManager().createUncaughtExceptionHandler());
 
           if (LOG.isInfoEnabled()) {
             LOG.info("becomeMaster: I am now the master!");
@@ -1397,8 +1399,7 @@ public class BspServiceMaster<I extends WritableComparable,
 
       // Did a worker die?
       try {
-        if ((getSuperstep() > 0) &&
-            !superstepChosenWorkerAlive(
+        if (!superstepChosenWorkerAlive(
                 workerInfoHealthyPath,
                 workerInfoList)) {
           return false;

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
new file mode 100644
index 0000000..a235ff4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Utility class for thread related functions.
+ */
+public class ThreadUtils {
+
+  /**
+   * Utility class. Do not inherit or create objects.
+   */
+  private ThreadUtils() { }
+
+  /**
+   * Creates new thread factory with specified thread name format.
+   *
+   * @param nameFormat defines naming format for threads created by
+   *                   thread factory
+   * @param exceptionHandler handles uncaught exceptions in all threads
+   *                         produced created thread factory
+   * @return new thread factory with specified thread name format and
+   * exception handler.
+   */
+  public static ThreadFactory createThreadFactory(
+      String nameFormat,
+      Thread.UncaughtExceptionHandler exceptionHandler) {
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder().
+        setNameFormat(nameFormat);
+    if (exceptionHandler != null) {
+      builder.setUncaughtExceptionHandler(exceptionHandler);
+    }
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index dbe6a45..de7af28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -197,10 +197,12 @@ public class BspServiceWorker<I extends WritableComparable,
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
     workerInfo = new WorkerInfo();
-    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context);
+    workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
+        graphTaskManager.createUncaughtExceptionHandler());
     workerInfo.setInetSocketAddress(workerServer.getMyAddress());
     workerInfo.setTaskId(getTaskPartition());
-    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this);
+    workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
+        graphTaskManager.createUncaughtExceptionHandler());
 
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
index f4719cc..ccfc972 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
@@ -166,19 +166,6 @@ public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
   }
 
   /**
-    * Default handler for uncaught exceptions.
-    */
-  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
-    @Override
-    public void uncaughtException(final Thread t, final Throwable e) {
-      LOG.fatal(
-        "uncaughtException: OverrideExceptionHandler on thread " +
-         t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
-      System.exit(1);
-    }
-  }
-
-  /**
    * Task entry point.
    * @param args CLI arguments injected by GiraphApplicationMaster to hand off
    *             job, task, and attempt ID's to this (and every) Giraph task.

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index e771e36..5bc9ef0 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -70,11 +70,12 @@ public class ConnectionTest {
     NettyServer server =
         new NettyServer(conf,
             new WorkerRequestServerHandler.Factory(serverData), workerInfo,
-            context);
+            context, new MockExceptionHandler());
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
 
@@ -101,7 +102,8 @@ public class ConnectionTest {
     WorkerInfo workerInfo1 = new WorkerInfo();
     workerInfo1.setTaskId(1);
     NettyServer server1 =
-        new NettyServer(conf, requestServerHandlerFactory, workerInfo1, context);
+        new NettyServer(conf, requestServerHandlerFactory, workerInfo1,
+            context, new MockExceptionHandler());
     server1.start();
     workerInfo1.setInetSocketAddress(server1.getMyAddress());
 
@@ -109,7 +111,7 @@ public class ConnectionTest {
     workerInfo1.setTaskId(2);
     NettyServer server2 =
         new NettyServer(conf, requestServerHandlerFactory, workerInfo2,
-            context);
+            context, new MockExceptionHandler());
     server2.start();
     workerInfo2.setInetSocketAddress(server2.getMyAddress());
 
@@ -117,11 +119,12 @@ public class ConnectionTest {
     workerInfo1.setTaskId(3);
     NettyServer server3 =
         new NettyServer(conf, requestServerHandlerFactory, workerInfo3,
-            context);
+            context, new MockExceptionHandler());
     server3.start();
     workerInfo3.setInetSocketAddress(server3.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
         workerInfo2, workerInfo3);
     client.connectAllAddresses(addresses);
@@ -148,16 +151,19 @@ public class ConnectionTest {
     WorkerInfo workerInfo = new WorkerInfo();
     NettyServer server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
-            context);
+            context, new MockExceptionHandler());
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
 
     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo);
-    NettyClient client1 = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client1 = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client1.connectAllAddresses(addresses);
-    NettyClient client2 = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client2 = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client2.connectAllAddresses(addresses);
-    NettyClient client3 = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client3.connectAllAddresses(addresses);
 
     client1.stop();

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
new file mode 100644
index 0000000..edd3fc0
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/MockExceptionHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+public class MockExceptionHandler implements Thread.UncaughtExceptionHandler{
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    throw new RuntimeException(e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index 157a543..572e290 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -160,10 +160,11 @@ public class RequestFailureTest {
     WorkerInfo workerInfo = new WorkerInfo();
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
-            context);
+            context, new MockExceptionHandler());
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
-    client = new NettyClient(context, conf, new WorkerInfo());
+    client = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 32454f4..8037db9 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -95,10 +95,11 @@ public class RequestTest {
     workerInfo = new WorkerInfo();
     server = new NettyServer(conf,
         new WorkerRequestServerHandler.Factory(serverData), workerInfo,
-            context);
+            context, new MockExceptionHandler());
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
-    client = new NettyClient(context, conf, new WorkerInfo());
+    client = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/61cb37ec/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index c026cf8..96ce062 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -85,11 +85,13 @@ public class SaslConnectionTest {
             new WorkerRequestServerHandler.Factory(serverData),
             workerInfo,
             context,
-            mockedSaslServerFactory);
+            mockedSaslServerFactory,
+            new MockExceptionHandler());
     server.start();
     workerInfo.setInetSocketAddress(server.getMyAddress());
 
-    NettyClient client = new NettyClient(context, conf, new WorkerInfo());
+    NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
+        new MockExceptionHandler());
     client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
 
     client.stop();