You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2017/04/04 16:39:08 UTC

git commit: updated refs/heads/trunk to c9621e3

Repository: giraph
Updated Branches:
  refs/heads/trunk e354e067e -> c9621e30b


JIRA-1141

closes #33


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c9621e30
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c9621e30
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c9621e30

Branch: refs/heads/trunk
Commit: c9621e30ba79b20322f449e24a2f438918ce33fa
Parents: e354e06
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Apr 4 09:38:43 2017 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Apr 4 09:38:43 2017 -0700

----------------------------------------------------------------------
 .../giraph/job/CombinedWorkerProgress.java      | 32 +++++++++++++++--
 .../job/DefaultJobProgressTrackerService.java   | 36 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c9621e30/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
index 8cc16ec..882c4f4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/CombinedWorkerProgress.java
@@ -150,10 +150,13 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
     return workersDone == expectedWorkersDone;
   }
 
-  @Override
-  public String toString() {
+  /**
+   * Get string describing total job progress
+   *
+   * @return String describing total job progress
+   */
+  protected String getProgressString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
     if (isInputSuperstep()) {
       sb.append("Loading data: ");
       if (!masterProgress.vertexInputSplitsSet() ||
@@ -189,6 +192,14 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
       sb.append(partitionsStored).append(" out of ").append(
           partitionsToStore).append(" partitions stored");
     }
+    return sb.toString();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Data from ").append(workersInSuperstep).append(" workers - ");
+    sb.append(getProgressString());
     sb.append("; min free memory on worker ").append(
         workerWithMinFreeMemory).append(" - ").append(
         DECIMAL_FORMAT.format(minFreeMemoryMB)).append("MB, average ").append(
@@ -204,4 +215,19 @@ public class CombinedWorkerProgress extends WorkerProgressStats {
     }
     return sb.toString();
   }
+
+  /**
+   * Check if this instance differs from an earlier one in reported progress
+   *
+   * @param lastProgress Earlier instance to compare with, must not be null
+   * @return True iff the progress string or workersDone count differs
+   */
+  public boolean madeProgressFrom(CombinedWorkerProgress lastProgress) {
+    // A progress string that differs from the earlier one counts as progress
+    if (!getProgressString().equals(lastProgress.getProgressString())) {
+      return true;
+    }
+    // Otherwise any change in the number of workers done counts as progress
+    return workersDone != lastProgress.workersDone;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c9621e30/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
index ccd0fba..16eae1d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/DefaultJobProgressTrackerService.java
@@ -20,6 +20,7 @@ package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.IntConfOption;
 import org.apache.giraph.master.MasterProgress;
 import org.apache.giraph.utils.ThreadUtils;
 import org.apache.giraph.worker.WorkerProgress;
@@ -36,6 +37,12 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class DefaultJobProgressTrackerService
     implements JobProgressTrackerService {
+  /** Max time job is allowed to not make progress before getting killed */
+  public static final IntConfOption MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS =
+      new IntConfOption(
+          "giraph.maxAllowedTimeWithoutProgressMs",
+          3 * 60 * 60 * 1000, // Allow 3h
+          "Max time job is allowed to not make progress before getting killed");
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(JobProgressTrackerService.class);
@@ -81,6 +88,10 @@ public class DefaultJobProgressTrackerService
     writerThread = ThreadUtils.startThread(new Runnable() {
       @Override
       public void run() {
+        long lastTimeProgressChanged = -1;
+        long maxAllowedTimeWithoutProgress =
+            MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf);
+        CombinedWorkerProgress lastProgress = null;
         while (!finished) {
           if (mappersStarted == conf.getMaxWorkers() + 1 &&
               !workerProgresses.isEmpty()) {
@@ -95,6 +106,16 @@ public class DefaultJobProgressTrackerService
             if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) {
               break;
             }
+
+            if (lastProgress == null ||
+                combinedWorkerProgress.madeProgressFrom(lastProgress)) {
+              lastProgress = combinedWorkerProgress;
+              lastTimeProgressChanged = System.currentTimeMillis();
+            } else if (lastTimeProgressChanged +
+                maxAllowedTimeWithoutProgress < System.currentTimeMillis()) {
+              killTooLongJob();
+              break;
+            }
           }
           if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
             break;
@@ -104,6 +125,21 @@ public class DefaultJobProgressTrackerService
     }, "progress-writer");
   }
 
+  /**
+   * Kill the job which made no progress for too long, logging any failure
+   */
+  protected void killTooLongJob() {
+    // Log the configured limit in seconds (ms / 1000), then request the kill
+    try {
+      LOG.error("Killing the job because it didn't make progress for " +
+          MAX_ALLOWED_TIME_WITHOUT_PROGRESS_MS.get(conf) / 1000 + "s");
+      job.killJob();
+    } catch (IOException e) {
+      LOG.error(
+          "Failed to kill the job which wasn't making progress", e);
+    }
+  }
+
   @Override
   public void setJob(Job job) {
     this.job = job;