You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/06 19:40:11 UTC

git commit: CRUNCH-273: Make PipelineExecution implement ListenableFuture

Updated Branches:
  refs/heads/master ecf5dd01c -> 7a8af2865


CRUNCH-273: Make PipelineExecution implement ListenableFuture


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7a8af286
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7a8af286
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7a8af286

Branch: refs/heads/master
Commit: 7a8af286529aeacb07bd7bc6daf313d5e3f31b49
Parents: ecf5dd0
Author: Josh Wills <jw...@apache.org>
Authored: Thu Oct 3 00:31:58 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Oct 6 10:37:24 2013 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CancelJobsIT.java | 36 +++++++-
 .../org/apache/crunch/PipelineExecution.java    |  4 +-
 .../org/apache/crunch/impl/mem/MemPipeline.java | 97 +++++++++++++-------
 .../apache/crunch/impl/mr/exec/MRExecutor.java  | 42 ++++++++-
 4 files changed, 140 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
index ff01a2f..5fbca57 100644
--- a/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -18,6 +18,7 @@
 package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -45,7 +46,15 @@ public class CancelJobsIT {
     assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
     assertEquals(2, pr.getStageResults().size());
   }
-  
+
+  @Test
+  public void testGet() throws Exception {
+    PipelineExecution pe = run();
+    PipelineResult pr = pe.get();
+    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+    assertEquals(2, pr.getStageResults().size());
+  }
+
   @Test
   public void testKill() throws Exception {
     PipelineExecution pe = run();
@@ -55,6 +64,31 @@ public class CancelJobsIT {
   }
 
   @Test
+  public void testKillGet() throws Exception {
+    PipelineExecution pe = run();
+    pe.kill();
+    PipelineResult res = pe.get();
+    assertFalse(res.succeeded());
+    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+  }
+
+  @Test
+  public void testCancelNoInterrupt() throws Exception {
+    PipelineExecution pe = run();
+    pe.cancel(false);
+    pe.waitUntilDone();
+    assertEquals(PipelineExecution.Status.SUCCEEDED, pe.getStatus());
+  }
+
+  @Test
+  public void testCancelMayInterrupt() throws Exception {
+    PipelineExecution pe = run();
+    pe.cancel(true);
+    pe.waitUntilDone();
+    assertEquals(PipelineExecution.Status.KILLED, pe.getStatus());
+  }
+
+  @Test
   public void testKillMultipleTimes() throws Exception {
     PipelineExecution pe = run();
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
index fc6bb91..af6a177 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -17,6 +17,8 @@
  */
 package org.apache.crunch;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -24,7 +26,7 @@ import java.util.concurrent.TimeUnit;
  *
  * This interface is thread-safe.
  */
-public interface PipelineExecution {
+public interface PipelineExecution extends ListenableFuture<PipelineResult> {
 
   enum Status { READY, RUNNING, SUCCEEDED, FAILED, KILLED }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 60677fc..cc9ad69 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,8 +20,11 @@ package org.apache.crunch.impl.mem;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericContainer;
@@ -306,44 +309,16 @@ public class MemPipeline implements Pipeline {
   @Override
   public PipelineExecution runAsync() {
     activeTargets.clear();
-    return new PipelineExecution() {
-      @Override
-      public String getPlanDotFile() {
-        return "";
-      }
-
-      @Override
-      public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
-        // no-po
-      }
-
-      @Override
-      public void waitUntilDone() throws InterruptedException {
-        // no-po
-      }
-
-      @Override
-      public Status getStatus() {
-        return Status.SUCCEEDED;
-      }
-
-      @Override
-      public PipelineResult getResult() {
-        return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
-            Status.SUCCEEDED);
-      }
-
-      @Override
-      public void kill() {
-      }
-    };
+    return new MemExecution();
   }
   
   @Override
   public PipelineResult run() {
-    activeTargets.clear();
-    return new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
-        PipelineExecution.Status.SUCCEEDED);
+    try {
+      return runAsync().get();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -360,4 +335,58 @@ public class MemPipeline implements Pipeline {
   public String getName() {
     return "Memory Pipeline";
   }
+
+  private static class MemExecution extends AbstractFuture<PipelineResult> implements PipelineExecution {
+
+    private PipelineResult res;
+
+    public MemExecution() {
+      this.res = new PipelineResult(
+          ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", COUNTERS)),
+          PipelineExecution.Status.SUCCEEDED);
+    }
+
+    @Override
+    public String getPlanDotFile() {
+      return "";
+    }
+
+    @Override
+    public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException {
+      set(res);
+    }
+
+    @Override
+    public void waitUntilDone() throws InterruptedException {
+      set(res);
+    }
+
+    @Override
+    public PipelineResult get() throws ExecutionException, InterruptedException {
+      set(res);
+      return super.get();
+    }
+
+    @Override
+    public PipelineResult get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException,
+        TimeoutException {
+      set(res);
+      return super.get(timeout, timeUnit);
+    }
+
+    @Override
+    public Status getStatus() {
+      return isDone() ? Status.SUCCEEDED : Status.READY;
+    }
+
+    @Override
+    public PipelineResult getResult() {
+      return isDone() ? res : null;
+    }
+
+    @Override
+    public void kill() {
+      // No-op
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7a8af286/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 532e37c..a655b23 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.mr.exec;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractFuture;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.PipelineResult;
@@ -37,7 +38,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -49,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
  *
  * It is thread-safe.
  */
-public class MRExecutor implements MRPipelineExecution {
+public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipelineExecution {
 
   private static final Log LOG = LogFactory.getLog(MRExecutor.class);
 
@@ -62,6 +65,7 @@ public class MRExecutor implements MRPipelineExecution {
   private AtomicReference<Status> status = new AtomicReference<Status>(Status.READY);
   private PipelineResult result;
   private Thread monitorThread;
+  private boolean started;
 
   private String planDotFile;
   
@@ -89,13 +93,17 @@ public class MRExecutor implements MRPipelineExecution {
     this.planDotFile = planDotFile;
   }
   
-  public MRPipelineExecution execute() {
-    monitorThread.start();
+  public synchronized MRPipelineExecution execute() {
+    if (!started) {
+      monitorThread.start();
+      started = true;
+    }
     return this;
   }
 
   /** Monitors running status. It is called in {@code MonitorThread}. */
   private void monitorLoop() {
+    status.set(Status.RUNNING);
     try {
       while (killSignal.getCount() > 0 && !control.allFinished()) {
         control.pollJobStatusAndStartNewOnes();
@@ -150,12 +158,14 @@ public class MRExecutor implements MRPipelineExecution {
           status.set(Status.SUCCEEDED);
         }
         result = new PipelineResult(stages, status.get());
+        set(result);
       }
     } catch (InterruptedException e) {
       throw new AssertionError(e); // Nobody should interrupt us.
     } catch (IOException e) {
       LOG.error("Pipeline failed due to exception", e);
       status.set(Status.FAILED);
+      setException(e);
     } finally {
       doneSignal.countDown();
     }
@@ -177,6 +187,23 @@ public class MRExecutor implements MRPipelineExecution {
   }
 
   @Override
+  public PipelineResult get() throws InterruptedException, ExecutionException {
+    if (getStatus() == Status.READY) {
+      execute();
+    }
+    return super.get();
+  }
+
+  @Override
+  public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
+      ExecutionException {
+    if (getStatus() == Status.READY) {
+      execute();
+    }
+    return super.get(timeout, unit);
+  }
+
+  @Override
   public synchronized Status getStatus() {
     return status.get();
   }
@@ -191,6 +218,15 @@ public class MRExecutor implements MRPipelineExecution {
     killSignal.countDown();
   }
 
+  @Override
+  protected void interruptTask() {
+    try {
+      kill();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private static boolean isLocalMode() {
     Configuration conf = new Configuration();
     // Try to handle MapReduce version 0.20 or 0.22