You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/05/04 16:39:33 UTC
git commit: updated refs/heads/trunk to 1c7552b
Repository: giraph
Updated Branches:
refs/heads/trunk fd61fdad3 -> 1c7552b1a
GIRAPH-1058: Fix connection retry logic
Summary: Currently, when we fail to connect to a channel, we retry immediately, and that retry usually fails as well. Add a short wait between retries, and improve the check for whether the channel connected successfully.
Test Plan: Ran multiple jobs that often failed before the fix; with the fix, they succeeded.
Differential Revision: https://reviews.facebook.net/D57447
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c7552b1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c7552b1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c7552b1
Branch: refs/heads/trunk
Commit: 1c7552b1a3c2bbde15f98671c7b7c1424494c128
Parents: fd61fda
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Apr 29 13:23:29 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Apr 29 13:24:57 2016 -0700
----------------------------------------------------------------------
.../apache/giraph/comm/netty/NettyClient.java | 20 +++++++++++++++++++-
.../org/apache/giraph/conf/GiraphConstants.java | 5 +++++
src/site/xdoc/options.xml | 18 ++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 863449a..c185fdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -91,6 +91,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTE
import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
+import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
/**
@@ -166,6 +167,8 @@ public class NettyClient {
private final float requestSizeWarningThreshold;
/** Maximum number of connection failures */
private final int maxConnectionFailures;
+ /** How long to wait before trying to reconnect failed connections */
+ private final long waitTimeBetweenConnectionRetriesMs;
/** Maximum number of milliseconds for a request */
private final int maxRequestMilliseconds;
/** Waiting interval for checking outstanding requests msecs */
@@ -239,6 +242,8 @@ public class NettyClient {
maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
+ waitTimeBetweenConnectionRetriesMs =
+ WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);
maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);
@@ -462,11 +467,24 @@ public class NettyClient {
int connected = 0;
while (failures < maxConnectionFailures) {
List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
+ boolean isFirstFailure = true;
for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
context.progress();
ChannelFuture future = waitingConnection.future;
ProgressableUtils.awaitChannelFuture(future, context);
- if (!future.isSuccess()) {
+ if (!future.isSuccess() || !future.channel().isOpen()) {
+ // Make a short pause before trying to reconnect failed addresses
+ // again, but to do it just once per iterating through channels
+ if (isFirstFailure) {
+ isFirstFailure = false;
+ try {
+ Thread.sleep(waitTimeBetweenConnectionRetriesMs);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "connectAllAddresses: InterruptedException occurred", e);
+ }
+ }
+
LOG.warn("connectAllAddresses: Future failed " +
"to connect with " + waitingConnection.address + " with " +
failures + " failures because of " + future.cause());
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 8335e7e..15eca3c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -682,6 +682,11 @@ public interface GiraphConstants {
new IntConfOption("giraph.nettyMaxConnectionFailures", 1000,
"Netty max connection failures");
+ /** How long to wait before trying to reconnect failed connections */
+ IntConfOption WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS =
+ new IntConfOption("giraph.waitTimeBetweenConnectionRetriesMs", 500,
+ "");
+
/** Initial port to start using for the IPC communication */
IntConfOption IPC_INITIAL_PORT =
new IntConfOption("giraph.ipcInitialPort", 30000,
http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/src/site/xdoc/options.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml
index 687d30f..2735575 100644
--- a/src/site/xdoc/options.xml
+++ b/src/site/xdoc/options.xml
@@ -148,6 +148,12 @@ under the License.
<td>Enable the Metrics system</td>
</tr>
<tr>
+ <td>giraph.nettyAutoRead</td>
+ <td>boolean</td>
+ <td>true</td>
+ <td>Whether netty should pro-actively read requests and feed them to its processing pipeline</td>
+ </tr>
+ <tr>
<td>giraph.nettyClientUseExecutionHandler</td>
<td>boolean</td>
<td>true</td>
@@ -376,6 +382,12 @@ under the License.
<td>Class which decides whether a failed job should be retried - optional</td>
</tr>
<tr>
+ <td>giraph.mapper.observers</td>
+ <td>class</td>
+ <td>null</td>
+ <td>Classes for Mapper Observer - optional</td>
+ </tr>
+ <tr>
<td>giraph.mappingInputFormatClass</td>
<td>class</td>
<td>null</td>
@@ -820,6 +832,12 @@ under the License.
<td>Maximum timeout (in ms) for waiting for all all tasks to complete</td>
</tr>
<tr>
+ <td>giraph.waitTimeBetweenConnectionRetriesMs</td>
+ <td>integer</td>
+ <td>500</td>
+ <td></td>
+ </tr>
+ <tr>
<td>giraph.waitingRequestMsecs</td>
<td>integer</td>
<td>15000</td>