You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/05/04 16:39:33 UTC

git commit: updated refs/heads/trunk to 1c7552b

Repository: giraph
Updated Branches:
  refs/heads/trunk fd61fdad3 -> 1c7552b1a


GIRAPH-1058: Fix connection retry logic

Summary: Currently when we fail to connect to a channel we retry immediately and that retry most often fails. Add a short wait between retries, and improve the check for whether the channel connected successfully.

Test Plan: Ran multiple jobs that were often failing before the fix; with the fix, they succeeded.

Differential Revision: https://reviews.facebook.net/D57447


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c7552b1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c7552b1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c7552b1

Branch: refs/heads/trunk
Commit: 1c7552b1a3c2bbde15f98671c7b7c1424494c128
Parents: fd61fda
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Apr 29 13:23:29 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Apr 29 13:24:57 2016 -0700

----------------------------------------------------------------------
 .../apache/giraph/comm/netty/NettyClient.java   | 20 +++++++++++++++++++-
 .../org/apache/giraph/conf/GiraphConstants.java |  5 +++++
 src/site/xdoc/options.xml                       | 18 ++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 863449a..c185fdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -91,6 +91,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTE
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
 import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
 import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
 
 /**
@@ -166,6 +167,8 @@ public class NettyClient {
   private final float requestSizeWarningThreshold;
   /** Maximum number of connection failures */
   private final int maxConnectionFailures;
+  /** How long to wait before trying to reconnect failed connections */
+  private final long waitTimeBetweenConnectionRetriesMs;
   /** Maximum number of milliseconds for a request */
   private final int maxRequestMilliseconds;
   /** Waiting interval for checking outstanding requests msecs */
@@ -239,6 +242,8 @@ public class NettyClient {
 
     maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
     maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
+    waitTimeBetweenConnectionRetriesMs =
+        WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
     waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
     maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
     maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
@@ -462,11 +467,24 @@ public class NettyClient {
     int connected = 0;
     while (failures < maxConnectionFailures) {
       List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
+      boolean isFirstFailure = true;
       for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
         context.progress();
         ChannelFuture future = waitingConnection.future;
         ProgressableUtils.awaitChannelFuture(future, context);
-        if (!future.isSuccess()) {
+        if (!future.isSuccess() || !future.channel().isOpen()) {
+          // Make a short pause before trying to reconnect failed addresses
+          // again, but to do it just once per iterating through channels
+          if (isFirstFailure) {
+            isFirstFailure = false;
+            try {
+              Thread.sleep(waitTimeBetweenConnectionRetriesMs);
+            } catch (InterruptedException e) {
+              throw new IllegalStateException(
+                  "connectAllAddresses: InterruptedException occurred", e);
+            }
+          }
+
           LOG.warn("connectAllAddresses: Future failed " +
               "to connect with " + waitingConnection.address + " with " +
               failures + " failures because of " + future.cause());

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8335e7e..15eca3c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -682,6 +682,11 @@ public interface GiraphConstants {
       new IntConfOption("giraph.nettyMaxConnectionFailures", 1000,
           "Netty max connection failures");
 
+  /** How long to wait before trying to reconnect failed connections */
+  IntConfOption WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS =
+      new IntConfOption("giraph.waitTimeBetweenConnectionRetriesMs", 500,
+          "");
+
   /** Initial port to start using for the IPC communication */
   IntConfOption IPC_INITIAL_PORT =
       new IntConfOption("giraph.ipcInitialPort", 30000,

http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/src/site/xdoc/options.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml
index 687d30f..2735575 100644
--- a/src/site/xdoc/options.xml
+++ b/src/site/xdoc/options.xml
@@ -148,6 +148,12 @@ under the License.
          <td>Enable the Metrics system</td>
        </tr>
        <tr>
+         <td>giraph.nettyAutoRead</td>
+         <td>boolean</td>
+         <td>true</td>
+         <td>Whether netty should pro-actively read requests and feed them to its processing pipeline</td>
+       </tr>
+       <tr>
          <td>giraph.nettyClientUseExecutionHandler</td>
          <td>boolean</td>
          <td>true</td>
@@ -376,6 +382,12 @@ under the License.
          <td>Class which decides whether a failed job should be retried - optional</td>
        </tr>
        <tr>
+         <td>giraph.mapper.observers</td>
+         <td>class</td>
+         <td>null</td>
+         <td>Classes for Mapper Observer - optional</td>
+       </tr>
+       <tr>
          <td>giraph.mappingInputFormatClass</td>
          <td>class</td>
          <td>null</td>
@@ -820,6 +832,12 @@ under the License.
          <td>Maximum timeout (in ms) for waiting for all all tasks to complete</td>
        </tr>
        <tr>
+         <td>giraph.waitTimeBetweenConnectionRetriesMs</td>
+         <td>integer</td>
+         <td>500</td>
+         <td></td>
+       </tr>
+       <tr>
          <td>giraph.waitingRequestMsecs</td>
          <td>integer</td>
          <td>15000</td>