You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/11 20:28:18 UTC

svn commit: r1540818 - in /pig/branches/tez: ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/util/ test/org/apache/pig/test/data/Golde...

Author: cheolsoo
Date: Mon Nov 11 19:28:17 2013
New Revision: 1540818

URL: http://svn.apache.org/r1540818
Log:
PIG-3539: Pig should be able to submit multiple DAG (daijy via cheolsoo)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
Modified:
    pig/branches/tez/ivy/libraries.properties
    pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Mon Nov 11 19:28:17 2013
@@ -19,7 +19,7 @@ apacherat.version=0.8
 automaton.version=1.11-8
 avro.version=1.7.4
 commons-beanutils.version=1.7.0
-commons-cli.version=1.0
+commons-cli.version=1.2
 commons-codec.version=1.4
 commons-io.version=2.3
 commons-el.version=1.0

Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java Mon Nov 11 19:28:17 2013
@@ -77,7 +77,7 @@ public class PigJobControl extends JobCo
     }
   }
 
-  private int timeToSleep;
+  protected int timeToSleep;
 
   /**
    * Construct a job control for a group of jobs.
@@ -174,6 +174,9 @@ public class PigJobControl extends JobCo
 
         synchronized(this) {
           Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+          if (!it.hasNext()) {
+              stop();
+          }
           while(it.hasNext()) {
             ControlledJob j = it.next();
             log.debug("Checking state of job "+j);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Nov 11 19:28:17 2013
@@ -120,11 +120,8 @@ public class TezCompiler extends PhyPlan
     private int fileConcatenationThreshold = 100;
     private boolean optimisticFileConcatenation = false;
 
-    public TezCompiler(PhysicalPlan plan) throws TezCompilerException {
-        this(plan, null);
-    }
-
-    public TezCompiler(PhysicalPlan plan, PigContext pigContext) throws TezCompilerException {
+    public TezCompiler(PhysicalPlan plan, PigContext pigContext)
+            throws TezCompilerException {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.plan = plan;
         this.pigContext = pigContext;
@@ -154,6 +151,15 @@ public class TezCompiler extends PhyPlan
     public TezOperPlan getTezPlan() {
         return tezPlan;
     }
+    
+    // Wrap the compiled Tez plan in a container and split it into a graph of plans
+    public TezPlanContainer getPlanContainer() throws PlanException {
+        TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
+        TezPlanContainerNode node = new TezPlanContainerNode(new OperatorKey(scope, nig.getNextNodeId(scope)), tezPlan);
+        tezPlanContainer.add(node);
+        tezPlanContainer.split(node);
+        return tezPlanContainer;
+    }
 
     /**
      * The front-end method that the user calls to compile the plan. Assumes

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Mon Nov 11 19:28:17 2013
@@ -22,11 +22,10 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -39,24 +38,19 @@ import org.apache.tez.dag.api.client.DAG
  */
 public class TezJob extends ControlledJob {
     private static final Log log = LogFactory.getLog(TezJob.class);
-    private AMConfiguration amConfig;
-    private ApplicationId appId;
-    private TezClient tezClient;
-    private DAGClient dagClient;
     private DAGStatus dagStatus;
+    private Configuration conf;
     private DAG dag;
+    private DAGClient dagClient;
+    private Map<String, LocalResource> requestAMResources;
+    private TezSession tezSession;
 
-    public TezJob(TezConfiguration conf, ApplicationId appId, DAG dag,
-            Map<String, LocalResource> localResources) throws IOException {
+    public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
+            throws IOException {
         super(conf);
-        this.amConfig = new AMConfiguration(null, null, localResources, conf, null);
-        this.tezClient = new TezClient(conf);
-        this.appId = appId;
+        this.conf = conf;
         this.dag = dag;
-    }
-
-    public ApplicationId getAppId() {
-        return appId;
+        this.requestAMResources = requestAMResources;
     }
 
     public DAG getDag() {
@@ -70,17 +64,20 @@ public class TezJob extends ControlledJo
     @Override
     public void submit() {
         try {
-            log.info("Submitting DAG - Application id: " + appId);
-            dagClient = tezClient.submitDAGApplication(appId, dag, amConfig);
+            tezSession = TezSessionManager.getSession(conf, requestAMResources);
+            log.info("Submitting DAG - Application id: " + tezSession.getApplicationId());
+            dagClient = tezSession.submitDAG(dag);
         } catch (Exception e) {
-            log.info("Cannot submit DAG - Application id: " + appId, e);
+            if (tezSession!=null) {
+                log.info("Cannot submit DAG - Application id: " + tezSession.getApplicationId(), e);
+            }
             setJobState(ControlledJob.State.FAILED);
             return;
         }
 
         while (true) {
             try {
-                dagStatus = dagClient.getDAGStatus();
+                dagStatus = dagClient.getDAGStatus(null);
             } catch (Exception e) {
                 log.info("Cannot retrieve DAG status", e);
                 setJobState(ControlledJob.State.FAILED);
@@ -96,6 +93,9 @@ public class TezJob extends ControlledJo
                     sb.append("\n");
                 }
                 setMessage(sb.toString());
+                TezSessionManager.freeSession(tezSession);
+                tezSession = null;
+                dagClient = null;
                 break;
             }
 
@@ -111,8 +111,9 @@ public class TezJob extends ControlledJo
     public void killJob() throws IOException {
         try {
             dagClient.tryKillDAG();
+            tezSession.stop();
         } catch (TezException e) {
-            throw new IOException("Cannot kill DAG - Application Id: " + appId, e);
+            throw new IOException("Cannot kill DAG - Application Id: " + tezSession.getApplicationId(), e);
         }
     }
 

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.hadoop23.PigJobControl;
+import org.apache.pig.tools.pigstats.tez.TezStats;
+
+public class TezJobControl extends PigJobControl {
+    
+    private TezJobNotifier notifier = null;
+    private TezStats stats = null;
+
+    public TezJobControl(String groupName, int timeToSleep) {
+        super(groupName, timeToSleep);
+    }
+    
+    public void setJobNotifier(TezJobNotifier notifier) {
+        this.notifier = notifier;
+    }
+    
+    public void setTezStats(TezStats stats) {
+        this.stats = stats;
+    }
+    
+    @Override
+    public void run() {
+        try {
+            super.run();
+            try {
+                // Wait until all jobs have finished.
+                while (!allFinished()) {
+                    try {
+                        Thread.sleep(timeToSleep);
+                    } catch (InterruptedException e) {
+                        // Do nothing
+                    }
+                }
+            } catch (Exception e) {
+                throw e;
+            } finally {
+                stop();
+                if (stats!=null) {
+                    stats.accumulateStats(this);
+                }
+                if (notifier!=null) {
+                    notifier.complete(this);
+                }
+            }
+        } catch (Exception e) {
+            // should not happen
+            throw new RuntimeException(e);
+        }
+    }
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Mon Nov 11 19:28:17 2013
@@ -17,39 +17,20 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.URL;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.Utils;
-import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.python.google.common.collect.Sets;
-
-import com.google.common.collect.Maps;
 
 /**
  * This is compiler class that takes a TezOperPlan and converts it into a
@@ -58,28 +39,27 @@ import com.google.common.collect.Maps;
  */
 public class TezJobControlCompiler {
     private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
-    private static final String DAG_JAR_NAME = "dag_job.jar";
 
     private PigContext pigContext;
-    private TezClient tezClient;
     private TezConfiguration tezConf;
 
     public TezJobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
         this.pigContext = pigContext;
         this.tezConf = new TezConfiguration(conf);
-        this.tezClient = new TezClient(tezConf);
     }
 
     public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
             throws IOException, YarnException {
-        String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, DAG_JAR_NAME);
+        String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
         DAG tezDag = new DAG(jobName);
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
         dagBuilder.visit();
         return tezDag;
     }
 
-    public JobControl compile(TezOperPlan tezPlan, String grpName) throws JobCreationException {
+    public TezJobControl compile(TezOperPlan tezPlan, String grpName,
+            Configuration conf, TezPlanContainer planContainer)
+                    throws JobCreationException {
         int timeToSleep;
         String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
         String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
@@ -96,14 +76,12 @@ public class TezJobControlCompiler {
                     " should be a time in ms. default=" + defaultPigJobControlSleep, e);
         }
 
-        JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+        TezJobControl jobCtrl = new TezJobControl(grpName, timeToSleep);
 
         try {
-            // TODO: for now, we assume that the whole Tez plan can be always
-            // packaged into a single Tez job. But that may be not always true.
-            tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-                    TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
-            TezJob job = getJob(tezPlan);
+            // A single Tez job always packs exactly one Tez plan. We track each
+            // Tez job asynchronously to exploit parallel execution opportunities.
+            TezJob job = getJob(tezPlan, conf, planContainer);
             jobCtrl.addJob(job);
         } catch (JobCreationException jce) {
             throw jce;
@@ -116,59 +94,14 @@ public class TezJobControlCompiler {
         return jobCtrl;
     }
 
-    private TezJob getJob(TezOperPlan tezPlan) throws JobCreationException {
+    private TezJob getJob(TezOperPlan tezPlan, Configuration conf, TezPlanContainer planContainer)
+            throws JobCreationException {
         try {
-            ApplicationId appId = tezClient.createApplication();
-            Map<String, LocalResource> localResources = Maps.newHashMap();
-            FileSystem remoteFs = FileSystem.get(tezConf);
-            Path remoteStagingDir = remoteFs.makeQualified(new Path(
-                    tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR), appId.toString()));
-
-            for (URL extraJar : pigContext.extraJars) {
-                Path pathInHDFS = Utils.shipToHDFS(pigContext, tezConf, extraJar);
-                FileStatus fstat = remoteFs.getFileStatus(pathInHDFS);
-                LocalResource extraJarRsrc = LocalResource.newInstance(
-                        ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
-                        LocalResourceType.FILE,
-                        LocalResourceVisibility.APPLICATION,
-                        fstat.getLen(),
-                        fstat.getModificationTime());
-                localResources.put(pathInHDFS.getName(), extraJarRsrc);
-                pigContext.skipJars.add(extraJar.getPath());
-            }
-
-            // Collect all the UDFs registered in tezPlan
-            Set<String> udfs = Sets.newHashSet();
-            Iterator<TezOperator> it = tezPlan.iterator();
-            while (it.hasNext()) {
-                udfs.addAll(it.next().UDFs);
-            }
-
-            // Create the jar of all functions and classes required
-            File jobJar = File.createTempFile("Job", ".jar");
-            jobJar.deleteOnExit();
-            FileOutputStream fos = new FileOutputStream(jobJar);
-            try {
-                JarManager.createJar(fos, udfs, pigContext);
-            } catch (ClassNotFoundException e) {
-                throw new JobCreationException("UDF is not found in classpath: ", e);
-            }
-
-            // Ship the job jar to the staging directory on hdfs
-            Path remoteJarPath = remoteFs.makeQualified(new Path(remoteStagingDir, DAG_JAR_NAME));
-            remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
-            FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
-
-            LocalResource dagJarLocalRsrc = LocalResource.newInstance(
-                    ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-                    LocalResourceType.FILE,
-                    LocalResourceVisibility.APPLICATION,
-                    jarFileStatus.getLen(),
-                    jarFileStatus.getModificationTime());
-            localResources.put(DAG_JAR_NAME, dagJarLocalRsrc);
-
+            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+            localResources.putAll(planContainer.getLocalResources());
+            localResources.putAll(tezPlan.getLocalExtraResources());
             DAG tezDag = buildDAG(tezPlan, localResources);
-            return new TezJob(tezConf, appId, tezDag, localResources);
+            return new TezJob(tezConf, tezDag, planContainer.getLocalResources());
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+
+public class TezJobNotifier {
+    private TezPlanContainer planContainer;
+    private TezOperPlan plan;
+    public TezJobNotifier(TezPlanContainer planContainer, TezOperPlan plan) {
+        this.planContainer = planContainer;
+        this.plan = plan;
+    }
+
+    public void complete(JobControl jobControl) {
+        boolean succ = true;
+        if (jobControl.getFailedJobList().size()!=0) {
+            succ = false;
+        }
+
+        planContainer.updatePlan(plan, succ);
+    }
+}
+

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Nov 11 19:28:17 2013
@@ -19,11 +19,15 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -35,82 +39,86 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.tez.TezStats;
-
+import org.apache.tez.dag.api.TezConfiguration;
 
 /**
  * Main class that launches pig for Tez
  */
 public class TezLauncher extends Launcher {
+    
     private static final Log log = LogFactory.getLog(TezLauncher.class);
-
+    
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        FileSystem fs = FileSystem.get(conf);
+        Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+        
+        TezResourceManager.initialize(stagingDir, pc, conf);
+
+        conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+        
+        List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
+        
         TezStats tezStats = new TezStats(pc);
         PigStats.start(tezStats);
-
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+        
         TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
-        TezOperPlan tezPlan = compile(php, pc);
-
-        TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
-        pkgAnnotator.visit();
-
-        tezStats.initialize(tezPlan);
-
-        jc = jcc.compile(tezPlan, grpName);
-
-        // Initially, all jobs are in wait state.
-        List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
-        log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
-
-        // TODO: MapReduceLauncher does a couple of things here. For example,
-        // notify PPNL of job submission, update PigStas, etc. We will worry
-        // about them later.
-
-        // Set the thread UDFContext so registered classes are available.
-        final UDFContext udfContext = UDFContext.getUDFContext();
-        Thread jcThread = new Thread(jc, "JobControl") {
-            @Override
-            public void run() {
-                UDFContext.setUdfContext(udfContext.clone());
-                super.run();
-            }
-        };
-
-        JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-        jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
-        jcThread.setContextClassLoader(PigContext.getClassLoader());
-
-        // Mark the times that the jobs were submitted so it's reflected in job
-        // history props
-        long scriptSubmittedTimestamp = System.currentTimeMillis();
-        for (ControlledJob job : jobsWithoutIds) {
-            // Job.getConfiguration returns the shared configuration object
-            Configuration jobConf = job.getJob().getConfiguration();
-            jobConf.set("pig.script.submitted.timestamp",
-                    Long.toString(scriptSubmittedTimestamp));
-            jobConf.set("pig.job.submitted.timestamp",
-                    Long.toString(System.currentTimeMillis()));
-        }
-
-        // All the setup done, now lets launch the jobs. DAG is submitted to
-        // YARN cluster by TezJob.submit().
-        jcThread.start();
-
-        try {
-            // Wait for all the jobs are finished.
-            while (!jc.allFinished()) {
-                try {
-                    jcThread.join(500);
-                } catch (InterruptedException e) {
-                    // Do nothing
+        TezPlanContainer tezPlanContainer = compile(php, pc);
+        
+        TezOperPlan tezPlan;
+        
+        while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans))!=null) {
+            processedPlans.add(tezPlan);
+
+            TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
+            pkgAnnotator.visit();
+    
+            tezStats.initialize(tezPlan);
+    
+            jc = jcc.compile(tezPlan, grpName, conf, tezPlanContainer);
+            TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
+            ((TezJobControl)jc).setJobNotifier(notifier);
+            ((TezJobControl)jc).setTezStats(tezStats);
+    
+            // Initially, all jobs are in wait state.
+            List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
+            log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
+    
+            // TODO: MapReduceLauncher does a couple of things here. For example,
+            // notify PPNL of job submission, update PigStats, etc. We will worry
+            // about them later.
+    
+            // Set the thread UDFContext so registered classes are available.
+            final UDFContext udfContext = UDFContext.getUDFContext();
+            Thread jcThread = new Thread(jc, "JobControl") {
+                @Override
+                public void run() {
+                    UDFContext.setUdfContext(udfContext.clone());
+                    super.run();
                 }
+            };
+    
+            JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+            jcThread.setContextClassLoader(PigContext.getClassLoader());
+    
+            // Mark the times that the jobs were submitted so it's reflected in job
+            // history props
+            long scriptSubmittedTimestamp = System.currentTimeMillis();
+            for (ControlledJob job : jobsWithoutIds) {
+                // Job.getConfiguration returns the shared configuration object
+                Configuration jobConf = job.getJob().getConfiguration();
+                jobConf.set("pig.script.submitted.timestamp",
+                        Long.toString(scriptSubmittedTimestamp));
+                jobConf.set("pig.job.submitted.timestamp",
+                        Long.toString(System.currentTimeMillis()));
             }
-        } catch (Exception e) {
-            throw e;
-        } finally {
-            tezStats.accumulateStats(jc);
-            jc.stop();
+    
+            // All the setup done, now lets launch the jobs. DAG is submitted to
+            // YARN cluster by TezJob.submit().
+            jcThread.start();
         }
 
         tezStats.finish();
@@ -122,10 +130,10 @@ public class TezLauncher extends Launche
             String format, boolean verbose) throws PlanException,
             VisitorException, IOException {
         log.debug("Entering TezLauncher.explain");
-        TezOperPlan tezp = compile(php, pc);
+        TezPlanContainer tezPlanContainer = compile(php, pc);
 
         if (format.equals("text")) {
-            TezPrinter printer = new TezPrinter(ps, tezp);
+            TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
             printer.setVerbose(verbose);
             printer.visit();
         } else {
@@ -134,12 +142,13 @@ public class TezLauncher extends Launche
         }
     }
 
-    public TezOperPlan compile(PhysicalPlan php, PigContext pc)
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
             throws PlanException, IOException, VisitorException {
         TezCompiler comp = new TezCompiler(php, pc);
         comp.compile();
+        TezOperPlan plan = comp.getTezPlan();
         // TODO: Run optimizations here
-        return comp.getTezPlan();
+        return comp.getPlanContainer();
     }
 
     @Override

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Mon Nov 11 19:28:17 2013
@@ -18,8 +18,14 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -33,7 +39,6 @@ public class TezOperPlan extends Operato
     private static final long serialVersionUID = 1L;
 
     public TezOperPlan() {
-        // TODO Auto-generated constructor stub
     }
 
     @Override
@@ -49,5 +54,11 @@ public class TezOperPlan extends Operato
         }
         return baos.toString();
     }
+    
+    /**
+     * Builds the extra local resources for this plan by handing a set of jar
+     * URLs to {@link TezResourceManager#getTezResources}. The set is currently
+     * always empty.
+     *
+     * @return the map produced by {@code TezResourceManager.getTezResources}
+     * @throws IOException propagated from {@code TezResourceManager.getTezResources}
+     */
+    public Map<String, LocalResource> getLocalExtraResources() throws IOException {
+        // TODO: Add script jars/pig-misc jars
+        final Set<URL> extraJars = new HashSet<URL>();
+        return TezResourceManager.getTezResources(extraJars);
+    }
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Mon Nov 11 19:28:17 2013
@@ -57,6 +57,8 @@ public class TezOperator extends Operato
 
     // Indicates that the plan creation is complete
     boolean closed = false;
+    
+    boolean segmentBelow = false;
 
     // Types of blocking operators. For now, we only support the following ones.
     private static enum OPER_FEATURE {
@@ -173,5 +175,9 @@ public class TezOperator extends Operato
         sb.delete(sb.length() - "\n".length(), sb.length());
         return sb.toString();
     }
+    
+    /**
+     * Reports the {@code segmentBelow} flag of this operator.
+     * {@code TezPlanContainer.split} consults it to decide whether the
+     * operator's successors are moved into a separate plan.
+     *
+     * @return the current value of {@code segmentBelow}
+     */
+    public boolean needSegmentBelow() {
+        return this.segmentBelow;
+    }
 }
 

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.JarManager;
+
+public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
+    private static final long serialVersionUID = 1L;
+    private PigContext pigContext;
+
+    public TezPlanContainer(PigContext pigContext) {
+        this.pigContext = pigContext;
+    }
+
+    // Use pig.jar and udf jars for the AM resources (all DAG in the planContainer will
+    // use it for simplicity)
+    public Map<String, LocalResource> getLocalResources() throws IOException {
+        Set<URL> jarLists = new HashSet<URL>();
+
+        jarLists.add(TezResourceManager.getBootStrapJar());
+
+        jarLists.addAll(pigContext.extraJars);
+
+        TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
+        tezPlanContainerUDFCollector.visit();
+        Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
+
+        for (String func: udfs) {
+            Class clazz = pigContext.getClassForAlias(func);
+            if (clazz != null) {
+                String jarName = JarManager.findContainingJar(clazz);
+                URL jarUrl = new File(jarName).toURI().toURL();
+                jarLists.add(jarUrl);
+            }
+        }
+
+        return TezResourceManager.getTezResources(jarLists);
+    }
+
+    public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+        synchronized(this) {
+            while (getRoots()!=null && !getRoots().isEmpty()) {
+                TezPlanContainerNode currentPlan = null;
+                for (TezPlanContainerNode plan : getRoots()) {
+                    if (!processedPlans.contains(plan.getNode())) {
+                        currentPlan = plan;
+                        break;
+                    }
+                }
+                if (currentPlan!=null) {
+                    return currentPlan.getNode();
+                } else {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public void updatePlan(TezOperPlan plan, boolean succ) {
+        String scope = getRoots().get(0).getOperatorKey().getScope();
+        TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+        synchronized(this) {
+            if (succ) {
+                remove(tezPlanContainerNode);
+            } else {
+                // job fail
+                trimBelow(tezPlanContainerNode);
+                remove(tezPlanContainerNode);
+            }
+            notify();
+        }
+    }
+
+    public void split(TezPlanContainerNode planNode) throws PlanException {
+        TezOperPlan tezOperPlan = planNode.getNode();
+        for (TezOperator tezOper : tezOperPlan) {
+            if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) {
+                for (TezOperator succ : tezOperPlan.getSuccessors(tezOper)) {
+                    tezOperPlan.disconnect(tezOper, succ);
+                    TezOperPlan newOperPlan = new TezOperPlan();
+                    tezOperPlan.moveTree(succ, newOperPlan);
+                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(tezOper.getOperatorKey(), newOperPlan);
+                    add(newPlanNode);
+                    connect(planNode, newPlanNode);
+                    split(planNode);
+                    split(newPlanNode);
+                }
+            }
+        }
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+    private static final long serialVersionUID = 1L;
+    TezOperPlan node;
+    public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+        super(k);
+        this.node = node;
+    }
+
+    @Override
+    public void visit(TezPlanContainerVisitor v) throws VisitorException {
+        v.visitTezPlanContainerNode(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "DAG " + mKey;
+    }
+
+    public TezOperPlan getNode() {
+        return node;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o != null && o instanceof TezPlanContainerNode) {
+            return ((TezPlanContainerNode)o).getNode().equals(getNode());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getNode().hashCode();
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Visitor that prints every DAG of a {@link TezPlanContainer} to a
+ * PrintStream, delegating the per-plan output to {@link TezPrinter}.
+ */
+public class TezPlanContainerPrinter extends TezPlanContainerVisitor {
+    private static final String RULE = "#--------------------------------------------------";
+
+    private PrintStream mStream;
+    private boolean isVerbose = true;
+
+    /**
+     * Creates the printer and immediately writes a header stating how many
+     * DAGs the container holds.
+     *
+     * @param ps PrintStream to output plan information to
+     * @param planContainer container whose plans are printed
+     */
+    public TezPlanContainerPrinter(PrintStream ps, TezPlanContainer planContainer) {
+        super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+        mStream = ps;
+        printBanner("# There are " + planContainer.size() + " DAGs in the session");
+    }
+
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
+    /** Prints a banner naming the node's key, then the wrapped plan itself. */
+    @Override
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        printBanner("# TEZ plan: " + tezPlanContainerNode.getOperatorKey());
+        TezPrinter planPrinter = new TezPrinter(mStream, tezPlanContainerNode.getNode());
+        planPrinter.setVerbose(isVerbose);
+        planPrinter.visit();
+    }
+
+    /** Writes {@code title} framed by a rule line above and below. */
+    private void printBanner(String title) {
+        mStream.println(RULE);
+        mStream.println(title);
+        mStream.println(RULE);
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.python.google.common.collect.Sets;
+
+public class TezPlanContainerUDFCollector extends TezPlanContainerVisitor {
+    private Set<String> udfs = Sets.newHashSet();
+
+    /**
+     * @param ps PrintStream to output plan information to
+     * @param plan tez plan to print
+     */
+    public TezPlanContainerUDFCollector(TezPlanContainer planContainer) {
+        super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+    }
+
+    @Override
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        Iterator<TezOperator> it = tezPlanContainerNode.getNode().iterator();
+        while (it.hasNext()) {
+            udfs.addAll(it.next().UDFs);
+        }
+    }
+
+    public Set<String> getUdfs() {
+        return udfs;
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Base visitor over a {@link TezPlanContainer}. Subclasses override
+ * {@link #visitTezPlanContainerNode} to act on each container node.
+ */
+public class TezPlanContainerVisitor extends PlanVisitor<TezPlanContainerNode, TezPlanContainer> {
+    /**
+     * @param plan container to visit
+     * @param walker walker that determines the visiting order
+     */
+    public TezPlanContainerVisitor(TezPlanContainer plan, PlanWalker<TezPlanContainerNode, TezPlanContainer> walker) {
+        super(plan, walker);
+        // Nothing to initialise beyond the superclass state.
+    }
+
+    /**
+     * Called for a container node; {@link TezPlanContainerNode#visit} invokes it.
+     * The default implementation does nothing.
+     */
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        // Intentionally empty: subclasses supply the behaviour.
+    }
+}
+

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Mon Nov 11 19:28:17 2013
@@ -40,9 +40,6 @@ public class TezPrinter extends TezOpPla
     public TezPrinter(PrintStream ps, TezOperPlan plan) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         mStream = ps;
-        mStream.println("#--------------------------------------------------");
-        mStream.println("# TEZ plan:");
-        mStream.println("#--------------------------------------------------");
     }
 
     public void setVerbose(boolean verbose) {

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Static registry that maps local jar URLs to the remote paths they were
+ * copied to, and turns those paths into YARN {@link LocalResource}s.
+ * {@link #initialize} must be called first; it sets the staging directory,
+ * PigContext, configuration and file system used by every other method.
+ *
+ * NOTE(review): all state is static and unsynchronized - confirm callers
+ * access it from a single thread.
+ */
+public class TezResourceManager {
+    private static Path stagingDir;
+    private static PigContext pigContext;
+    private static Configuration conf;
+    private static URL bootStrapJar;
+    private static FileSystem remoteFs;
+
+    public static Map<URL, Path> resources = new HashMap<URL, Path>();
+
+    /** @return URL of the jar containing {@code org.apache.pig.Main}, as set by {@link #initialize} */
+    public static URL getBootStrapJar() {
+        return bootStrapJar;
+    }
+
+    /**
+     * Stores the given settings, resolves the bootstrap jar URL and the file
+     * system, and uploads the bootstrap jar.
+     *
+     * @param stagingDir directory the bootstrap jar is copied into
+     * @param pigContext context passed on to the jar and shipping helpers
+     * @param conf configuration used to obtain the file system
+     * @throws IOException if the file system or the upload fails
+     */
+    public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+        TezResourceManager.stagingDir = stagingDir;
+        TezResourceManager.pigContext = pigContext;
+        TezResourceManager.conf = conf;
+        String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
+        TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
+        remoteFs = FileSystem.get(conf);
+        addBootStrapJar();
+    }
+
+    /**
+     * Registers {@code url} unless it is already known, using
+     * {@code Utils.shipToHDFS} to obtain its remote path.
+     *
+     * @param url local resource to register
+     * @throws IOException propagated from {@code Utils.shipToHDFS}
+     */
+    public static void addLocalResource(URL url) throws IOException {
+        if (resources.containsKey(url)) {
+            return;
+        }
+
+        Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, url);
+        resources.put(url, pathInHDFS);
+    }
+
+    /**
+     * Builds the bootstrap jar into a temporary file, copies it into the
+     * staging directory and registers it under the bootstrap jar URL. Does
+     * nothing if the bootstrap jar is already registered.
+     *
+     * @throws IOException if the jar cannot be written or copied
+     */
+    public static void addBootStrapJar() throws IOException {
+        if (resources.containsKey(bootStrapJar)) {
+            return;
+        }
+
+        File jobJar = File.createTempFile("Job", ".jar");
+        jobJar.deleteOnExit();
+        FileOutputStream fos = new FileOutputStream(jobJar);
+        try {
+            JarManager.createBootStrapJar(fos, pigContext);
+        } finally {
+            // Always release the file handle; closing an already closed
+            // FileOutputStream has no effect.
+            fos.close();
+        }
+
+        // Ship the job.jar to the staging directory on hdfs
+        Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName()));
+        remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
+
+        resources.put(bootStrapJar, remoteJarPath);
+    }
+
+    /** @return the remote path registered for {@code url}, or null if none */
+    public static Path get(URL url) {
+        return resources.get(url);
+    }
+
+    /**
+     * Registers any of the given URLs that are not yet known and describes
+     * each one as a {@link LocalResource} of type FILE with APPLICATION
+     * visibility, using the length and modification time of the remote file.
+     *
+     * @param urls local resources to describe
+     * @return map from the remote file name to its LocalResource
+     * @throws IOException if registering or inspecting a resource fails
+     */
+    public static Map<String, LocalResource> getTezResources(Set<URL> urls) throws IOException {
+        Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
+        for (URL url : urls) {
+            // addLocalResource is a no-op for URLs that are already registered
+            addLocalResource(url);
+            Path remotePath = resources.get(url);
+            FileStatus fstat = remoteFs.getFileStatus(remotePath);
+            LocalResource tezResource = LocalResource.newInstance(
+                    ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
+                    LocalResourceType.FILE,
+                    LocalResourceVisibility.APPLICATION,
+                    fstat.getLen(),
+                    fstat.getModificationTime());
+            tezResources.put(remotePath.getName(), tezResource);
+        }
+        return tezResources;
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * Keeps a pool of Tez sessions. {@link #getSession} hands out an idle, ready
+ * session whose resources cover the request, or creates a new one;
+ * {@link #freeSession} marks a session as idle again. Access to the pool is
+ * synchronized on the pool list.
+ */
+public class TezSessionManager {
+    /** A pooled session together with its AM resources and an in-use flag. */
+    public static class SessionInfo {
+        SessionInfo(TezSession session, Map<String, LocalResource> resources) {
+            this.session = session;
+            this.resources = resources;
+        }
+        public Map<String, LocalResource> getResources() {
+            return resources;
+        }
+        public TezSession getTezSession() {
+            return session;
+        }
+        public void setInUse(boolean inUse) {
+            this.inUse = inUse;
+        }
+        private TezSession session;
+        private Map<String, LocalResource> resources;
+        private boolean inUse = false;
+    }
+
+    private static final List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
+
+    /**
+     * Polls the session status every 100 ms until it is READY.
+     *
+     * @throws RuntimeException if the session reports SHUTDOWN
+     * @throws IOException if the thread is interrupted while sleeping; the
+     *         interrupt status is restored before the exception is thrown
+     */
+    private static void waitForTezSessionReady(TezSession tezSession)
+        throws IOException, TezException {
+        while (true) {
+            TezSessionStatus status = tezSession.getSessionStatus();
+            if (status.equals(TezSessionStatus.SHUTDOWN)) {
+                throw new RuntimeException("TezSession has already shutdown");
+            }
+            if (status.equals(TezSessionStatus.READY)) {
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to callers further up the stack
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while trying to check session status", e);
+            }
+        }
+    }
+
+    /**
+     * Creates and starts a new session with a copy of the requested resources
+     * and waits until it is ready.
+     */
+    private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+        TezConfiguration tezConf = new TezConfiguration(conf);
+        TezClient tezClient = new TezClient(tezConf);
+        ApplicationId appId = tezClient.createApplication();
+
+        Map<String, LocalResource> resources = new HashMap<String, LocalResource>(requestedAMResources);
+
+        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        AMConfiguration amConfig = new AMConfiguration(null, resources, tezConf, null);
+        TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+        TezSession tezSession = new TezSession(jobName, appId, sessionConfig);
+        tezSession.start();
+        waitForTezSessionReady(tezSession);
+        return new SessionInfo(tezSession, resources);
+    }
+
+    /**
+     * @return true if every requested entry (name and resource) is present in
+     *         the session's resources
+     */
+    private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+        for (Map.Entry<String, LocalResource> entry : requestedAMResources.entrySet()) {
+            if (!currentSession.resources.entrySet().contains(entry)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns a session for the requested resources and marks it in use. An
+     * idle READY session from the pool is reused when its resources cover the
+     * request; otherwise a new session is created and added to the pool.
+     * Sessions found in SHUTDOWN state during the scan are dropped from the
+     * pool, whether or not a reusable session is found.
+     *
+     * @param conf configuration for a newly created session
+     * @param requestedAMResources resources the session must provide
+     * @return a session marked as in use
+     */
+    static TezSession getSession(Configuration conf, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+        synchronized (sessionPool) {
+            List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
+            SessionInfo reusable = null;
+            for (SessionInfo sessionInfo : sessionPool) {
+                // Query the status once per session and reuse the answer
+                TezSessionStatus status = sessionInfo.session.getSessionStatus();
+                if (status == TezSessionStatus.SHUTDOWN) {
+                    sessionsToRemove.add(sessionInfo);
+                } else if (!sessionInfo.inUse && status == TezSessionStatus.READY &&
+                        validateSessionResources(sessionInfo, requestedAMResources)) {
+                    reusable = sessionInfo;
+                    break;
+                }
+            }
+
+            sessionPool.removeAll(sessionsToRemove);
+
+            if (reusable != null) {
+                reusable.inUse = true;
+                return reusable.session;
+            }
+
+            // We cannot find available AM, create new one
+            SessionInfo sessionInfo = createSession(conf, requestedAMResources);
+            sessionInfo.inUse = true;
+            sessionPool.add(sessionInfo);
+            return sessionInfo.session;
+        }
+    }
+
+    /** Marks every pool entry holding exactly this session object as idle. */
+    static void freeSession(TezSession session) {
+        synchronized (sessionPool) {
+            for (SessionInfo sessionInfo : sessionPool) {
+                if (sessionInfo.session==session) {
+                    sessionInfo.inUse = false;
+                }
+            }
+        }
+    }
+}
+

Modified: pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Nov 11 19:28:17 2013
@@ -281,6 +281,24 @@ public abstract class OperatorPlan<E ext
             }
         }
     }
+    
+    /**
+     * Move a given operator and everything below it to the new operator plan.  The
+     * operators are removed from this plan, and the specified operator becomes the
+     * root of the moved tree in the new operator plan.
+     * @param root Operator to move, together with everything after it
+     * @param newPlan new operator plan to move things into
+     * @throws PlanException if the moved operators cannot be connected in the new plan
+     */
+    public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
+        // Work on a private copy of the successors: the recursive calls remove
+        // operators from this plan, which must not disturb the list being iterated.
+        // NOTE(review): getSuccessors presumably returns null for a leaf and may
+        // expose the plan's internal list -- confirm against its implementation.
+        List<E> succs = new ArrayList<E>();
+        List<E> currentSuccs = getSuccessors(root);
+        if (currentSuccs != null) {
+            succs.addAll(currentSuccs);
+        }
+        for (E succ : succs) {
+            moveTree(succ, newPlan);
+        }
+        remove(root);
+        // The operator has to be part of the new plan before it can be connected
+        // there; each successor was added by its own recursive call above.
+        newPlan.add(root);
+        for (E succ : succs) {
+            newPlan.connect(root, succ);
+        }
+    }
 
     /**
      * Trim everything above a given operator.  The specified operator will

Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Nov 11 19:28:17 2013
@@ -128,6 +128,23 @@ public class JarManager {
             return pkgClass;
         }
     }
+    
+    public static void createBootStrapJar(OutputStream os, PigContext pigContext) throws IOException {
+        JarOutputStream jarFile = new JarOutputStream(os);
+        HashMap<String, String> contents = new HashMap<String, String>();
+        Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
+        for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+            addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
+        }
+        
+        Iterator<JarListEntry> it = jarList.iterator();
+        while (it.hasNext()) {
+            JarListEntry jarEntry = it.next();
+            mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
+        }
+        jarFile.close();
+    }
 
     /**
      * Create a jarfile in a temporary path, that is a merge of all the jarfiles containing the

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld Mon Nov 11 19:28:17 2013
@@ -1,5 +1,8 @@
 #--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ plan: scope-17
 #--------------------------------------------------
 Tez vertex scope-16
 c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-15

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld Mon Nov 11 19:28:17 2013
@@ -1,29 +1,32 @@
 #--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
 #--------------------------------------------------
-Tez vertex scope-35
-b: Local Rearrange[tuple]{int}(false) - scope-27
+#--------------------------------------------------
+# TEZ plan: scope-38
+#--------------------------------------------------
+Tez vertex scope-36
+b: Local Rearrange[tuple]{int}(false) - scope-28
 |   |
-|   Project[int][0] - scope-28
+|   Project[int][0] - scope-29
 |
-|---a: New For Each(false,false)[bag] - scope-24
+|---a: New For Each(false,false)[bag] - scope-25
     |   |
-    |   Cast[int] - scope-19
+    |   Cast[int] - scope-20
     |   |
-    |   |---Project[bytearray][0] - scope-18
+    |   |---Project[bytearray][0] - scope-19
     |   |
-    |   Cast[int] - scope-22
+    |   Cast[int] - scope-23
     |   |
-    |   |---Project[bytearray][1] - scope-21
+    |   |---Project[bytearray][1] - scope-22
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-17
-Tez vertex scope-36
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-34
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-18
+Tez vertex scope-37
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-35
 |
-|---c: New For Each(false,false)[bag] - scope-33
+|---c: New For Each(false,false)[bag] - scope-34
     |   |
-    |   Project[int][0] - scope-29
+    |   Project[int][0] - scope-30
     |   |
-    |   Project[bag][1] - scope-31
+    |   Project[bag][1] - scope-32
     |
-    |---b: Package[tuple]{int} - scope-26
+    |---b: Package[tuple]{int} - scope-27

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld Mon Nov 11 19:28:17 2013
@@ -1,53 +1,56 @@
 #--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
 #--------------------------------------------------
-Tez vertex scope-72
-c: Local Rearrange[tuple]{int}(false) - scope-57
+#--------------------------------------------------
+# TEZ plan: scope-77
+#--------------------------------------------------
+Tez vertex scope-74
+c: Local Rearrange[tuple]{int}(false) - scope-59
 |   |
-|   Project[int][0] - scope-58
+|   Project[int][0] - scope-60
 |
-|---a: New For Each(false,false)[bag] - scope-44
+|---a: New For Each(false,false)[bag] - scope-46
     |   |
-    |   Cast[int] - scope-39
+    |   Cast[int] - scope-41
     |   |
-    |   |---Project[bytearray][0] - scope-38
+    |   |---Project[bytearray][0] - scope-40
     |   |
-    |   Cast[int] - scope-42
+    |   Cast[int] - scope-44
     |   |
-    |   |---Project[bytearray][1] - scope-41
+    |   |---Project[bytearray][1] - scope-43
     |
-    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-37
-Tez vertex scope-73
-c: Local Rearrange[tuple]{int}(false) - scope-59
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-39
+Tez vertex scope-75
+c: Local Rearrange[tuple]{int}(false) - scope-61
 |   |
-|   Project[int][0] - scope-60
+|   Project[int][0] - scope-62
 |
-|---b: New For Each(false,false)[bag] - scope-52
+|---b: New For Each(false,false)[bag] - scope-54
     |   |
-    |   Cast[int] - scope-47
+    |   Cast[int] - scope-49
     |   |
-    |   |---Project[bytearray][0] - scope-46
+    |   |---Project[bytearray][0] - scope-48
     |   |
-    |   Cast[int] - scope-50
+    |   Cast[int] - scope-52
     |   |
-    |   |---Project[bytearray][1] - scope-49
+    |   |---Project[bytearray][1] - scope-51
     |
-    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-45
-Tez vertex scope-74
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-71
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-47
+Tez vertex scope-76
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-73
 |
-|---d: New For Each(false,false,false)[bag] - scope-70
+|---d: New For Each(false,false,false)[bag] - scope-72
     |   |
-    |   Project[int][0] - scope-64
+    |   Project[int][0] - scope-66
     |   |
-    |   Project[int][1] - scope-66
+    |   Project[int][1] - scope-68
     |   |
-    |   Project[int][3] - scope-68
+    |   Project[int][3] - scope-70
     |
-    |---c: New For Each(true,true)[tuple] - scope-63
+    |---c: New For Each(true,true)[tuple] - scope-65
         |   |
-        |   Project[bag][1] - scope-61
+        |   Project[bag][1] - scope-63
         |   |
-        |   Project[bag][2] - scope-62
+        |   Project[bag][2] - scope-64
         |
-        |---c: Package[tuple]{int} - scope-56
+        |---c: Package[tuple]{int} - scope-58

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld Mon Nov 11 19:28:17 2013
@@ -1,31 +1,34 @@
 #--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
 #--------------------------------------------------
-Tez vertex scope-85
-Local Rearrange[tuple]{tuple}(false) - scope-87
+#--------------------------------------------------
+# TEZ plan: scope-96
+#--------------------------------------------------
+Tez vertex scope-88
+Local Rearrange[tuple]{tuple}(false) - scope-90
 |   |
-|   Project[tuple][*] - scope-86
+|   Project[tuple][*] - scope-89
 |
-|---b: Limit - scope-76
+|---b: Limit - scope-79
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-75
-Tez vertex scope-88
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-84
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-78
+Tez vertex scope-91
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-87
 |
-|---c: New For Each(false)[bag] - scope-83
+|---c: New For Each(false)[bag] - scope-86
     |   |
-    |   Project[int][0] - scope-81
+    |   Project[int][0] - scope-84
     |
-    |---a: New For Each(false)[bag] - scope-80
+    |---a: New For Each(false)[bag] - scope-83
         |   |
-        |   Cast[int] - scope-78
+        |   Cast[int] - scope-81
         |   |
-        |   |---Project[bytearray][0] - scope-77
+        |   |---Project[bytearray][0] - scope-80
         |
-        |---Limit - scope-92
+        |---Limit - scope-95
             |
-            |---New For Each(true)[bag] - scope-91
+            |---New For Each(true)[bag] - scope-94
                 |   |
-                |   Project[tuple][1] - scope-90
+                |   Project[tuple][1] - scope-93
                 |
-                |---Package[tuple]{tuple} - scope-89
+                |---Package[tuple]{tuple} - scope-92

Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Mon Nov 11 19:28:17 2013
@@ -1,31 +1,34 @@
 #--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
 #--------------------------------------------------
-Tez vertex scope-106
-Local Rearrange[tuple]{tuple}(true) - scope-108
+#--------------------------------------------------
+# TEZ plan: scope-117
+#--------------------------------------------------
+Tez vertex scope-110
+Local Rearrange[tuple]{tuple}(true) - scope-112
 |   |
-|   Project[tuple][*] - scope-107
+|   Project[tuple][*] - scope-111
 |
-|---a: New For Each(false,false)[bag] - scope-100
+|---a: New For Each(false,false)[bag] - scope-104
     |   |
-    |   Cast[int] - scope-95
+    |   Cast[int] - scope-99
     |   |
-    |   |---Project[bytearray][0] - scope-94
+    |   |---Project[bytearray][0] - scope-98
     |   |
-    |   Cast[int] - scope-98
+    |   Cast[int] - scope-102
     |   |
-    |   |---Project[bytearray][1] - scope-97
+    |   |---Project[bytearray][1] - scope-101
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-93
-Tez vertex scope-109
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-105
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-97
+Tez vertex scope-113
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-109
 |
-|---c: New For Each(false)[bag] - scope-104
+|---c: New For Each(false)[bag] - scope-108
     |   |
-    |   Project[int][1] - scope-102
+    |   Project[int][1] - scope-106
     |
-    |---New For Each(true)[bag] - scope-112
+    |---New For Each(true)[bag] - scope-116
         |   |
-        |   Project[tuple][0] - scope-111
+        |   Project[tuple][0] - scope-115
         |
-        |---Package[tuple]{tuple} - scope-110
+        |---Package[tuple]{tuple} - scope-114

Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Mon Nov 11 19:28:17 2013
@@ -29,8 +29,8 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerPrinter;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.Util;
 import org.apache.pig.test.junit.OrderedJUnit4Runner;
@@ -141,11 +141,12 @@ public class TestTezCompiler {
 
     private void run(PhysicalPlan pp, String expectedFile) throws Exception {
         TezCompiler comp = new TezCompiler(pp, pc);
-        TezOperPlan tezPlan = comp.compile();
+        comp.compile();
+        TezPlanContainer tezPlanContainer = comp.getPlanContainer();
 
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
-        TezPrinter printer = new TezPrinter(ps, tezPlan);
+        TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
         printer.visit();
         String compiledPlan = baos.toString();