You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/27 21:19:50 UTC

git commit: CRUNCH-156: Add a runAsync option to Pipeline that returns a PipelineExecution interface that extends ListenableFuture and allows clients to interactively control running pipelines.

Updated Branches:
  refs/heads/master c0564f181 -> 236d97dbb


CRUNCH-156: Add a runAsync option to Pipeline that returns a PipelineExecution interface
that extends ListenableFuture and allows clients to interactively control running pipelines.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/236d97db
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/236d97db
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/236d97db

Branch: refs/heads/master
Commit: 236d97dbb249b3c4b98ec893893e2d2c591d0ae0
Parents: c0564f1
Author: Josh Wills <jw...@apache.org>
Authored: Tue Feb 26 20:14:49 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Feb 27 11:54:19 2013 -0800

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/CancelJobsIT.java    |   71 ++++++++
 .../src/main/java/org/apache/crunch/Pipeline.java  |   10 +
 .../java/org/apache/crunch/PipelineExecution.java  |   28 +++
 .../lib/jobcontrol/CrunchControlledJob.java        |    1 -
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   58 ++++++-
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   49 ++----
 .../org/apache/crunch/impl/mr/exec/MRExecutor.java |  134 ++++++++++++---
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |   14 +-
 8 files changed, 302 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
new file mode 100644
index 0000000..ecdc4e0
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/CancelJobsIT.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CancellationException;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
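+ * Integration test that runs a word-count pipeline asynchronously and checks both normal completion and cancellation.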
+ */
+public class CancelJobsIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  
+  @Test
+  public void testRun() throws Exception {
+    run(false);
+  }
+  
+  @Test
+  public void testCancel() throws Exception {
+    run(true);
+  }
+  
+  public void run(boolean cancel) throws Exception {
+    String shakes = tmpDir.copyResourceFileName("shakes.txt");
+    String out = tmpDir.getFileName("cancel");
+    Pipeline p = new MRPipeline(CancelJobsIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> words = p.readTextFile(shakes);
+    p.write(words.count().top(20), To.textFile(out));
+    PipelineExecution pe = p.runAsync();
+    if (cancel) {
+      boolean cancelled = false;
+      pe.cancel(true);
+      try {
+        pe.get();
+      } catch (CancellationException e) {
+        cancelled = true;
+      }
+      assertTrue(cancelled);
+    } else {
+      PipelineResult pr = pe.get();
+      assertEquals(2, pr.getStageResults().size());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
index af1d86a..9540eac 100644
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -19,6 +19,8 @@ package org.apache.crunch;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 /**
  * Manages the state of a pipeline execution.
  * 
@@ -107,6 +109,14 @@ public interface Pipeline {
   PipelineResult run();
 
   /**
+   * Constructs and starts a series of MapReduce jobs in order to write data to
+   * the output targets, but returns a {@code ListenableFuture} to allow clients to control
+   * job execution.
+   * @return a {@code PipelineExecution} handle for the running jobs
+   */
+  PipelineExecution runAsync();
+  
+  /**
    * Run any remaining jobs required to generate outputs and then clean up any
    * intermediate data files that were created in this run or previous calls to
    * {@code run}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
new file mode 100644
index 0000000..e74738f
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/PipelineExecution.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A handle that extends the {@code ListenableFuture} interface to allow clients
+ * to control a Crunch pipeline as it runs.
+ */
+public interface PipelineExecution extends ListenableFuture<PipelineResult> {
+  String getPlanDotFile();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index 396ea2d..f30ada3 100644
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 488cdd9..9c958a2 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -20,6 +20,10 @@ package org.apache.crunch.impl.mem;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +32,7 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
@@ -49,6 +54,8 @@ import org.apache.hadoop.mapreduce.Counters;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 public class MemPipeline implements Pipeline {
 
@@ -212,9 +219,56 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
-  public PipelineResult run() {
+  public PipelineExecution runAsync() {
     activeTargets.clear();
-    return PipelineResult.EMPTY;
+    final ListenableFuture<PipelineResult> lf = Futures.immediateFuture(PipelineResult.EMPTY);
+    return new PipelineExecution() {
+      @Override
+      public String getPlanDotFile() {
+        return "";
+      }
+      
+      @Override
+      public void addListener(Runnable listener, Executor executor) {
+        lf.addListener(listener, executor);
+      }
+
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        return lf.cancel(mayInterruptIfRunning);
+      }
+
+      @Override
+      public PipelineResult get() throws InterruptedException, ExecutionException {
+        return lf.get();
+      }
+
+      @Override
+      public PipelineResult get(long timeout, TimeUnit unit) throws InterruptedException,
+          ExecutionException, TimeoutException {
+        return lf.get(timeout, unit);
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return lf.isCancelled();
+      }
+
+      @Override
+      public boolean isDone() {
+        return lf.isDone();
+      }
+    };
+  }
+  
+  @Override
+  public PipelineResult run() {
+    try {
+      return runAsync().get();
+    } catch (Exception e) {
+      LOG.error("Exception running pipeline", e);
+      return PipelineResult.EMPTY;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 2d4d137..19110b3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -29,6 +29,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
@@ -58,6 +59,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  * Pipeline implementation that is executed within Hadoop MapReduce.
@@ -138,7 +140,14 @@ public class MRPipeline implements Pipeline {
   }
 
   public MRExecutor plan() {
-    MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
+    Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize = Maps.newHashMap();
+    for (PCollectionImpl<?> c : outputTargets.keySet()) {
+      if (outputTargetsToMaterialize.containsKey(c)) {
+        toMaterialize.put(c, outputTargetsToMaterialize.get(c));
+        outputTargetsToMaterialize.remove(c);
+      }
+    }
+    MSCRPlanner planner = new MSCRPlanner(this, outputTargets, toMaterialize);
     try {
       return planner.plan(jarClass, conf);
     } catch (IOException e) {
@@ -148,39 +157,17 @@ public class MRPipeline implements Pipeline {
 
   @Override
   public PipelineResult run() {
-    PipelineResult res = null;
     try {
-      res = plan().execute();
-    } catch (CrunchRuntimeException e) {
-      LOG.error(e);
+      return runAsync().get();
+    } catch (Exception e) {
+      LOG.error("Exception running pipeline", e);
       return PipelineResult.EMPTY;
     }
-    for (PCollectionImpl<?> c : outputTargets.keySet()) {
-      if (outputTargetsToMaterialize.containsKey(c)) {
-        MaterializableIterable iter = outputTargetsToMaterialize.get(c);
-        if (iter.isSourceTarget()) {
-          iter.materialize();
-          c.materializeAt((SourceTarget) iter.getSource());
-        }
-        outputTargetsToMaterialize.remove(c);
-      } else {
-        boolean materialized = false;
-        for (Target t : outputTargets.get(c)) {
-          if (!materialized) {
-            if (t instanceof SourceTarget) {
-              c.materializeAt((SourceTarget) t);
-              materialized = true;
-            } else {
-              SourceTarget st = t.asSourceTarget(c.getPType());
-              if (st != null) {
-                c.materializeAt(st);
-                materialized = true;
-              }
-            }
-          }
-        }
-      }
-    }
+  }
+  
+  @Override
+  public PipelineExecution runAsync() {
+    PipelineExecution res = plan().execute();
     outputTargets.clear();
     return res;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 9811600..791fe28 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -18,14 +18,23 @@
 package org.apache.crunch.impl.mr.exec;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.Target;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
+import org.apache.crunch.impl.mr.collect.PCollectionImpl;
+import org.apache.crunch.materialize.MaterializableIterable;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 
 /**
  *
@@ -36,41 +45,116 @@ public class MRExecutor {
   private static final Log LOG = LogFactory.getLog(MRExecutor.class);
 
   private final CrunchJobControl control;
-
-  public MRExecutor(Class<?> jarClass) {
+  private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
+  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  
+  private String planDotFile;
+  
+  public MRExecutor(Class<?> jarClass, Map<PCollectionImpl<?>, Set<Target>> outputTargets,
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
     this.control = new CrunchJobControl(jarClass.toString());
+    this.outputTargets = outputTargets;
+    this.toMaterialize = toMaterialize;
   }
 
   public void addJob(CrunchJob job) {
     this.control.addJob(job);
   }
 
-  public PipelineResult execute() {
-    try {
-      Thread controlThread = new Thread(control);
-      controlThread.start();
-      while (!control.allFinished()) {
-        Thread.sleep(1000);
-      }
-      control.stop();
-    } catch (InterruptedException e) {
-      LOG.info(e);
+  public void setPlanDotFile(String planDotFile) {
+    this.planDotFile = planDotFile;
+  }
+  
+  public PipelineExecution execute() {
+    FutureImpl fi = new FutureImpl();
+    fi.init();
+    return fi;
+  }
+  
+  private class FutureImpl extends AbstractFuture<PipelineResult> implements PipelineExecution {
+    @Override
+    public String getPlanDotFile() {
+      return planDotFile;
     }
-    List<CrunchControlledJob> failures = control.getFailedJobList();
-    if (!failures.isEmpty()) {
-      System.err.println(failures.size() + " job failure(s) occurred:");
-      for (CrunchControlledJob job : failures) {
-        System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
-      }
+    
+    public void init() {
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          try {
+            Thread controlThread = new Thread(control);
+            controlThread.start();
+            while (!control.allFinished()) {
+              Thread.sleep(1000);
+            }
+            control.stop();
+          } catch (InterruptedException e) {
+            setException(e);
+            return;
+          }
+          
+          List<CrunchControlledJob> failures = control.getFailedJobList();
+          if (!failures.isEmpty()) {
+            System.err.println(failures.size() + " job failure(s) occurred:");
+            for (CrunchControlledJob job : failures) {
+              System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
+            }
+          }
+          List<PipelineResult.StageResult> stages = Lists.newArrayList();
+          for (CrunchControlledJob job : control.getSuccessfulJobList()) {
+            try {
+              stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
+            } catch (Exception e) {
+              LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+            }
+          }
+          
+          for (PCollectionImpl<?> c : outputTargets.keySet()) {
+            if (toMaterialize.containsKey(c)) {
+              MaterializableIterable iter = toMaterialize.get(c);
+              if (iter.isSourceTarget()) {
+                iter.materialize();
+                c.materializeAt((SourceTarget) iter.getSource());
+              }
+            } else {
+              boolean materialized = false;
+              for (Target t : outputTargets.get(c)) {
+                if (!materialized) {
+                  if (t instanceof SourceTarget) {
+                    c.materializeAt((SourceTarget) t);
+                    materialized = true;
+                  } else {
+                    SourceTarget st = t.asSourceTarget(c.getPType());
+                    if (st != null) {
+                      c.materializeAt(st);
+                      materialized = true;
+                    }
+                  }
+                }
+              }
+            }
+          }
+
+          set(new PipelineResult(stages));
+        }
+      };
+      t.start();
     }
-    List<PipelineResult.StageResult> stages = Lists.newArrayList();
-    for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-      try {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
-      } catch (Exception e) {
-        LOG.error("Exception thrown fetching job counters for stage: " + job.getJobName(), e);
+    
+    @Override
+    public void interruptTask() {
+      if (!control.allFinished()) {
+        control.stop();
+      }
+      for (CrunchControlledJob job : control.getRunningJobList()) {
+        if (!job.isCompleted()) {
+          try {
+            job.killJob();
+          } catch (Exception e) {
+            LOG.error("Exception killing job: " + job.getJobName(), e);
+          }
+        }
       }
     }
-    return new PipelineResult(stages);
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/236d97db/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 472505b..146bcbf 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -32,6 +32,7 @@ import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
+import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.HashMultimap;
@@ -44,11 +45,14 @@ public class MSCRPlanner {
 
   private final MRPipeline pipeline;
   private final Map<PCollectionImpl<?>, Set<Target>> outputs;
-
-  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs) {
+  private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
+  
+  public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs,
+      Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) {
     this.pipeline = pipeline;
     this.outputs = new TreeMap<PCollectionImpl<?>, Set<Target>>(DEPTH_COMPARATOR);
     this.outputs.putAll(outputs);
+    this.toMaterialize = toMaterialize;
   }
 
   // Used to ensure that we always build pipelines starting from the deepest
@@ -148,13 +152,15 @@ public class MSCRPlanner {
     
     // Finally, construct the jobs from the prototypes and return.
     DotfileWriter dotfileWriter = new DotfileWriter();
-    MRExecutor exec = new MRExecutor(jarClass);
+    MRExecutor exec = new MRExecutor(jarClass, outputs, toMaterialize);
     for (JobPrototype proto : Sets.newHashSet(assignments.values())) {
       dotfileWriter.addJobPrototype(proto);
       exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline));
     }
 
-    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, dotfileWriter.buildDotfile());
+    String planDotFile = dotfileWriter.buildDotfile();
+    exec.setPlanDotFile(planDotFile);
+    conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile);
 
     return exec;
   }