You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/10/18 21:35:51 UTC
git commit: updated refs/heads/trunk to 5e44c4e
Repository: giraph
Updated Branches:
refs/heads/trunk 843cd6eea -> 5e44c4e4b
GIRAPH-1205
closes #88
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/5e44c4e4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/5e44c4e4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/5e44c4e4
Branch: refs/heads/trunk
Commit: 5e44c4e4be97e93f5d13cdc7bd52b7374635398c
Parents: 843cd6e
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 18 14:29:38 2018 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Oct 18 14:29:38 2018 -0700
----------------------------------------------------------------------
.../apache/giraph/comm/netty/NettyClient.java | 26 +++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/5e44c4e4/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 51887fe..83dd7f5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -141,6 +141,9 @@ public class NettyClient {
/** How many network requests were resent because channel failed */
public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
"Network requests resent for channel failure";
+ /** How many network requests were resent for connection/request failure */
+ public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
+ "Network requests resent for connection or request failure";
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@ public class NettyClient {
private final GiraphHadoopCounter networkRequestsResentForTimeout;
/** How many network requests were resent because channel failed */
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
+ /** How many network requests were resent for connection/request failure */
+ private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
/**
* Only constructor
@@ -266,6 +271,10 @@ public class NettyClient {
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
+ networkRequestsResentForConnectionFailure =
+ new GiraphHadoopCounter(context.getCounter(
+ NETTY_COUNTERS_GROUP,
+ NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@ public class NettyClient {
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
- ChannelFuture writeFuture = requestInfo.getWriteFuture();
- // If not connected anymore, request failed, or the request is taking
- // too long, re-establish and resend
- return (writeFuture != null && (!writeFuture.channel().isActive() ||
- (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
- (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
+ // If the request is taking too long, re-establish and resend
+ return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
}
}, networkRequestsResentForTimeout);
+ resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
+ @Override
+ public boolean apply(RequestInfo requestInfo) {
+ ChannelFuture writeFuture = requestInfo.getWriteFuture();
+ // If not connected anymore or request failed, re-establish and resend
+ return writeFuture != null && (!writeFuture.channel().isActive() ||
+ (writeFuture.isDone() && !writeFuture.isSuccess()));
+ }
+ }, networkRequestsResentForConnectionFailure);
}
/**