You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/06/21 22:51:59 UTC

git commit: updated refs/heads/trunk to 51f0937

Repository: giraph
Updated Branches:
  refs/heads/trunk 2185f5946 -> 51f093764


GIRAPH-1077: Jobs getting stuck after channel failure

Summary: Currently, when a channel fails, we only log the failure. Since we don't wait on open requests everywhere, the check for problematic requests isn't always invoked, and we've seen jobs get stuck as a result — for example, during the input stage, when a worker's request to the master for an input split to read fails. Once we know that a channel has failed, we should try to resend the requests that were sent on that channel.

Test Plan: Ran a job multiple times until a channel failure between the master and a worker occurred. Without this change the job would get stuck; with it, the job ran successfully.

Differential Revision: https://reviews.facebook.net/D59895


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/51f09376
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/51f09376
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/51f09376

Branch: refs/heads/trunk
Commit: 51f09376456ed8dadc2e801afaa495863fd7ee3b
Parents: 2185f59
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Jun 21 11:54:40 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Jun 21 11:56:22 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 46 +++++++++++++++++---
 1 file changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/51f09376/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 217dba6..6afe329 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -38,6 +38,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.BooleanConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.function.Predicate;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.master.MasterInfo;
 import org.apache.giraph.utils.PipelineUtils;
@@ -930,6 +931,27 @@ public class NettyClient {
         System.currentTimeMillis())) {
       return;
     }
+    resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+      @Override
+      public boolean apply(RequestInfo requestInfo) {
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore, request failed, or the request is taking
+        // too long, re-establish and resend
+        return !writeFuture.channel().isActive() ||
+          (writeFuture.isDone() && !writeFuture.isSuccess()) ||
+          (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+      }
+    });
+  }
+
+  /**
+   * Resend requests which satisfy predicate
+   *
+   * @param shouldResendRequestPredicate Predicate to use to check whether
+   *                                     request should be resent
+   */
+  private void resendRequestsWhenNeeded(
+      Predicate<RequestInfo> shouldResendRequestPredicate) {
     // Check if there are open requests which have been sent a long time ago,
     // and if so, resend them.
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -943,11 +965,8 @@ public class NettyClient {
       if (writeFuture == null) {
         continue;
       }
-      // If not connected anymore, request failed, or the request is taking
-      // too long, re-establish and resend
-      if (!writeFuture.channel().isActive() ||
-          (writeFuture.isDone() && !writeFuture.isSuccess()) ||
-          (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {
+      // If request should be resent
+      if (shouldResendRequestPredicate.apply(requestInfo)) {
         LOG.warn("checkRequestsForProblems: Problem with request id " +
             entry.getKey() + " connected = " +
             writeFuture.channel().isActive() +
@@ -1044,16 +1063,31 @@ public class NettyClient {
   }
 
   /**
+   * Resend requests related to channel which failed
+   *
+   * @param future ChannelFuture of the failed channel
+   */
+  private void checkRequestsAfterChannelFailure(final ChannelFuture future) {
+    resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+      @Override
+      public boolean apply(RequestInfo requestInfo) {
+        return requestInfo.getWriteFuture() == future;
+      }
+    });
+  }
+
+  /**
    * This listener class just dumps exception stack traces if
    * something happens.
    */
-  private static class LogOnErrorChannelFutureListener
+  private class LogOnErrorChannelFutureListener
       implements ChannelFutureListener {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
       if (future.isDone() && !future.isSuccess()) {
         LOG.error("Request failed", future.cause());
+        checkRequestsAfterChannelFailure(future);
       }
     }
   }