You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/12/16 20:41:18 UTC

[giraph] branch trunk updated: GIRAPH-1230

This is an automated email from the ASF dual-hosted git repository.

dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8d017e  GIRAPH-1230
f8d017e is described below

commit f8d017e61d66ec56b17ecf796743d6851c2f0988
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Mon Dec 16 12:40:54 2019 -0800

    GIRAPH-1230
    
    closes #118
---
 checkstyle.xml                                     |  2 +-
 .../apache/giraph/comm/netty/ChannelRotater.java   |  9 ++++--
 .../org/apache/giraph/comm/netty/NettyClient.java  | 37 ++++++++++++----------
 .../org/apache/giraph/graph/GraphTaskManager.java  | 28 ++++++++++++++++
 .../org/apache/giraph/worker/BspServiceWorker.java | 12 ++++++-
 5 files changed, 66 insertions(+), 22 deletions(-)

diff --git a/checkstyle.xml b/checkstyle.xml
index e0e604c..f820d74 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -227,7 +227,7 @@
     </module>
       <!-- Over time, we will revised this down -->
     <module name="MethodLength">
-      <property name="max" value="200"/>
+      <property name="max" value="210"/>
     </module>
     <module name="ParameterNumber">
       <property name="max" value="8"/>
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
index 54c0b50..53af9c0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/ChannelRotater.java
@@ -25,12 +25,15 @@ import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import org.apache.log4j.Logger;
 
 
 /**
  * Maintains multiple channels and rotates between them.  This is thread-safe.
  */
 public class ChannelRotater {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
   /** Index of last used channel */
   private int index = 0;
   /** Channel list */
@@ -73,9 +76,9 @@ public class ChannelRotater {
    */
   public synchronized Channel nextChannel() {
     if (channelList.isEmpty()) {
-      throw new IllegalArgumentException(
-          "nextChannel: No channels exist for hostname " +
-              address.getHostName());
+      LOG.warn("nextChannel: No channels exist for hostname " +
+        address.getHostName());
+      return null;
     }
 
     ++index;
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index c8ccea2..103a8ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -242,6 +242,11 @@ public class NettyClient {
   /** How many network requests were resent because connection failed */
   private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
 
+  /**
+   * Keeps track of the number of reconnect failures. Once this exceeds the
+   * value of {@link #maxConnectionFailures}, the job will fail.
+   */
+  private int reconnectFailures = 0;
 
   /**
    * Only constructor
@@ -764,26 +769,25 @@ public class NettyClient {
   private Channel getNextChannel(InetSocketAddress remoteServer) {
     Channel channel = addressChannelMap.get(remoteServer).nextChannel();
     if (channel == null) {
-      throw new IllegalStateException(
-          "getNextChannel: No channel exists for " + remoteServer);
-    }
-
-    // Return this channel if it is connected
-    if (channel.isActive()) {
-      return channel;
-    }
+      LOG.warn("getNextChannel: No channel exists for " + remoteServer);
+    } else {
+      // Return this channel if it is connected
+      if (channel.isActive()) {
+        return channel;
+      }
 
-    // Get rid of the failed channel
-    if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
-      LOG.warn("getNextChannel: Unlikely event that the channel " +
+      // Get rid of the failed channel
+      if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
+        LOG.warn("getNextChannel: Unlikely event that the channel " +
           channel + " was already removed!");
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("getNextChannel: Fixing disconnected channel to " +
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getNextChannel: Fixing disconnected channel to " +
           remoteServer + ", open = " + channel.isOpen() + ", " +
           "bound = " + channel.isRegistered());
+      }
     }
-    int reconnectFailures = 0;
+
     while (reconnectFailures < maxConnectionFailures) {
       ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
       try {
@@ -1205,7 +1209,7 @@ public class NettyClient {
    * This listener class just dumps exception stack traces if
    * something happens.
    */
-  private class LogOnErrorChannelFutureListener
+  private static class LogOnErrorChannelFutureListener
       implements ChannelFutureListener {
 
     @Override
@@ -1213,7 +1217,6 @@ public class NettyClient {
       if (future.isDone() && !future.isSuccess()) {
         LOG.error("Channel failed channelId=" + future.channel().hashCode(),
             future.cause());
-        checkRequestsAfterChannelFailure(future.channel());
       }
     }
   }
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 54d3084..6db1934 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -1075,6 +1075,18 @@ end[PURE_YARN]*/
             getConf()), getJobProgressTracker());
   }
 
+  /**
+   * Creates exception handler with the passed implementation of
+   * {@link CheckerIfWorkerShouldFailAfterException}.
+   *
+   * @param checker Instance that checks whether the job should fail.
+   * @return Exception handler.
+   */
+  public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
+    CheckerIfWorkerShouldFailAfterException checker) {
+    return new OverrideExceptionHandler(checker, getJobProgressTracker());
+  }
+
   public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
     return conf;
   }
@@ -1128,6 +1140,9 @@ end[PURE_YARN]*/
     @Override
     public void uncaughtException(final Thread t, final Throwable e) {
       if (!checker.checkIfWorkerShouldFail(t, e)) {
+        LOG.error(
+          "uncaughtException: OverrideExceptionHandler on thread " +
+            t.getName() + ", msg = " +  e.getMessage(), e);
         return;
       }
       try {
@@ -1169,4 +1184,17 @@ end[PURE_YARN]*/
       return true;
     }
   }
+
+  /**
+   * Checks the message of a throwable, and checks whether it is a
+   * "connection reset by peer" type of exception.
+   *
+   * @param throwable Throwable
+   * @return True if the throwable is a "connection reset by peer",
+   * false otherwise.
+   */
+  public static boolean isConnectionResetByPeer(Throwable throwable) {
+    return throwable.getMessage().startsWith(
+      "Connection reset by peer") ? true : false;
+  }
 }
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index b6756c9..a745b1e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -121,6 +121,8 @@ import org.json.JSONObject;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;
+
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
  *
@@ -217,7 +219,15 @@ public class BspServiceWorker<I extends WritableComparable,
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
     workerInfo = new WorkerInfo();
     workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
-        graphTaskManager.createUncaughtExceptionHandler());
+        graphTaskManager.createUncaughtExceptionHandler(
+          (thread, throwable) -> {
+            // If the connection was closed by the client, then we just log
+            // the error, we do not fail the job, since the client will
+            // attempt to reconnect.
+            return !isConnectionResetByPeer(throwable);
+          }
+        )
+    );
     workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
         workerServer.getLocalHostOrIp());
     workerInfo.setTaskId(getTaskId());