You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/09/18 19:25:49 UTC

git commit: updated refs/heads/trunk to 6128d66

Repository: giraph
Updated Branches:
  refs/heads/trunk 239ea8fb3 -> 6128d66eb


JIRA-1200

closes #83


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6128d66e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6128d66e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6128d66e

Branch: refs/heads/trunk
Commit: 6128d66eba5b1dfc0a5e047a057c33d00abac6e7
Parents: 239ea8f
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Sep 18 12:25:34 2018 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Sep 18 12:25:34 2018 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 37 +++++++++++++++++---
 1 file changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/6128d66e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 2c38505..51887fe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -38,6 +38,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.BooleanConfOption;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.counters.GiraphHadoopCounter;
 import org.apache.giraph.function.Predicate;
 import org.apache.giraph.graph.TaskInfo;
 import org.apache.giraph.master.MasterInfo;
@@ -131,6 +132,16 @@ public class NettyClient {
   public static final AttributeKey<SaslNettyClient> SASL =
       AttributeKey.valueOf("saslNettyClient");
 /*end[HADOOP_NON_SECURE]*/
+
+  /** Group name for netty counters */
+  public static final String NETTY_COUNTERS_GROUP = "Netty counters";
+  /** How many network requests were resent because they took too long */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
+      "Network requests resent for timeout";
+  /** How many network requests were resent because channel failed */
+  public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
+      "Network requests resent for channel failure";
+
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(NettyClient.class);
   /** Context used to report progress */
@@ -206,6 +217,11 @@ public class NettyClient {
   /** Flow control policy used */
   private final FlowControl flowControl;
 
+  /** How many network requests were resent because they took too long */
+  private final GiraphHadoopCounter networkRequestsResentForTimeout;
+  /** How many network requests were resent because channel failed */
+  private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+
   /**
    * Only constructor
    *
@@ -242,6 +258,15 @@ public class NettyClient {
       flowControl = new NoOpFlowControl(this);
     }
 
+    networkRequestsResentForTimeout =
+        new GiraphHadoopCounter(context.getCounter(
+            NETTY_COUNTERS_GROUP,
+            NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
+    networkRequestsResentForChannelFailure =
+        new GiraphHadoopCounter(context.getCounter(
+            NETTY_COUNTERS_GROUP,
+            NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
     waitTimeBetweenConnectionRetriesMs =
@@ -966,17 +991,18 @@ public class NettyClient {
             (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
             (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
       }
-    });
+    }, networkRequestsResentForTimeout);
   }
 
   /**
    * Resend requests which satisfy predicate
-   *
-   * @param shouldResendRequestPredicate Predicate to use to check whether
+   *  @param shouldResendRequestPredicate Predicate to use to check whether
    *                                     request should be resent
+   * @param counter Counter to increment for every resent network request
    */
   private void resendRequestsWhenNeeded(
-      Predicate<RequestInfo> shouldResendRequestPredicate) {
+      Predicate<RequestInfo> shouldResendRequestPredicate,
+      GiraphHadoopCounter counter) {
     // Check if there are open requests which have been sent a long time ago,
     // and if so, resend them.
     List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1006,6 +1032,7 @@ public class NettyClient {
         addedRequestIds.add(entry.getKey());
         addedRequestInfos.add(new RequestInfo(
             requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+        counter.increment();
       }
     }
 
@@ -1093,7 +1120,7 @@ public class NettyClient {
         return requestInfo.getDestinationAddress().equals(
             channel.remoteAddress());
       }
-    });
+    }, networkRequestsResentForChannelFailure);
   }
 
   /**