You are viewing a plain-text version of this content. (The canonical link this line originally pointed to was not preserved in this copy.)
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:19 UTC
[36/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-952: Limit job runtime
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/61db6891
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/61db6891
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/61db6891
Branch: refs/heads/release-1.1
Commit: 61db689128679b886753fed0b8f310b0ece9e0cf
Parents: da3c7b2
Author: Maja Kabiljo <ma...@fb.com>
Authored: Thu Oct 2 15:28:23 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Oct 3 09:38:22 2014 -0700
----------------------------------------------------------------------
.../org/apache/giraph/conf/GiraphConstants.java | 10 +++++
.../java/org/apache/giraph/job/GiraphJob.java | 3 ++
.../giraph/job/JobProgressTrackerService.java | 42 ++++++++++++++++++++
3 files changed, 55 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index d1fdf57..e78eb42 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -251,6 +251,16 @@ public interface GiraphConstants {
"Class which decides whether a failed job should be retried - " +
"optional");
+  /**
+   * Maximum allowed time, in milliseconds, for the job to run once all of
+   * its mappers have started before it is killed. Any value {@code <= 0}
+   * (the default is -1) means the job runtime is not limited.
+   */
+  LongConfOption MAX_ALLOWED_JOB_TIME_MS =
+      new LongConfOption("giraph.maxAllowedJobTimeMilliseconds", -1,
+          "Maximum allowed time for job to run after getting all resources " +
+          "before it will be killed, in milliseconds " +
+          "(-1 if it has no limit)");
+
// At least one of the input format classes is required.
/** VertexInputFormat class */
ClassConfOption<VertexInputFormat> VERTEX_INPUT_FORMAT_CLASS =
http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
index 491d3d2..ca1ad1c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java
@@ -254,6 +254,9 @@ public class GiraphJob {
submittedJob.setMapperClass(GraphMapper.class);
submittedJob.setInputFormatClass(BspInputFormat.class);
submittedJob.setOutputFormatClass(BspOutputFormat.class);
+ if (jobProgressTrackerService != null) {
+ jobProgressTrackerService.setJob(submittedJob);
+ }
GiraphJobObserver jobObserver = conf.getJobObserver();
jobObserver.launchingJob(submittedJob);
http://git-wip-us.apache.org/repos/asf/giraph/blob/61db6891/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
index 3a896e2..49610de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java
@@ -19,7 +19,9 @@
package org.apache.giraph.job;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.worker.WorkerProgress;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;
import com.facebook.swift.codec.ThriftCodecManager;
@@ -28,6 +30,7 @@ import com.facebook.swift.service.ThriftServer;
import com.facebook.swift.service.ThriftServerConfig;
import com.facebook.swift.service.ThriftServiceProcessor;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Map;
@@ -58,6 +61,8 @@ public class JobProgressTrackerService implements JobProgressTracker {
/** Map of worker progresses */
private final Map<Integer, WorkerProgress> workerProgresses =
new ConcurrentHashMap<>();
+  /**
+   * Hadoop job being tracked; killed if it exceeds MAX_ALLOWED_JOB_TIME_MS.
+   * Volatile because it is set via setJob by the thread launching the job
+   * and read by the separate thread that enforces the time limit.
+   */
+  private volatile Job job;
/**
* Constructor
@@ -107,12 +112,49 @@ public class JobProgressTrackerService implements JobProgressTracker {
writerThread.start();
}
+  /**
+   * Set the Hadoop job being launched, so that it can be killed if it runs
+   * longer than MAX_ALLOWED_JOB_TIME_MS.
+   *
+   * @param job Job being launched
+   */
+  public void setJob(Job job) {
+    this.job = job;
+  }
+
+  /**
+   * Called when the job got all mappers. Reads MAX_ALLOWED_JOB_TIME_MS and,
+   * if it is positive, starts a daemon thread which kills the job after that
+   * many milliseconds unless the job has already completed. A non-positive
+   * value (the default is -1) disables the limit.
+   *
+   * The thread is a daemon so that a job which finishes early does not keep
+   * the client JVM alive until the limit expires.
+   *
+   * NOTE(review): mapperStarted() currently calls this only inside its
+   * LOG.isInfoEnabled() branch, so the limit is silently not enforced when
+   * INFO logging is disabled; that call should be moved out of the check.
+   */
+  private void jobGotAllMappers() {
+    final long maxAllowedJobTimeMs =
+        GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
+    if (maxAllowedJobTimeMs <= 0) {
+      // No limit configured
+      return;
+    }
+    // Start a thread which will kill the job if running for too long
+    Thread killerThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(maxAllowedJobTimeMs);
+        } catch (InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Thread checking for jobs max allowed time " +
+                "interrupted");
+          }
+          // Restore the interrupt status before exiting
+          Thread.currentThread().interrupt();
+          return;
+        }
+        killJobIfStillRunning(maxAllowedJobTimeMs);
+      }
+    }, "giraph-max-job-time-killer");
+    killerThread.setDaemon(true);
+    killerThread.start();
+  }
+
+  /**
+   * Kill the job because it exceeded the allowed time, unless it has
+   * already completed or was never set.
+   *
+   * @param maxAllowedJobTimeMs Time limit which was exceeded, for logging
+   */
+  private void killJobIfStillRunning(long maxAllowedJobTimeMs) {
+    // Read the field once so the null check and the use see the same value
+    Job jobToKill = job;
+    if (jobToKill == null) {
+      LOG.warn("Job took longer than " + maxAllowedJobTimeMs +
+          " milliseconds, but it cannot be killed because it was not set");
+      return;
+    }
+    try {
+      if (jobToKill.isComplete()) {
+        // Job already finished on its own, nothing to kill
+        return;
+      }
+      LOG.warn("Killing job because it took longer than " +
+          maxAllowedJobTimeMs + " milliseconds");
+      jobToKill.killJob();
+    } catch (IOException e) {
+      LOG.warn("Failed to kill job", e);
+    }
+  }
+
@Override
public synchronized void mapperStarted() {
mappersStarted++;
if (LOG.isInfoEnabled()) {
if (mappersStarted == conf.getMaxWorkers() + 1) {
LOG.info("Got all " + mappersStarted + " mappers");
+ jobGotAllMappers();
} else {
if (System.currentTimeMillis() - lastTimeMappersStartedLogged >
UPDATE_MILLISECONDS) {