You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2018/12/11 23:32:31 UTC

git commit: updated refs/heads/trunk to aa740b5

Repository: giraph
Updated Branches:
  refs/heads/trunk bb8dab546 -> aa740b54b


GIRAPH-1213

closes #96


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/aa740b54
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/aa740b54
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/aa740b54

Branch: refs/heads/trunk
Commit: aa740b54bd914a0246e2a9280459e33c233de9b3
Parents: bb8dab5
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Nov 29 11:35:53 2018 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Dec 11 15:31:18 2018 -0800

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 25 ++++++++++++++++----
 .../netty/handler/ResponseClientHandler.java    |  5 ++--
 2 files changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/aa740b54/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 74011b9..64c9c04 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -82,6 +82,7 @@ import io.netty.handler.codec.FixedLengthFrameDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.AttributeKey;
 /*end[HADOOP_NON_SECURE]*/
+import io.netty.util.concurrent.BlockingOperationException;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
 
@@ -755,7 +756,11 @@ public class NettyClient {
     int reconnectFailures = 0;
     while (reconnectFailures < maxConnectionFailures) {
       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
-      ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+      try {
+        ProgressableUtils.awaitChannelFuture(connectionFuture, context);
+      } catch (BlockingOperationException e) {
+        LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
+      }
       if (connectionFuture.isSuccess()) {
         if (LOG.isInfoEnabled()) {
           LOG.info("getNextChannel: Connected to " + remoteServer + "!");
@@ -1052,7 +1057,8 @@ public class NettyClient {
               writeFuture.channel().isActive() +
               ", future done = " + writeFuture.isDone() + ", " +
               "success = " + writeFuture.isSuccess() + ", " +
-              "cause = " + writeFuture.cause();
+              "cause = " + writeFuture.cause() + ", " +
+              "channelId = " + writeFuture.channel().hashCode();
         }
         LOG.warn("checkRequestsForProblems: Problem with request id " +
             entry.getKey() + ", " + logMessage + ", " +
@@ -1080,6 +1086,11 @@ public class NettyClient {
         LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
       }
       writeRequestToChannel(requestInfo);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("checkRequestsForProblems: Request " + requestId +
+            " was resent through channelId=" +
+            requestInfo.getWriteFuture().channel().hashCode());
+      }
     }
     addedRequestIds.clear();
     addedRequestInfos.clear();
@@ -1147,8 +1158,11 @@ public class NettyClient {
     resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
       @Override
       public boolean apply(RequestInfo requestInfo) {
-        return requestInfo.getDestinationAddress().equals(
-            channel.remoteAddress());
+        if (requestInfo.getWriteFuture() == null ||
+            requestInfo.getWriteFuture().channel() == null) {
+          return false;
+        }
+        return requestInfo.getWriteFuture().channel().equals(channel);
       }
     }, networkRequestsResentForChannelFailure, true);
   }
@@ -1163,7 +1177,8 @@ public class NettyClient {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
       if (future.isDone() && !future.isSuccess()) {
-        LOG.error("Request failed", future.cause());
+        LOG.error("Channel failed channelId=" + future.channel().hashCode(),
+            future.cause());
         checkRequestsAfterChannelFailure(future.channel());
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/aa740b54/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 12dde3b..5c6f035 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -106,8 +106,9 @@ public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
     throws Exception {
-    LOG.warn("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.channel().remoteAddress(), cause);
+    LOG.warn("exceptionCaught: Channel channelId=" +
+        ctx.channel().hashCode() + " failed with remote address " +
+        ctx.channel().remoteAddress(), cause);
   }
 }