You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/05/08 20:50:46 UTC
[giraph] branch trunk updated: GIRAPH-1218
This is an automated email from the ASF dual-hosted git repository.
dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push:
new e827a56 GIRAPH-1218
e827a56 is described below
commit e827a5668a94d13800201191ebf7d34642728771
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Wed May 8 13:50:21 2019 -0700
GIRAPH-1218
closes #101
---
.../graph/RetryableJobProgressTrackerClient.java | 86 ++++++++++++++++------
1 file changed, 62 insertions(+), 24 deletions(-)
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index f51d765..369feec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -19,9 +19,11 @@
package org.apache.giraph.graph;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.job.ClientThriftServer;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.master.MasterProgress;
+import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;
@@ -42,12 +44,21 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
/**
- * Wrapper around JobProgressTracker which retires to connect and swallows
+ * Wrapper around JobProgressTracker which retries to connect and swallows
* exceptions so app wouldn't crash if something goes wrong with progress
* reports.
*/
public class RetryableJobProgressTrackerClient
implements JobProgressTrackerClient {
+ /** Conf option for number of retries */
+ public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
+ new IntConfOption("giraph.job.progress.client.num.retries", 1,
+ "Number of times to retry a failed operation");
+ /** Conf option for wait time between retries */
+ public static final IntConfOption
+ RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
+ new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
+ "Time (msec) to wait between retries");
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RetryableJobProgressTrackerClient.class);
@@ -57,6 +68,10 @@ public class RetryableJobProgressTrackerClient
private ThriftClientManager clientManager;
/** Job progress tracker */
private JobProgressTracker jobProgressTracker;
+ /** Cached value for number of retries */
+ private int numRetries;
+ /** Cached value for wait time between retries */
+ private int retryWaitMsec;
/**
* Constructor
@@ -66,6 +81,8 @@ public class RetryableJobProgressTrackerClient
public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
ExecutionException, InterruptedException {
this.conf = conf;
+ numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
+ retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
resetConnection();
}
@@ -110,7 +127,7 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.mapperStarted();
}
- });
+ }, numRetries);
}
@Override
@@ -120,7 +137,7 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.logInfo(logLine);
}
- });
+ }, numRetries);
}
@Override
@@ -131,7 +148,7 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.logError(logLine, exByteArray);
}
- });
+ }, numRetries);
}
@Override
@@ -141,7 +158,7 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.logFailure(reason);
}
- });
+ }, numRetries);
}
@Override
@@ -151,7 +168,7 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.updateProgress(workerProgress);
}
- });
+ }, numRetries);
}
@Override
@@ -161,50 +178,71 @@ public class RetryableJobProgressTrackerClient
public void run() {
jobProgressTracker.updateMasterProgress(masterProgress);
}
- });
+ }, numRetries);
}
/**
* Execute Runnable, if disconnected try to connect again and retry
*
* @param runnable Runnable to execute
+ * @param numRetries Number of retries
*/
- private void executeWithRetry(Runnable runnable) {
+ private void executeWithRetry(Runnable runnable, int numRetries) {
try {
runnable.run();
} catch (RuntimeTTransportException | RejectedExecutionException te) {
if (LOG.isDebugEnabled()) {
LOG.debug(te.getClass() + " occurred while talking to " +
- "JobProgressTracker server, trying to reconnect", te);
+ "JobProgressTracker server, trying to reconnect", te);
}
- try {
+ for (int i = 0; i < numRetries; i++) {
try {
- clientManager.close();
+ ThreadUtils.trySleep(retryWaitMsec);
+ retry(runnable);
+ break; // If the retry succeeded, we simply break from the loop
+
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Exception occurred while trying to close client manager", e);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Exception occurred while talking to " +
+ "JobProgressTracker server after retry " + i +
+ " of " + numRetries, e);
}
}
- resetConnection();
- runnable.run();
- // CHECKSTYLE: stop IllegalCatch
- } catch (Exception e) {
- // CHECKSTYLE: resume IllegalCatch
- if (LOG.isInfoEnabled()) {
- LOG.info("Exception occurred while talking to " +
- "JobProgressTracker server, giving up", e);
- }
}
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
if (LOG.isInfoEnabled()) {
LOG.info("Exception occurred while talking to " +
- "JobProgressTracker server, giving up", e);
+ "JobProgressTracker server, giving up", e);
}
}
}
+
+ /**
+ * Executes a single retry by closing the existing {@link #clientManager}
+ * connection, re-initializing it, and then executing the passed instance
+ * of {@link Runnable}.
+ *
+ * @param runnable Instance of {@link Runnable} to execute.
+ * @throws ExecutionException if re-establishing the connection fails
+ * @throws InterruptedException if interrupted while reconnecting
+ */
+ private void retry(Runnable runnable) throws ExecutionException,
+ InterruptedException {
+ try {
+ clientManager.close();
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (Exception e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Exception occurred while trying to close client manager", e);
+ }
+ }
+ resetConnection();
+ runnable.run();
+ }
}