You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/03/02 20:05:06 UTC

git commit: CRUNCH-156: Refactor PipelineExecution interface to provide more info/control over running jobs. Contributed by Chao Shi.

Updated Branches:
  refs/heads/master 453bff6fe -> d36b69ab8


CRUNCH-156: Refactor PipelineExecution interface to provide more info/control over running jobs. Contributed by Chao Shi.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d36b69ab
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d36b69ab
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d36b69ab

Branch: refs/heads/master
Commit: d36b69ab8d07daaca3ffdad6082b26acd6d1e49b
Parents: 453bff6
Author: Josh Wills <jw...@apache.org>
Authored: Sat Mar 2 11:04:30 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Mar 2 11:04:30 2013 -0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CancelJobsIT.java    |   53 +++--
 .../java/org/apache/crunch/PipelineExecution.java  |   34 +++-
 .../lib/jobcontrol/CrunchControlledJob.java        |    6 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   36 +--
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |    8 +-
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |  198 +++++++++------
 6 files changed, 198 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
index ecdc4e0..ff01a2f 100644
--- a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -20,7 +20,7 @@ package org.apache.crunch;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.util.concurrent.CancellationException;
+import java.io.IOException;
 
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
@@ -36,36 +36,49 @@ public class CancelJobsIT {
 
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
-  
+
   @Test
   public void testRun() throws Exception {
-    run(false);
+    PipelineExecution pe = run();
+    pe.waitUntilDone();
+    PipelineResult pr = pe.getResult();
+    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+    assertEquals(2, pr.getStageResults().size());
   }
   
   @Test
-  public void testCancel() throws Exception {
-    run(true);
+  public void testKill() throws Exception {
+    PipelineExecution pe = run();
+    pe.kill();
+    pe.waitUntilDone();
+    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+  }
+
+  @Test
+  public void testKillMultipleTimes() throws Exception {
+    PipelineExecution pe = run();
+    for (int i = 0; i < 10; i++) {
+      pe.kill();
+    }
+    pe.waitUntilDone();
+    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+  }
+
+  @Test
+  public void testKillAfterDone() throws Exception {
+    PipelineExecution pe = run();
+    pe.waitUntilDone();
+    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+    pe.kill(); // expect no-op
+    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
   }
   
-  public void run(boolean cancel) throws Exception {
+  public PipelineExecution run() throws IOException {
     String shakes = tmpDir.copyResourceFileName("shakes.txt");
     String out = tmpDir.getFileName("cancel");
     Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
     PCollection<String> words = p.readTextFile(shakes);
     p.write(words.count().top(20), To.textFile(out));
-    PipelineExecution pe = p.runAsync();
-    if (cancel) {
-      boolean cancelled = false;
-      pe.cancel(true);
-      try {
-        pe.get();
-      } catch (CancellationException e) {
-        cancelled = true;
-      }
-      assertTrue(cancelled);
-    } else {
-      PipelineResult pr = pe.get();
-      assertEquals(2, pr.getStageResults().size());
-    }
+    return p.runAsync(); // need to hack to slow down job start up if this test becomes flaky.
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
index e74738f..fc6bb91 100644
--- a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
+++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -17,12 +17,38 @@
  */
 package org.apache.crunch;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
- * A handle that extends the {@code ListenableFuture} interface to allow clients
- * to control a Crunch pipeline as it runs.
+ * A handle to allow clients to control a Crunch pipeline as it runs.
+ *
+ * This interface is thread-safe.
  */
-public interface PipelineExecution extends ListenableFuture<PipelineResult> {
+public interface PipelineExecution {
+
+  enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
+
+  /** Returns the .dot file that allows a client to graph the Crunch execution plan for this
+   * pipeline.
+   */
   String getPlanDotFile();
+
+  /** Blocks until the pipeline completes or the specified waiting time elapses. */
+   void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException;
+
+   /** Blocks until pipeline completes, i.e. {@code SUCCEEDED}, {@code FAILED} or {@code KILLED}. */
+  void waitUntilDone() throws InterruptedException;
+
+  Status getStatus();
+
+  /** Retrieve the result of a pipeline if it has been completed, otherwise {@code null}. */
+  PipelineResult getResult();
+
+  /**
+   * Kills the pipeline if it is running, no-op otherwise.
+   *
+   * This method only delivers a kill signal to the pipeline; it does not guarantee that the pipeline has exited
+   * when this call returns. To wait until the pipeline has completely exited, use {@link #waitUntilDone()} after this call.
+   */
+  void kill() throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index f30ada3..223673e 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -33,10 +33,10 @@ import org.apache.hadoop.util.StringUtils;
  * This class encapsulates a MapReduce job and its dependency. It monitors the
  * states of the depending jobs and updates the state of this job. A job starts
  * in the WAITING state. If it does not have any depending jobs, or all of the
- * depending jobs are in SUCCESS state, then the job state will become READY. If
+ * depending jobs are in SUCCEEDED state, then the job state will become READY. If
  * any depending jobs fail, the job will fail too. When in READY state, the job
  * can be submitted to Hadoop for execution, with the state changing into
- * RUNNING state. From RUNNING state, the job can get into SUCCESS or FAILED
+ * RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED
  * state, depending the status of the job execution.
  */
 public class CrunchControlledJob {
@@ -244,7 +244,7 @@ public class CrunchControlledJob {
 
   /**
    * Check the state of this running job. The state may remain the same, become
-   * SUCCESS or FAILED.
+   * SUCCEEDED or FAILED.
    */
   protected void checkRunningState() throws IOException, InterruptedException {
     try {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 9c958a2..d7c1a4f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,10 +20,7 @@ package org.apache.crunch.impl.mem;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -227,48 +224,37 @@ public class MemPipeline implements Pipeline {
       public String getPlanDotFile() {
         return "";
       }
-      
-      @Override
-      public void addListener(Runnable listener, Executor executor) {
-        lf.addListener(listener, executor);
-      }
 
       @Override
-      public boolean cancel(boolean mayInterruptIfRunning) {
-        return lf.cancel(mayInterruptIfRunning);
+      public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+        // no-op
       }
 
       @Override
-      public PipelineResult get() throws InterruptedException, ExecutionException {
-        return lf.get();
+      public void waitUntilDone() throws InterruptedException {
+        // no-op
       }
 
       @Override
-      public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException,
-          ExecutionException, TimeoutException {
-        return lf.get(timeout, unit);
+      public Status getStatus() {
+        return Status.SUCCEEDED;
       }
 
       @Override
-      public boolean isCancelled() {
-        return lf.isCancelled();
+      public PipelineResult getResult() {
+        return PipelineResult.EMPTY;
       }
 
       @Override
-      public boolean isDone() {
-        return lf.isDone();
+      public void kill() {
       }
     };
   }
   
   @Override
   public PipelineResult run() {
-    try {
-      return runAsync().get();
-    } catch (Exception e) {
-      LOG.error("Exception running pipeline", e);
-      return PipelineResult.EMPTY;
-    }
+    activeTargets.clear();
+    return PipelineResult.EMPTY;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 19110b3..00cf486 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Pipeline implementation that is executed within Hadoop MapReduce.
@@ -158,8 +157,11 @@ public class MRPipeline implements Pipeline {
   @Override
   public PipelineResult run() {
     try {
-      return runAsync().get();
-    } catch (Exception e) {
+      PipelineExecution pipelineExecution = runAsync();
+      pipelineExecution.waitUntilDone();
+      return pipelineExecution.getResult();
+    } catch (InterruptedException e) {
+      // TODO: How to handle this without changing signature?
       LOG.error("Exception running pipeline", e);
       return PipelineResult.EMPTY;
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/d36b69ab/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 791fe28..901f91a 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -20,6 +20,9 @@ package org.apache.crunch.impl.mr.exec;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,21 +36,24 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.materialize.MaterializableIterable;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  *
  *
  */
-public class MRExecutor {
+public class MRExecutor implements PipelineExecution {
 
   private static final Log LOG = LogFactory.getLog(MRExecutor.class);
 
   private final CrunchJobControl control;
   private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
   private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
-  
+  private final CountDownLatch doneSignal = new CountDownLatch(1);
+  private final CountDownLatch killSignal = new CountDownLatch(1);
+  private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
+  private PipelineResult result;
+  private Thread monitorThread;
+
   private String planDotFile;
   
   public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
@@ -55,6 +61,12 @@ public class MRExecutor {
     this.control = new CrunchJobControl(jarClass.toString());
     this.outputTargets = outputTargets;
     this.toMaterialize = toMaterialize;
+    this.monitorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        monitorLoop();
+      }
+    });
   }
 
   public void addJob(CrunchJob job) {
@@ -66,95 +78,117 @@ public class MRExecutor {
   }
   
   public PipelineExecution execute() {
-    FutureImpl fi = new FutureImpl();
-    fi.init();
-    return fi;
+    monitorThread.start();
+    return this;
   }
-  
-  private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution {
-    @Override
-    public String getPlanDotFile() {
-      return planDotFile;
-    }
-    
-    public void init() {
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            Thread controlThread = new Thread(control);
-            controlThread.start();
-            while (!control.allFinished()) {
-              Thread.sleep(1000);
-            }
-            control.stop();
-          } catch (InterruptedException e) {
-            setException(e);
-            return;
-          }
-          
-          List<CrunchControlledJob> failures = control.getFailedJobList();
-          if (!failures.isEmpty()) {
-            System.err.println(failures.size() + " job failure(s) occurred:");
-            for (CrunchControlledJob job : failures) {
-              System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
-            }
-          }
-          List<PipelineResult.StageResult> stages = Lists.newArrayList();
-          for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-            try {
-              stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
-            } catch (Exception e) {
-              LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
-            }
+
+  /** Monitors the running status of the jobs. It is called on {@code monitorThread}. */
+  private void monitorLoop() {
+    try {
+      Thread controlThread = new Thread(control);
+      controlThread.start();
+      while (killSignal.getCount() > 0 && !control.allFinished()) {
+        killSignal.await(1, TimeUnit.SECONDS);
+      }
+      control.stop();
+      killAllRunningJobs();
+
+      List<CrunchControlledJob> failures = control.getFailedJobList();
+      if (!failures.isEmpty()) {
+        System.err.println(failures.size() + " job failure(s) occurred:");
+        for (CrunchControlledJob job : failures) {
+          System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
+        }
+      }
+      List<PipelineResult.StageResult> stages = Lists.newArrayList();
+      for (CrunchControlledJob job : control.getSuccessfulJobList()) {
+        try {
+          stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+        } catch (Exception e) {
+          LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+        }
+      }
+
+      for (PCollectionImpl<?> c : outputTargets.keySet()) {
+        if (toMaterialize.containsKey(c)) {
+          MaterializableIterable iter = toMaterialize.get(c);
+          if (iter.isSourceTarget()) {
+            iter.materialize();
+            c.materializeAt((SourceTarget) iter.getSource());
           }
-          
-          for (PCollectionImpl<?> c : outputTargets.keySet()) {
-            if (toMaterialize.containsKey(c)) {
-              MaterializableIterable iter = toMaterialize.get(c);
-              if (iter.isSourceTarget()) {
-                iter.materialize();
-                c.materializeAt((SourceTarget) iter.getSource());
-              }
-            } else {
-              boolean materialized = false;
-              for (Target t : outputTargets.get(c)) {
-                if (!materialized) {
-                  if (t instanceof SourceTarget) {
-                    c.materializeAt((SourceTarget) t);
-                    materialized = true;
-                  } else {
-                    SourceTarget st = t.asSourceTarget(c.getPType());
-                    if (st != null) {
-                      c.materializeAt(st);
-                      materialized = true;
-                    }
-                  }
+        } else {
+          boolean materialized = false;
+          for (Target t : outputTargets.get(c)) {
+            if (!materialized) {
+              if (t instanceof SourceTarget) {
+                c.materializeAt((SourceTarget) t);
+                materialized = true;
+              } else {
+                SourceTarget st = t.asSourceTarget(c.getPType());
+                if (st != null) {
+                  c.materializeAt(st);
+                  materialized = true;
                 }
               }
             }
           }
+        }
+      }
 
-          set(new PipelineResult(stages));
+      synchronized (this) {
+        result = new PipelineResult(stages);
+        if (killSignal.getCount() == 0) {
+          status.set(Status.KILLED);
+        } else {
+          status.set(result.succeeded() ? Status.SUCCEEDED : Status.FAILED);
         }
-      };
-      t.start();
-    }
-    
-    @Override
-    public void interruptTask() {
-      if (!control.allFinished()) {
-        control.stop();
       }
-      for (CrunchControlledJob job : control.getRunningJobList()) {
-        if (!job.isCompleted()) {
-          try {
-            job.killJob();
-          } catch (Exception e) {
-            LOG.error("Exception killing job: " + job.getJobName(), e);
-          }
+    } catch (InterruptedException e) {
+      throw new AssertionError(e); // Nobody should interrupt us.
+    } finally {
+      doneSignal.countDown();
+    }
+  }
+
+  private void killAllRunningJobs() {
+    for (CrunchControlledJob job : control.getRunningJobList()) {
+      if (!job.isCompleted()) {
+        try {
+          job.killJob();
+        } catch (Exception e) {
+          LOG.error("Exception killing job: " + job.getJobName(), e);
         }
       }
     }
   }
+
+  @Override
+  public String getPlanDotFile() {
+    return planDotFile;
+  }
+
+  @Override
+  public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+    doneSignal.await(timeout, timeUnit);
+  }
+
+  @Override
+  public void waitUntilDone() throws InterruptedException {
+    doneSignal.await();
+  }
+
+  @Override
+  public synchronized Status getStatus() {
+    return status.get();
+  }
+
+  @Override
+  public synchronized PipelineResult getResult() {
+    return result;
+  }
+
+  @Override
+  public void kill() throws InterruptedException {
+    killSignal.countDown();
+  }
 }