You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:19 UTC

[36/47] git commit: updated refs/heads/release-1.1 to 4c139ee

GIRAPH-952: Limit job runtime


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61db6891
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61db6891
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61db6891

Branch: refs/heads/release-1.1
Commit: 61db689128679b886753fed0b8f310b0ece9e0cf
Parents: da3c7b2
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 2 15:28:23 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Oct 3 09:38:22 2014 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/conf/GiraphConstants.java | 10 +++++
 .../java/org/apache/giraph/job/GiraphJob.java   |  3 ++
 .../giraph/job/JobProgressTrackerService.java   | 42 ++++++++++++++++++++
 3 files changed, 55 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index d1fdf57..e78eb42 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -251,6 +251,16 @@ public interface GiraphConstants {
           "Class which decides whether a failed job should be retried - " +
               "optional");
 
+  /**
+   * Maximum allowed time for job to run after getting all resources before it
+   * will be killed, in milliseconds (-1 if it has no limit)
+   */
+  LongConfOption MAX_ALLOWED_JOB_TIME_MS =
+      new LongConfOption("giraph.maxAllowedJobTimeMilliseconds", -1,
+          "Maximum allowed time for job to run after getting all resources " +
+              "before it will be killed, in milliseconds " +
+              "(-1 if it has no limit)");
+
   // At least one of the input format classes is required.
   /** VertexInputFormat class */
   ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =

http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 491d3d2..ca1ad1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -254,6 +254,9 @@ public class GiraphJob {
       submittedJob.setMapperClass(GraphMapper.class);
       submittedJob.setInputFormatClass(BspInputFormat.class);
       submittedJob.setOutputFormatClass(BspOutputFormat.class);
+      if (jobProgressTrackerService != null) {
+        jobProgressTrackerService.setJob(submittedJob);
+      }
 
       GiraphJobObserver jobObserver = conf.getJobObserver();
       jobObserver.launchingJob(submittedJob);

http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
index 3a896e2..49610de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -19,7 +19,9 @@
 package org.apache.giraph.job;
 
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.worker.WorkerProgress;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
 
 import com.facebook.swift.codec.ThriftCodecManager;
@@ -28,6 +30,7 @@ import com.facebook.swift.service.ThriftServer;
 import com.facebook.swift.service.ThriftServerConfig;
 import com.facebook.swift.service.ThriftServiceProcessor;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Map;
@@ -58,6 +61,8 @@ public class JobProgressTrackerService implements JobProgressTracker {
   /** Map of worker progresses */
   private final Map<Integer, WorkerProgress> workerProgresses =
       new ConcurrentHashMap<>();
+  /** Job being tracked (set via setJob), used to kill it on timeout */
+  private Job job;
 
   /**
    * Constructor
@@ -107,12 +112,49 @@ public class JobProgressTrackerService implements JobProgressTracker {
     writerThread.start();
   }
 
+  public void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * Called once all mappers have started; checks MAX_ALLOWED_JOB_TIME_MS and,
+   * if it is positive, starts a thread which kills the job after that time
+   */
+  private void jobGotAllMappers() {
+    final long maxAllowedJobTimeMs =
+        GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
+    if (maxAllowedJobTimeMs > 0) {
+      // Start a thread which will kill the job if running for too long
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            Thread.sleep(maxAllowedJobTimeMs);
+            try {
+              LOG.warn("Killing job because it took longer than " +
+                  maxAllowedJobTimeMs + " milliseconds");
+              job.killJob();
+            } catch (IOException e) {
+              LOG.warn("Failed to kill job", e);
+            }
+          } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Thread checking for jobs max allowed time " +
+                  "interrupted");
+            }
+          }
+        }
+      }).start();
+    }
+  }
+
   @Override
   public synchronized void mapperStarted() {
     mappersStarted++;
     if (LOG.isInfoEnabled()) {
       if (mappersStarted == conf.getMaxWorkers() + 1) {
         LOG.info("Got all " + mappersStarted + " mappers");
+        jobGotAllMappers();
       } else {
         if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
             UPDATE_MILLISECONDS) {