You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/10/18 21:35:51 UTC

git commit: updated refs/heads/trunk to 5e44c4e

Repository: giraph
Updated Branches:
  refs/heads/trunk 843cd6eea -> 5e44c4e4b


GIRAPH-1205

closes #88


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5e44c4e4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5e44c4e4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5e44c4e4

Branch: refs/heads/trunk
Commit: 5e44c4e4be97e93f5d13cdc7bd52b7374635398c
Parents: 843cd6e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 18 14:29:38 2018 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Oct 18 14:29:38 2018 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/5e44c4e4/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 51887fe..83dd7f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -141,6 +141,9 @@ public class NettyClient {
   /** How many network requests were resent because channel failed */
   public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
       "Network requests resent for channel failure";
+  /** Number of requests resent after a connection or request failure */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
+      "Network requests resent for connection or request failure";
 
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@ public class NettyClient {
   private final GiraphHadoopCounter networkRequestsResentForTimeout;
   /** How many network requests were resent because channel failed */
   private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+  /** Number of requests resent after a connection or request failure */
+  private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
 
   /**
    * Only constructor
@@ -266,6 +271,10 @@ public class NettyClient {
         new GiraphHadoopCounter(context.getCounter(
             NETTY_COUNTERS_GROUP,
             NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+    networkRequestsResentForConnectionFailure =
+      new GiraphHadoopCounter(context.getCounter(
+        NETTY_COUNTERS_GROUP,
+        NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@ public class NettyClient {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        ChannelFuture writeFuture = requestInfo.getWriteFuture();
-        // If not connected anymore, request failed, or the request is taking
-        // too long, re-establish and resend
-        return (writeFuture != null && (!writeFuture.channel().isActive() ||
-            (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
-            (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+        // If the request is taking too long, re-establish and resend
+        return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
       }
     }, networkRequestsResentForTimeout);
+    resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+      @Override
+      public boolean apply(RequestInfo requestInfo) {
+        ChannelFuture writeFuture = requestInfo.getWriteFuture();
+        // If not connected anymore or request failed, re-establish and resend
+        return writeFuture != null && (!writeFuture.channel().isActive() ||
+            (writeFuture.isDone() && !writeFuture.isSuccess()));
+      }
+    }, networkRequestsResentForConnectionFailure);
   }
 
   /**