You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2019/05/08 20:50:46 UTC

[giraph] branch trunk updated: GIRAPH-1218

This is an automated email from the ASF dual-hosted git repository.

dionysios pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/giraph.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e827a56  GIRAPH-1218
e827a56 is described below

commit e827a5668a94d13800201191ebf7d34642728771
Author: Dionysios Logothetis <dl...@gmail.com>
AuthorDate: Wed May 8 13:50:21 2019 -0700

    GIRAPH-1218
    
    closes #101
---
 .../graph/RetryableJobProgressTrackerClient.java   | 86 ++++++++++++++++------
 1 file changed, 62 insertions(+), 24 deletions(-)

diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
index f51d765..369feec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java
@@ -19,9 +19,11 @@
 package org.apache.giraph.graph;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.job.ClientThriftServer;
 import org.apache.giraph.job.JobProgressTracker;
 import org.apache.giraph.master.MasterProgress;
+import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.worker.WorkerProgress;
 import org.apache.log4j.Logger;
 
@@ -42,12 +44,21 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
 
 /**
- * Wrapper around JobProgressTracker which retires to connect and swallows
+ * Wrapper around JobProgressTracker which retries to connect and swallows
  * exceptions so app wouldn't crash if something goes wrong with progress
  * reports.
  */
 public class RetryableJobProgressTrackerClient
     implements JobProgressTrackerClient {
+  /** Conf option for number of retries */
+  public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
+    new IntConfOption("giraph.job.progress.client.num.retries", 1,
+      "Number of times to retry a failed operation");
+  /** Conf option for wait time between retries */
+  public static final IntConfOption
+    RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
+    new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
+      "Time (msec) to wait between retries");
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(RetryableJobProgressTrackerClient.class);
@@ -57,6 +68,10 @@ public class RetryableJobProgressTrackerClient
   private ThriftClientManager clientManager;
   /** Job progress tracker */
   private JobProgressTracker jobProgressTracker;
+  /** Cached value for number of retries */
+  private int numRetries;
+  /** Cached value for wait time between retries */
+  private int retryWaitMsec;
 
   /**
    * Constructor
@@ -66,6 +81,8 @@ public class RetryableJobProgressTrackerClient
   public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
       ExecutionException, InterruptedException {
     this.conf = conf;
+    numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
+    retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
     resetConnection();
   }
 
@@ -110,7 +127,7 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.mapperStarted();
       }
-    });
+    }, numRetries);
   }
 
   @Override
@@ -120,7 +137,7 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.logInfo(logLine);
       }
-    });
+    }, numRetries);
   }
 
   @Override
@@ -131,7 +148,7 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.logError(logLine, exByteArray);
       }
-    });
+    }, numRetries);
   }
 
   @Override
@@ -141,7 +158,7 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.logFailure(reason);
       }
-    });
+    }, numRetries);
   }
 
   @Override
@@ -151,7 +168,7 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.updateProgress(workerProgress);
       }
-    });
+    }, numRetries);
   }
 
   @Override
@@ -161,50 +178,71 @@ public class RetryableJobProgressTrackerClient
       public void run() {
         jobProgressTracker.updateMasterProgress(masterProgress);
       }
-    });
+    }, numRetries);
   }
 
   /**
    * Execute Runnable, if disconnected try to connect again and retry
    *
    * @param runnable Runnable to execute
+   * @param numRetries Maximum number of reconnect-and-retry attempts
    */
-  private void executeWithRetry(Runnable runnable) {
+  private void executeWithRetry(Runnable runnable, int numRetries) {
     try {
       runnable.run();
     } catch (RuntimeTTransportException | RejectedExecutionException te) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(te.getClass() + " occurred while talking to " +
-            "JobProgressTracker server, trying to reconnect", te);
+          "JobProgressTracker server, trying to reconnect", te);
       }
-      try {
+      for (int i = 0; i < numRetries; i++) {
         try {
-          clientManager.close();
+          ThreadUtils.trySleep(retryWaitMsec);
+          retry(runnable);
+          break; // Retry succeeded, so no further attempts are needed
+
           // CHECKSTYLE: stop IllegalCatch
         } catch (Exception e) {
           // CHECKSTYLE: resume IllegalCatch
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Exception occurred while trying to close client manager", e);
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Exception occurred while talking to " +
+              "JobProgressTracker server after retry " + i +
+              " of " + numRetries, e);
           }
         }
-        resetConnection();
-        runnable.run();
-        // CHECKSTYLE: stop IllegalCatch
-      } catch (Exception e) {
-        // CHECKSTYLE: resume IllegalCatch
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Exception occurred while talking to " +
-              "JobProgressTracker server, giving up", e);
-        }
       }
       // CHECKSTYLE: stop IllegalCatch
     } catch (Exception e) {
       // CHECKSTYLE: resume IllegalCatch
       if (LOG.isInfoEnabled()) {
         LOG.info("Exception occurred while talking to " +
-            "JobProgressTracker server, giving up", e);
+          "JobProgressTracker server, giving up", e);
       }
     }
   }
+
+  /**
+   * Executes a single retry: closes the existing {@link #clientManager}
+   * (a failure to close is only logged at debug level), re-initializes the
+   * connection, and then runs the passed {@link Runnable}.
+   *
+   * @param runnable Instance of {@link Runnable} to execute.
+   * @throws ExecutionException presumably from resetConnection() - confirm
+   * @throws InterruptedException presumably from resetConnection() - confirm
+   */
+  private void retry(Runnable runnable) throws ExecutionException,
+    InterruptedException {
+    try {
+      clientManager.close();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Exception e) {
+      // CHECKSTYLE: resume IllegalCatch
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Exception occurred while trying to close client manager", e);
+      }
+    }
+    resetConnection();
+    runnable.run();
+  }
 }