You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/09/18 19:25:49 UTC
git commit: updated refs/heads/trunk to 6128d66
Repository: giraph
Updated Branches:
refs/heads/trunk 239ea8fb3 -> 6128d66eb
JIRA-1200
closes #83
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/6128d66e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/6128d66e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/6128d66e
Branch: refs/heads/trunk
Commit: 6128d66eba5b1dfc0a5e047a057c33d00abac6e7
Parents: 239ea8f
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Sep 18 12:25:34 2018 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Sep 18 12:25:34 2018 -0700
----------------------------------------------------------------------
.../apache/giraph/comm/netty/NettyClient.java | 37 +++++++++++++++++---
1 file changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/6128d66e/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 2c38505..51887fe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -38,6 +38,7 @@ import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.counters.GiraphHadoopCounter;
import org.apache.giraph.function.Predicate;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.master.MasterInfo;
@@ -131,6 +132,16 @@ public class NettyClient {
public static final AttributeKey<SaslNettyClient> SASL =
AttributeKey.valueOf("saslNettyClient");
/*end[HADOOP_NON_SECURE]*/
+
+ /** Group name for netty counters */
+ public static final String NETTY_COUNTERS_GROUP = "Netty counters";
+ /** How many network requests were resent because they took too long */
+ public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
+ "Network requests resent for timeout";
+ /** How many network requests were resent because the channel failed */
+ public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
+ "Network requests resent for channel failure";
+
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Context used to report progress */
@@ -206,6 +217,11 @@ public class NettyClient {
/** Flow control policy used */
private final FlowControl flowControl;
+ /** How many network requests were resent because they took too long */
+ private final GiraphHadoopCounter networkRequestsResentForTimeout;
+ /** How many network requests were resent because the channel failed */
+ private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+
/**
* Only constructor
*
@@ -242,6 +258,15 @@ public class NettyClient {
flowControl = new NoOpFlowControl(this);
}
+ networkRequestsResentForTimeout =
+ new GiraphHadoopCounter(context.getCounter(
+ NETTY_COUNTERS_GROUP,
+ NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
+ networkRequestsResentForChannelFailure =
+ new GiraphHadoopCounter(context.getCounter(
+ NETTY_COUNTERS_GROUP,
+ NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
waitTimeBetweenConnectionRetriesMs =
@@ -966,17 +991,18 @@ public class NettyClient {
(writeFuture.isDone() && !writeFuture.isSuccess()))) ||
(requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
}
- });
+ }, networkRequestsResentForTimeout);
}
/**
* Resend requests which satisfy predicate
- *
- * @param shouldResendRequestPredicate Predicate to use to check whether
+ * @param shouldResendRequestPredicate Predicate to use to check whether
* request should be resent
+ * @param counter Counter to increment for every resent network request
*/
private void resendRequestsWhenNeeded(
- Predicate<RequestInfo> shouldResendRequestPredicate) {
+ Predicate<RequestInfo> shouldResendRequestPredicate,
+ GiraphHadoopCounter counter) {
// Check if there are open requests which have been sent a long time ago,
// and if so, resend them.
List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1006,6 +1032,7 @@ public class NettyClient {
addedRequestIds.add(entry.getKey());
addedRequestInfos.add(new RequestInfo(
requestInfo.getDestinationAddress(), requestInfo.getRequest()));
+ counter.increment();
}
}
@@ -1093,7 +1120,7 @@ public class NettyClient {
return requestInfo.getDestinationAddress().equals(
channel.remoteAddress());
}
- });
+ }, networkRequestsResentForChannelFailure);
}
/**