You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/12/11 23:32:31 UTC
git commit: updated refs/heads/trunk to aa740b5
Repository: giraph
Updated Branches:
refs/heads/trunk bb8dab546 -> aa740b54b
GIRAPH-1213
closes #96
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/aa740b54
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/aa740b54
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/aa740b54
Branch: refs/heads/trunk
Commit: aa740b54bd914a0246e2a9280459e33c233de9b3
Parents: bb8dab5
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Nov 29 11:35:53 2018 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Dec 11 15:31:18 2018 -0800
----------------------------------------------------------------------
.../apache/giraph/comm/netty/NettyClient.java | 25 ++++++++++++++++----
.../netty/handler/ResponseClientHandler.java | 5 ++--
2 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/aa740b54/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 74011b9..64c9c04 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -82,6 +82,7 @@ import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
+import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
@@ -755,7 +756,11 @@ public class NettyClient {
int reconnectFailures = 0;
while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
- ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+ try {
+ ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+ } catch (BlockingOperationException e) {
+ LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
+ }
if (connectionFuture.isSuccess()) {
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Connected to " + remoteServer + "!");
@@ -1052,7 +1057,8 @@ public class NettyClient {
writeFuture.channel().isActive() +
", future done = " + writeFuture.isDone() + ", " +
"success = " + writeFuture.isSuccess() + ", " +
- "cause = " + writeFuture.cause();
+ "cause = " + writeFuture.cause() + ", " +
+ "channelId = " + writeFuture.channel().hashCode();
}
LOG.warn("checkRequestsForProblems: Problem with request id " +
entry.getKey() + ", " + logMessage + ", " +
@@ -1080,6 +1086,11 @@ public class NettyClient {
LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
}
writeRequestToChannel(requestInfo);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("checkRequestsForProblems: Request " + requestId +
+ " was resent through channelId=" +
+ requestInfo.getWriteFuture().channel().hashCode());
+ }
}
addedRequestIds.clear();
addedRequestInfos.clear();
@@ -1147,8 +1158,11 @@ public class NettyClient {
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
- return requestInfo.getDestinationAddress().equals(
- channel.remoteAddress());
+ if (requestInfo.getWriteFuture() == null ||
+ requestInfo.getWriteFuture().channel() == null) {
+ return false;
+ }
+ return requestInfo.getWriteFuture().channel().equals(channel);
}
}, networkRequestsResentForChannelFailure, true);
}
@@ -1163,7 +1177,8 @@ public class NettyClient {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isDone() && !future.isSuccess()) {
- LOG.error("Request failed", future.cause());
+ LOG.error("Channel failed channelId=" + future.channel().hashCode(),
+ future.cause());
checkRequestsAfterChannelFailure(future.channel());
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/aa740b54/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 12dde3b..5c6f035 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -106,8 +106,9 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- LOG.warn("exceptionCaught: Channel failed with " +
- "remote address " + ctx.channel().remoteAddress(), cause);
+ LOG.warn("exceptionCaught: Channel channelId=" +
+ ctx.channel().hashCode() + " failed with remote address " +
+ ctx.channel().remoteAddress(), cause);
}
}