You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [11/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/ba...

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Fri May 30 19:07:23 2014
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * Compiles a {@link TezOperPlan} into a {@link TezJobControl}. Each compiled
+ * job control holds exactly one {@link TezJob}, which wraps the Tez
+ * {@link DAG} built from the plan.
+ */
+public class TezJobControlCompiler {
+    private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
+
+    // Suffix appended to DAG names so that DAGs built in this JVM get distinct
+    // names. Shared by all instances; only read/written via nextDagIdentifier().
+    private static int dagIdentifier = 0;
+
+    private PigContext pigContext;
+    private TezConfiguration tezConf;
+
+    /**
+     * @param pigContext context of the Pig script being compiled
+     * @param conf configuration copied into the {@link TezConfiguration} used
+     *        for every job created by this compiler
+     */
+    public TezJobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+        this.pigContext = pigContext;
+        this.tezConf = new TezConfiguration(conf);
+    }
+
+    /**
+     * Builds a Tez DAG for {@code tezPlan}. The DAG is named after the
+     * script's job name (property {@code PigContext.JOB_NAME}, default
+     * {@code "pig"}) followed by {@code "-"} and a per-JVM counter.
+     *
+     * @param tezPlan plan to translate
+     * @param localResources local resources handed to the {@link TezDagBuilder}
+     * @return the populated DAG, with empty {@link Credentials} attached
+     */
+    public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+            throws IOException, YarnException {
+        String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
+        DAG tezDag = new DAG(jobName + "-" + nextDagIdentifier());
+        tezDag.setCredentials(new Credentials());
+        TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+        dagBuilder.visit();
+        return tezDag;
+    }
+
+    /**
+     * Returns the current DAG counter value and advances it. Synchronized
+     * because the counter is static and a plain {@code ++} is not atomic:
+     * compilers running concurrently in the same JVM could otherwise read the
+     * same value and produce duplicate DAG names.
+     */
+    private static synchronized int nextDagIdentifier() {
+        return dagIdentifier++;
+    }
+
+    /**
+     * Wraps {@code tezPlan} into a new {@link TezJobControl} holding a single
+     * {@link TezJob}.
+     *
+     * @param tezPlan plan to compile
+     * @param grpName group name given to the job control
+     * @param planContainer container whose local resources are added to the job
+     * @return the job control with the job already added
+     * @throws JobCreationException if the job cannot be created
+     * @throws RuntimeException if {@code pig.jobcontrol.sleep} is not an integer
+     */
+    public TezJobControl compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+            throws JobCreationException {
+        int timeToSleep;
+        String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "1000";
+        String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
+        if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
+            log.info("overriding default JobControl sleep (" +
+                    defaultPigJobControlSleep + ") to " + pigJobControlSleep);
+        }
+
+        try {
+            timeToSleep = Integer.parseInt(pigJobControlSleep);
+        } catch (NumberFormatException e) {
+            throw new RuntimeException("Invalid configuration " +
+                    "pig.jobcontrol.sleep=" + pigJobControlSleep +
+                    " should be a time in ms. default=" + defaultPigJobControlSleep, e);
+        }
+
+        TezJobControl jobCtrl = new TezJobControl(grpName, timeToSleep);
+
+        try {
+            // A single Tez job always packs exactly one Tez plan. The job is
+            // tracked asynchronously to exploit parallel execution opportunities.
+            TezJob job = getJob(tezPlan, planContainer);
+            jobCtrl.addJob(job);
+        } catch (JobCreationException jce) {
+            throw jce;
+        } catch(Exception e) {
+            int errCode = 2017;
+            String msg = "Internal error creating job configuration.";
+            throw new JobCreationException(msg, errCode, PigException.BUG, e);
+        }
+
+        return jobCtrl;
+    }
+
+    /**
+     * Creates the {@link TezJob} for {@code tezPlan}. Its local resources are
+     * the container-wide resources plus the plan's extra resources (the
+     * latter win on name clashes because they are added last).
+     */
+    private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+            throws JobCreationException {
+        try {
+            Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+            localResources.putAll(planContainer.getLocalResources());
+            localResources.putAll(tezPlan.getExtraResources());
+            DAG tezDag = buildDAG(tezPlan, localResources);
+            return new TezJob(tezConf, tezDag, localResources);
+        } catch (Exception e) {
+            int errCode = 2017;
+            String msg = "Internal error creating job configuration.";
+            throw new JobCreationException(msg, errCode, PigException.BUG, e);
+        }
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java Fri May 30 19:07:23 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+
+/**
+ * Reports the outcome of a {@link JobControl} run back to the
+ * {@link TezPlanContainer} for the plan that was executed.
+ */
+public class TezJobNotifier {
+    private final TezPlanContainer planContainer;
+    private final TezOperPlan plan;
+
+    public TezJobNotifier(TezPlanContainer planContainer, TezOperPlan plan) {
+        this.planContainer = planContainer;
+        this.plan = plan;
+    }
+
+    /**
+     * Passes the plan to {@link TezPlanContainer#updatePlan} with a success
+     * flag that is {@code true} only when the job control has no failed jobs.
+     *
+     * @param jobControl the job control whose results are reported
+     */
+    public void complete(JobControl jobControl) {
+        boolean noFailedJobs = jobControl.getFailedJobList().isEmpty();
+        planContainer.updatePlan(plan, noFailedJobs);
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri May 30 19:07:23 2014
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * Main class that launches pig for Tez
+ */
+public class TezLauncher extends Launcher {
+
+    private static final Log log = LogFactory.getLog(TezLauncher.class);
+
+    // Pause, in milliseconds, between two polls of a running DAG. Without it
+    // the launcher busy-spins a CPU core and queries the job status nonstop.
+    private static final long JOB_POLL_INTERVAL_MS = 100;
+
+    // Minimum increase, in percentage points, before another progress update
+    // is emitted.
+    private static final double PROGRESS_REPORT_STEP = 4.0;
+
+    private boolean aggregateWarning = false;
+    private TezScriptState tezScriptState;
+    private TezStats tezStats;
+
+    /**
+     * Compiles {@code php} into a container of Tez plans and runs them one
+     * after another, each as a single Tez DAG, emitting script-state
+     * notifications (plan, start, progress, finish/failure) along the way.
+     *
+     * @param php physical plan of the script
+     * @param grpName group name passed to the job control of every plan
+     * @param pc Pig context of the script
+     * @return the accumulated {@link TezStats}
+     * @throws Exception if compilation or submission fails, or if the calling
+     *         thread is interrupted while waiting for a DAG to finish
+     */
+    @Override
+    public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+        aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning", "false"));
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
+
+        TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf);
+
+        log.info("Tez staging directory is " + stagingDir.toString());
+        conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+        List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
+
+        tezScriptState = TezScriptState.get();
+        tezStats = new TezStats(pc);
+        PigStats.start(tezStats);
+
+        TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
+        TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager);
+
+        TezOperPlan tezPlan;
+
+        while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans))!=null) {
+            processedPlans.add(tezPlan);
+
+            TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
+            pkgAnnotator.visit();
+
+            tezStats.initialize(tezPlan);
+
+            jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
+            TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
+            ((TezJobControl)jc).setJobNotifier(notifier);
+
+            // Initially, all jobs are in wait state.
+            List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
+            log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
+
+            tezScriptState.emitInitialPlanNotification(tezPlan);
+            tezScriptState.emitLaunchStartedNotification(tezPlan.size());
+
+            // Set the thread UDFContext so registered classes are available.
+            final UDFContext udfContext = UDFContext.getUDFContext();
+            Thread jcThread = new Thread(jc, "JobControl") {
+                @Override
+                public void run() {
+                    UDFContext.setUdfContext(udfContext.clone());
+                    super.run();
+                }
+            };
+
+            JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+            jcThread.setContextClassLoader(PigContext.getClassLoader());
+
+            // TezJobControl always holds a single TezJob. We use JobControl
+            // only because it is convenient to launch the job via
+            // ControlledJob.submit().
+            TezJob job = (TezJob)jobsWithoutIds.get(0);
+            tezStats.setTezJob(job);
+
+            // Mark the times that the jobs were submitted so it's reflected in job
+            // history props
+            long scriptSubmittedTimestamp = System.currentTimeMillis();
+            // Job.getConfiguration returns the shared configuration object
+            Configuration jobConf = job.getJob().getConfiguration();
+            jobConf.set("pig.script.submitted.timestamp",
+                    Long.toString(scriptSubmittedTimestamp));
+            jobConf.set("pig.job.submitted.timestamp",
+                    Long.toString(System.currentTimeMillis()));
+
+            // Inform ppnl of jobs submission
+            tezScriptState.emitJobsSubmittedNotification(jobsWithoutIds.size());
+
+            // All the setup done, now lets launch the jobs. DAG is submitted to
+            // YARN cluster by TezJob.submit().
+            jcThread.start();
+
+            double prevProgress = 0.0;
+            boolean startNotified = false;
+            while (!jc.allFinished()) {
+                if (job.getApplicationId() != null) {
+                    jobsWithoutIds.remove(job);
+                }
+                // Emit the per-vertex "started" notifications once per DAG
+                // rather than on every poll.
+                if (!startNotified) {
+                    notifyStarted(job);
+                    startNotified = true;
+                }
+                prevProgress = notifyProgress(job, prevProgress);
+                // Wait before polling again (returns early if the job control
+                // thread terminates) instead of busy-spinning.
+                jcThread.join(JOB_POLL_INTERVAL_MS);
+            }
+
+            notifyFinishedOrFailed(job);
+            tezStats.accumulateStats(job);
+            tezScriptState.emitProgressUpdatedNotification(100);
+        }
+
+        tezStats.finish();
+        tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());
+
+        return tezStats;
+    }
+
+    /**
+     * For every vertex of the job's DAG, attaches the configuration decoded
+     * from the vertex processor's user payload to its stats object, sets the
+     * stats id to the vertex name, and emits a "job started" notification.
+     */
+    private void notifyStarted(TezJob job) throws IOException {
+        for (Vertex v : job.getDAG().getVertices()) {
+            TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+            byte[] bb = v.getProcessorDescriptor().getUserPayload();
+            Configuration conf = TezUtils.createConfFromUserPayload(bb);
+            tts.setConf(conf);
+            tts.setId(v.getVertexName());
+            tezScriptState.emitJobStartedNotification(v.getVertexName());
+        }
+    }
+
+    /**
+     * Emits a progress update when overall progress has grown by at least
+     * {@link #PROGRESS_REPORT_STEP} points since the last reported value.
+     *
+     * @param job the running job
+     * @param prevProgress last reported progress, in percent
+     * @return the newly reported progress, or {@code prevProgress} if nothing
+     *         was reported
+     */
+    private double notifyProgress(TezJob job, double prevProgress) {
+        int numberOfJobs = tezStats.getNumberJobs();
+        if (numberOfJobs <= 0) {
+            // Avoid dividing by zero, which would report NaN/Infinity progress.
+            return prevProgress;
+        }
+        double fractionComplete = 0.0;
+        if (job.getJobState() == ControlledJob.State.RUNNING) {
+            fractionComplete = job.getDAGProgress();
+        }
+        double perCom = (fractionComplete / numberOfJobs) * 100;
+        if (perCom >= prevProgress + PROGRESS_REPORT_STEP) {
+            tezScriptState.emitProgressUpdatedNotification((int) perCom);
+            return perCom;
+        }
+        return prevProgress;
+    }
+
+    /**
+     * Emits a finished notification per vertex when the job succeeded, or a
+     * failed notification per vertex when it failed. Any other state emits
+     * nothing.
+     */
+    private void notifyFinishedOrFailed(TezJob job) {
+        if (job.getJobState() == ControlledJob.State.SUCCESS) {
+            for (Vertex v : job.getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitjobFinishedNotification(tts);
+            }
+        } else if (job.getJobState() == ControlledJob.State.FAILED) {
+            for (Vertex v : job.getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitJobFailedNotification(tts);
+            }
+        }
+    }
+
+    /**
+     * Prints the compiled Tez plan container. Only the {@code "text"} format
+     * is supported.
+     *
+     * @throws IOException for any format other than {@code "text"}
+     */
+    @Override
+    public void explain(PhysicalPlan php, PigContext pc, PrintStream ps,
+            String format, boolean verbose) throws PlanException,
+            VisitorException, IOException {
+        log.debug("Entering TezLauncher.explain");
+        TezPlanContainer tezPlanContainer = compile(php, pc, null);
+
+        if (format.equals("text")) {
+            TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else {
+            // TODO: add support for other file format
+            throw new IOException("Non-text output of explain is not supported.");
+        }
+    }
+
+    /**
+     * Compiles the physical plan into a Tez plan container and runs the
+     * enabled plan optimizers (no-op filter removal, combiner, secondary key,
+     * multi-query, accumulator, union), each gated by its Pig property.
+     */
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager)
+            throws PlanException, IOException, VisitorException {
+        TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
+        TezOperPlan tezPlan = comp.compile();
+
+        NoopFilterRemover filter = new NoopFilterRemover(tezPlan);
+        filter.visit();
+
+        boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
+                PigConfiguration.PROP_NO_COMBINER, "false"));
+
+        // Run CombinerOptimizer on Tez plan
+        if (!pc.inIllustrator && !nocombiner)  {
+            boolean doMapAgg = Boolean.parseBoolean(pc.getProperties().getProperty(
+                    PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false"));
+            CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
+            co.visit();
+            co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
+        }
+
+        // Run optimizer to make use of secondary sort key when possible for nested foreach
+        // order by and distinct. Should be done before AccumulatorOptimizer
+        boolean noSecKeySort = Boolean.parseBoolean(pc.getProperties().getProperty(
+                PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "false"));
+        if (!pc.inIllustrator && !noSecKeySort)  {
+            SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
+            skOptimizer.visit();
+        }
+
+        boolean isMultiQuery =
+                "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+
+        if (isMultiQuery) {
+            // reduces the number of TezOpers in the Tez plan generated
+            // by multi-query (multi-store) script.
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
+            mqOptimizer.visit();
+        }
+
+        // Run AccumulatorOptimizer on Tez plan
+        boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty(
+                    PigConfiguration.OPT_ACCUMULATOR, "true"));
+        if (isAccum) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(tezPlan);
+            accum.visit();
+        }
+
+        boolean isUnionOpt = "true".equalsIgnoreCase(pc.getProperties()
+                .getProperty(PigConfiguration.TEZ_OPT_UNION, "true"));
+        // Use VertexGroup in Tez
+        if (isUnionOpt) {
+            UnionOptimizer uo = new UnionOptimizer(tezPlan);
+            uo.visit();
+        }
+
+        return comp.getPlanContainer();
+    }
+
+    /**
+     * Kills the first running job of the current job control, if there is a
+     * job control at all.
+     */
+    @Override
+    public void kill() throws BackendException {
+        if (jc == null) return;
+        for (ControlledJob job : jc.getRunningJobs()) {
+            try {
+                job.killJob();
+                break;
+            } catch (Exception e) {
+                throw new BackendException(e);
+            }
+        }
+    }
+
+    /**
+     * Kills the running job whose id equals {@code jobID}. Logs a message
+     * when no such job is running (including when nothing was launched yet).
+     */
+    @Override
+    public void killJob(String jobID, Configuration conf) throws BackendException {
+        if (jc != null) {
+            for (ControlledJob job : jc.getRunningJobs()) {
+                if (job.getJobID().equals(jobID)) {
+                    try {
+                        job.killJob();
+                    } catch (Exception e) {
+                        throw new BackendException(e);
+                    }
+                    // Found and killed: do not fall through to the
+                    // "Cannot find job" message below.
+                    return;
+                }
+            }
+        }
+        log.info("Cannot find job: " + jobID);
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java Fri May 30 19:07:23 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Properties;
+
+/**
+ * ExecType for running Pig on Tez in local mode. It accepts the
+ * {@code exectype} property when it equals {@code TEZ_LOCAL} ignoring case,
+ * and reports itself as local.
+ */
+public class TezLocalExecType extends TezExecType {
+
+    private static final long serialVersionUID = 1L;
+    private static final String[] modes = { "TEZ_LOCAL" };
+
+    /**
+     * @return {@code true} if the {@code exectype} property, upper-cased, is
+     *         one of this type's modes
+     */
+    @Override
+    public boolean accepts(Properties properties) {
+        // Locale.ROOT keeps the case mapping independent of the JVM's default
+        // locale (e.g. the Turkish dotted/dotless i).
+        String execTypeSpecified =
+                properties.getProperty("exectype", "").toUpperCase(java.util.Locale.ROOT);
+        for (String mode : modes) {
+            if (execTypeSpecified.equals(mode)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "TEZ_LOCAL";
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+
+/**
+ * Base visitor over a {@link TezOperPlan}. The per-operator hook
+ * {@link #visitTezOp(TezOperator)} does nothing here and is meant to be
+ * overridden by concrete visitors.
+ */
+public class TezOpPlanVisitor extends PlanVisitor<TezOperator, TezOperPlan> {
+
+    /**
+     * @param plan the plan to visit
+     * @param walker the walker used to traverse {@code plan}
+     */
+    public TezOpPlanVisitor(TezOperPlan plan, PlanWalker<TezOperator, TezOperPlan> walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Visit hook for a single {@link TezOperator}; a no-op by default.
+     */
+    public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+        // Intentionally empty.
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri May 30 19:07:23 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Graph of {@link TezOperator}s that is later turned into a job control
+ * object; the graph captures the dependencies between the operators. It
+ * also tracks plan-specific local resources, such as the SHIP and CACHE files
+ * of STREAM statements.
+ */
+public class TezOperPlan extends OperatorPlan<TezOperator> {
+
+    private static final long serialVersionUID = 1L;
+
+    private TezResourceManager tezResourceManager;
+
+    // Resource name -> path of that resource in the remote FS.
+    private Map<String, Path> extraResources = new HashMap<String, Path>();
+
+    public TezOperPlan(TezResourceManager tezResourceManager) {
+        this.tezResourceManager = tezResourceManager;
+    }
+
+    /**
+     * Renders the plan with a verbose {@link TezPrinter}.
+     *
+     * @throws RuntimeException wrapping a {@link VisitorException} from the printer
+     */
+    @Override
+    public String toString() {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        TezPrinter printer = new TezPrinter(new PrintStream(buffer), this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException ve) {
+            throw new RuntimeException(ve);
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Registers a plan-specific resource that lives on the source FS. The
+     * resource is named after the last component of {@code url}'s file part;
+     * if that name is already registered, nothing happens. Otherwise the URL
+     * is handed to the resource manager and the returned remote path recorded.
+     */
+    public void addExtraResource(URL url) throws IOException {
+        String resourceName = new Path(url.getFile()).getName();
+        if (extraResources.containsKey(resourceName)) {
+            return;
+        }
+        extraResources.put(resourceName, tezResourceManager.addTezResource(url));
+    }
+
+    /**
+     * Registers a plan-specific resource that already exists in the remote FS
+     * under {@code resourceName}, unless that name is already registered.
+     */
+    public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (extraResources.containsKey(resourceName)) {
+            return;
+        }
+        tezResourceManager.addTezResource(resourceName, remoteFsPath);
+        extraResources.put(resourceName, remoteFsPath);
+    }
+
+    /**
+     * Collects the SHIP and CACHE files of this plan's STREAM operators,
+     * registers them, and returns the local resources for every registered
+     * extra resource name.
+     */
+    public Map<String, LocalResource> getExtraResources() throws Exception {
+        TezPOStreamVisitor streamVisitor = new TezPOStreamVisitor(this);
+        streamVisitor.visit();
+
+        // Files listed in SHIP and CACHE clauses become plan resources.
+        addShipResources(streamVisitor.getShipFiles());
+        addCacheResources(streamVisitor.getCacheFiles());
+
+        return tezResourceManager.getTezResources(extraResources.keySet());
+    }
+
+    /**
+     * SHIP('/home/foo') maps the resource name "foo" to the copy of that local
+     * file placed in the remote staging directory. Blank names are ignored.
+     */
+    private void addShipResources(Set<String> fileNames) throws IOException {
+        for (String rawName : fileNames) {
+            String shipFile = rawName.trim();
+            if (shipFile.isEmpty()) {
+                continue;
+            }
+            addExtraResource(new File(shipFile).toURI().toURL());
+        }
+    }
+
+    /**
+     * CACHE('/input/data.txt#alias.txt') maps the resource name "alias.txt"
+     * (or "data.txt" when no fragment is given) to '/input/data.txt' in the
+     * remote FS. Blank names are ignored.
+     */
+    private void addCacheResources(Set<String> fileNames) throws Exception {
+        for (String rawName : fileNames) {
+            String cacheFile = rawName.trim();
+            if (cacheFile.isEmpty()) {
+                continue;
+            }
+            URI resourceURI = new URI(cacheFile);
+            Path remoteFsPath = new Path(resourceURI.getPath());
+            String fragment = resourceURI.getFragment();
+            boolean hasAlias = fragment != null && fragment.length() > 0;
+            addExtraResource(hasAlias ? fragment : remoteFsPath.getName(), remoteFsPath);
+        }
+    }
+
+    /**
+     * Removes {@code op} and drops the edge entries that its neighbours keep
+     * for it. Input/output keys stored in TezInput and TezOutput are NOT
+     * rewritten here and must be handled separately.
+     */
+    @Override
+    public void remove(TezOperator op) {
+        OperatorKey removedKey = op.getOperatorKey();
+        for (OperatorKey succKey : op.outEdges.keySet()) {
+            getOperator(succKey).inEdges.remove(removedKey);
+        }
+        for (OperatorKey predKey : op.inEdges.keySet()) {
+            getOperator(predKey).outEdges.remove(removedKey);
+        }
+        super.remove(op);
+    }
+
+    /**
+     * Drops the edge entries between {@code from} and {@code to} on both
+     * operators, then disconnects them in the underlying plan.
+     */
+    @Override
+    public boolean disconnect(TezOperator from, TezOperator to) {
+        to.inEdges.remove(from.getOperatorKey());
+        from.outEdges.remove(to.getOperatorKey());
+        return super.disconnect(from, to);
+    }
+
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri May 30 19:07:23 2014
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexGroup;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An operator model for a Tez job. Acts as a host to the plans that will
+ * execute in Tez vertices.
+ */
+public class TezOperator extends Operator<TezOpPlanVisitor> {
+    private static final long serialVersionUID = 1L;
+
+    // Processor pipeline
+    public PhysicalPlan plan;
+
+    // Descriptors for out-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> outEdges;
+    // Descriptors for in-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> inEdges;
+
+    public Set<String> UDFs;
+    public Set<PhysicalOperator> scalars;
+
+    // Use AtomicInteger for access by reference and being able to reset in
+    // TezDAGBuilder based on number of input splits.
+    // We just need mutability and not concurrency
+    // This is to ensure that vertexes with 1-1 edge have same parallelism
+    // even when parallelism of source vertex changes.
+    // Can change to int and set to -1 if TEZ-800 gets fixed.
+    private AtomicInteger requestedParallelism = new AtomicInteger(-1);
+
+    // TODO: When constructing Tez vertex, we have to specify how much resource
+    // the vertex will need. So we need to estimate these values while compiling
+    // physical plan into tez plan. For now, we're using default values - 1G mem
+    // and 1 core.
+    //int requestedMemory = 1024;
+    //int requestedCpu = 1;
+
+    // This indicates that this TezOper is a split operator
+    private boolean splitter;
+
+    // This indicates that this TezOper has POSplit as a predecessor.
+    private OperatorKey splitParent = null;
+
+    // Indicates that the plan creation is complete
+    boolean closed = false;
+
+    // Indicate whether we need to split the DAG below the operator
+    // The result is two or more DAG connected DAG inside the same plan container
+    boolean segmentBelow = false;
+
+    // Indicates if this is a limit after a sort
+    boolean limitAfterSort = false;
+
+    //Indicates if this job is an order by job
+    boolean globalSort = false;
+
+    //The sort order of the columns;
+    //asc is true and desc is false
+    boolean[] sortOrder;
+
+    // Last POLimit value in this map reduce operator, needed by LimitAdjuster
+    // to add additional map reduce operator with 1 reducer after this
+    long limit = -1;
+
+    private boolean skewedJoin = false;
+
+    // Flag to indicate if the small input splits need to be combined to form a larger
+    // one in order to reduce the number of mappers. For merge join, both tables
+    // are NOT combinable for correctness.
+    private boolean combineSmallSplits = true;
+
+    // If not null, need to collect sample sent from predecessor
+    TezOperator sampleOperator = null;
+
+    // If true, we will use secondary key sort in the job
+    private boolean useSecondaryKey = false;
+
+    // Types of blocking operators. For now, we only support the following ones.
+    private static enum OPER_FEATURE {
+        NONE,
+        // Indicate if this job is a union job
+        UNION,
+        // Indicate if this job is a merge indexer
+        INDEXER,
+        // Indicate if this job is a sampling job
+        SAMPLER,
+        // Indicate if this job is a group by job
+        GROUPBY,
+        // Indicate if this job is a cogroup job
+        COGROUP,
+        // Indicate if this job is a regular join job
+        HASHJOIN;
+    };
+
+    OPER_FEATURE feature = OPER_FEATURE.NONE;
+
+    private List<OperatorKey> vertexGroupMembers;
+    // For union
+    private VertexGroupInfo vertexGroupInfo;
+    // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
+    private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
+
+    public TezOperator(OperatorKey k) {
+        super(k);
+        plan = new PhysicalPlan();
+        outEdges = Maps.newHashMap();
+        inEdges = Maps.newHashMap();
+        UDFs = Sets.newHashSet();
+        scalars = Sets.newHashSet();
+    }
+
+    public String getProcessorName() {
+        return PigProcessor.class.getName();
+    }
+
+    @Override
+    public void visit(TezOpPlanVisitor v) throws VisitorException {
+        v.visitTezOp(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism.get();
+    }
+
+    public void setRequestedParallelism(int requestedParallelism) {
+        this.requestedParallelism.set(requestedParallelism);
+    }
+
+    public void setRequestedParallelismByReference(TezOperator oper) {
+        this.requestedParallelism = oper.requestedParallelism;
+    }
+
+    public OperatorKey getSplitParent() {
+        return splitParent;
+    }
+
+    public void setSplitParent(OperatorKey splitParent) {
+        this.splitParent = splitParent;
+    }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void setClosed(boolean closed) {
+        this.closed = closed;
+    }
+
+    public boolean isGroupBy() {
+        return (feature == OPER_FEATURE.GROUPBY);
+    }
+
+    public void markGroupBy() {
+        feature = OPER_FEATURE.GROUPBY;
+    }
+
+    public boolean isCogroup() {
+        return (feature == OPER_FEATURE.COGROUP);
+    }
+
+    public void markCogroup() {
+        feature = OPER_FEATURE.COGROUP;
+    }
+
+    public boolean isRegularJoin() {
+        return (feature == OPER_FEATURE.HASHJOIN);
+    }
+
+    public void markRegularJoin() {
+        feature = OPER_FEATURE.HASHJOIN;
+    }
+
+    public boolean isUnion() {
+        return (feature == OPER_FEATURE.UNION);
+    }
+
+    public void markUnion() {
+        feature = OPER_FEATURE.UNION;
+    }
+
+    public boolean isIndexer() {
+        return (feature == OPER_FEATURE.INDEXER);
+    }
+
+    public void markIndexer() {
+        feature = OPER_FEATURE.INDEXER;
+    }
+
+    public boolean isSampler() {
+        return (feature == OPER_FEATURE.SAMPLER);
+    }
+
+    public void markSampler() {
+        feature = OPER_FEATURE.SAMPLER;
+    }
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public List<OperatorKey> getUnionPredecessors() {
+        return vertexGroupMembers;
+    }
+
+    public List<OperatorKey> getVertexGroupMembers() {
+        return vertexGroupMembers;
+    }
+
+    public void addUnionPredecessor(OperatorKey unionPredecessor) {
+        if (vertexGroupMembers == null) {
+            vertexGroupMembers = new ArrayList<OperatorKey>();
+        }
+        this.vertexGroupMembers.add(unionPredecessor);
+    }
+
+    public void setVertexGroupMembers(List<OperatorKey> vertexGroupMembers) {
+        this.vertexGroupMembers = vertexGroupMembers;
+    }
+
+    // Union is the only operator that uses alias vertex (VertexGroup) now. But
+    // more operators could be added to the list in the future.
+    public boolean isVertexGroup() {
+        return vertexGroupInfo != null;
+    }
+
+    public VertexGroupInfo getVertexGroupInfo() {
+        return vertexGroupInfo;
+    }
+
+    public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
+        this.vertexGroupInfo = vertexGroup;
+    }
+
+    public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
+        if (this.vertexGroupStores == null) {
+            this.vertexGroupStores = new HashMap<OperatorKey, OperatorKey>();
+        }
+        this.vertexGroupStores.put(storeKey, vertexGroupKey);
+    }
+
+    public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
+        return this.vertexGroupStores;
+    }
+
+    @Override
+    public String name() {
+        String udfStr = getUDFsAsStr();
+        StringBuilder sb = new StringBuilder("Tez" + "(" + requestedParallelism +
+                (udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString());
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(name() + ":\n");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        if (!plan.isEmpty()) {
+            plan.explain(baos);
+            String mp = new String(baos.toByteArray());
+            sb.append(shiftStringByTabs(mp, "|   "));
+        } else {
+            sb.append("Plan Empty");
+        }
+        return sb.toString();
+    }
+
+    private String getUDFsAsStr() {
+        StringBuilder sb = new StringBuilder();
+        if(UDFs!=null && UDFs.size()>0){
+            for (String str : UDFs) {
+                sb.append(str.substring(str.lastIndexOf('.')+1));
+                sb.append(',');
+            }
+            sb.deleteCharAt(sb.length()-1);
+        }
+        return sb.toString();
+    }
+
+    private String shiftStringByTabs(String DFStr, String tab) {
+        StringBuilder sb = new StringBuilder();
+        String[] spl = DFStr.split("\n");
+        for (int i = 0; i < spl.length; i++) {
+            sb.append(tab);
+            sb.append(spl[i]);
+            sb.append("\n");
+        }
+        sb.delete(sb.length() - "\n".length(), sb.length());
+        return sb.toString();
+    }
+
+    public boolean needSegmentBelow() {
+        return segmentBelow;
+    }
+
+    public void setSortOrder(boolean[] sortOrder) {
+        if(null == sortOrder) return;
+        this.sortOrder = new boolean[sortOrder.length];
+        for(int i = 0; i < sortOrder.length; ++i) {
+            this.sortOrder[i] = sortOrder[i];
+        }
+    }
+
+    public boolean[] getSortOrder() {
+        return sortOrder;
+    }
+
+    public void setGlobalSort(boolean globalSort) {
+        this.globalSort = globalSort;
+    }
+
+    public boolean isGlobalSort() {
+        return globalSort;
+    }
+
+    public void setLimitAfterSort(boolean limitAfterSort) {
+        this.limitAfterSort = limitAfterSort;
+    }
+
+    public boolean isLimitAfterSort() {
+        return limitAfterSort;
+    }
+
+    public void setSkewedJoin(boolean skewedJoin) {
+        this.skewedJoin = skewedJoin;
+    }
+
+    public boolean isSkewedJoin() {
+        return skewedJoin;
+    }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
+
+    public static class VertexGroupInfo {
+
+        private List<OperatorKey> inputKeys;
+        private String outputKey;
+        private POStore store;
+        private OutputDescriptor storeOutDescriptor;
+        private VertexGroup vertexGroup;
+
+        public VertexGroupInfo() {
+        }
+
+        public VertexGroupInfo(POStore store) {
+            this.store = store;
+        }
+
+        public List<OperatorKey> getInputs() {
+            return inputKeys;
+        }
+
+        public void addInput(OperatorKey input) {
+            if (inputKeys == null) {
+                inputKeys = new ArrayList<OperatorKey>();
+            }
+            this.inputKeys.add(input);
+        }
+
+        public boolean removeInput(OperatorKey input) {
+            return this.inputKeys.remove(input);
+        }
+
+        public String getOutput() {
+            return outputKey;
+        }
+
+        public void setOutput(String output) {
+            this.outputKey = output;
+        }
+
+        public POStore getStore() {
+            return store;
+        }
+
+        public OutputDescriptor getStoreOutputDescriptor() {
+            return storeOutDescriptor;
+        }
+
+        public void setStoreOutputDescriptor(OutputDescriptor storeOutDescriptor) {
+            this.storeOutDescriptor = storeOutDescriptor;
+        }
+
+        public VertexGroup getVertexGroup() {
+            return vertexGroup;
+        }
+
+        public void setVertexGroup(VertexGroup vertexGroup) {
+            this.vertexGroup = vertexGroup;
+        }
+
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java Fri May 30 19:07:23 2014
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.LogicalOutput;
+
+/**
+ * Implemented by PhysicalOperators that can have Tez outputs attached
+ * directly to the operator.
+ */
+public interface TezOutput {
+
+    /**
+     * @return the names of the Tez outputs this operator writes to
+     *         (presumably the same keys accepted by {@link #replaceOutput})
+     */
+    String[] getTezOutputs();
+
+    /**
+     * Replaces the output key {@code oldOutputKey} with {@code newOutputKey}.
+     */
+    void replaceOutput(String oldOutputKey, String newOutputKey);
+
+    /**
+     * Attaches the given Tez logical outputs, keyed by output name, to this
+     * operator.
+     *
+     * @throws ExecException if the outputs cannot be attached
+     */
+    void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException;
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Fri May 30 19:07:23 2014
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A port of the POPackageAnnotator from MR to Tez.
+ *
+ */
+public class TezPOPackageAnnotator extends TezOpPlanVisitor {
+
+    /**
+     * @param plan Tez plan to visit
+     */
+    public TezPOPackageAnnotator(TezOperPlan plan) {
+        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if(!tezOp.plan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(tezOp.plan);
+            pkgDiscoverer.visit();
+            POPackage pkg = pkgDiscoverer.getPkg();
+            if(pkg != null) {
+                handlePackage(tezOp, pkg);
+            }
+        }
+    }
+
+    private void handlePackage(TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
+        // the LocalRearrange(s) must be in the plan of a predecessor tez op
+        int lrFound = 0;
+        List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
+        for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
+            TezOperator predTezOp = it.next();
+            if (predTezOp.isVertexGroup()) {
+                // Just get one of the inputs to vertex group
+                predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
+            }
+            lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
+            if(lrFound == pkg.getNumInps()) {
+                break;
+            }
+        }
+
+        if(lrFound != pkg.getNumInps()) {
+            int errCode = 2086;
+            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg);
+        lrDiscoverer.visit();
+        // let our caller know if we managed to patch
+        // the package
+        return lrDiscoverer.getLoRearrangeFound();
+    }
+
+    /**
+     * Simple visitor of the "Reduce" physical plan
+     * which will get a reference to the POPacakge
+     * present in the plan
+     */
+    static class PackageDiscoverer extends PhyPlanVisitor {
+
+        private POPackage pkg;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+        };
+
+        /**
+         * @return the pkg
+         */
+        public POPackage getPkg() {
+            return pkg;
+        }
+
+    }
+
+    /**
+     * Physical Plan visitor which tries to get the
+     * LocalRearrange(s) present in the plan (if any) and
+     * annotate the POPackage given to it with the information
+     * in the LocalRearrange (regarding columns in the "value"
+     * present in the "key")
+     */
+    static class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+        private int loRearrangeFound = 0;
+        private TezOperator pkgTezOp;
+        private POPackage pkg;
+
+        public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp, POPackage pkg) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pkgTezOp = pkgTezOp;
+            this.pkg = pkg;
+        }
+
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+            POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
+            if (!lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString())) {
+                return;
+            }
+            loRearrangeFound++;
+            Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+            if (pkg.getPkgr() instanceof LitePackager) {
+                if(lrearrange.getIndex() != 0) {
+                    // Throw some exception here
+                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
+                }
+            }
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the POPackage
+            keyInfo = pkg.getPkgr().getKeyInfo();
+            if(keyInfo == null)
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+                // something is wrong - we should not be getting key info
+                // for the same index from two different Local Rearranges
+                int errCode = 2087;
+                String msg = "Unexpected problem during optimization." +
+                        " Found index:" + lrearrange.getIndex() +
+                        " in multiple LocalRearrange operators.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+
+            }
+            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+                    new Pair<Boolean, Map<Integer, Integer>>(
+                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+        }
+
+        /**
+         * @return the loRearrangeFound
+         */
+        public int getLoRearrangeFound() {
+            return loRearrangeFound;
+        }
+
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+/**
+ * Walks every Tez operator in a plan and collects the cache and ship file
+ * specs declared by the POStream operators found in their physical plans.
+ */
+public class TezPOStreamVisitor extends TezOpPlanVisitor {
+
+    // Specs accumulated across every visited POStream.
+    private final Set<String> cacheFiles = new HashSet<String>();
+    private final Set<String> shipFiles = new HashSet<String>();
+
+    public TezPOStreamVisitor(TezOperPlan plan) {
+        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (!tezOp.plan.isEmpty()) {
+            StreamFileVisitor streamFileVisitor = new StreamFileVisitor(tezOp.plan);
+            streamFileVisitor.visit();
+        }
+    }
+
+    /**
+     * Physical-plan visitor that adds each POStream's cache and ship specs to
+     * the enclosing visitor's sets (hence an inner, non-static class).
+     */
+    class StreamFileVisitor extends PhyPlanVisitor {
+
+        public StreamFileVisitor(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            StreamingCommand command = stream.getCommand();
+            cacheFiles.addAll(command.getCacheSpecs());
+            shipFiles.addAll(command.getShipSpecs());
+        }
+    }
+
+    /** @return the cache specs collected so far (live, mutable set) */
+    public Set<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    /** @return the ship specs collected so far (live, mutable set) */
+    public Set<String> getShipFiles() {
+        return shipFiles;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
+    private static final long serialVersionUID = 1L;
+    private TezResourceManager tezResourceManager;
+    private PigContext pigContext;
+
+    public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) {
+        this.pigContext = pigContext;
+        this.tezResourceManager = tezResourceManager;
+    }
+
+    // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
+    // will use them for simplicity). This differs from MR Pig, where they are added to
+    // the job jar.
+    public Map<String, LocalResource> getLocalResources() throws Exception {
+        Set<URL> jarLists = new HashSet<URL>();
+
+        jarLists.add(tezResourceManager.getBootStrapJar());
+
+        // In MR Pig the extra jars and script jars get put in Distributed Cache, but
+        // in Tez we'll add them as local resources.
+        for (URL jarUrl : pigContext.extraJars) {
+            jarLists.add(jarUrl);
+        }
+
+        for (String jarFile : pigContext.scriptJars) {
+            jarLists.add(new File(jarFile).toURI().toURL());
+        }
+
+        // Script files for non-Java UDF's are added to the Job.jar by the JarManager class,
+        // except for Groovy files, which need to be explicitly added as local resources due
+        // to the GroovyScriptEngine (see JarManager.java for comments).
+        for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
+            if (scriptFile.getKey().endsWith(".groovy")) {
+                jarLists.add(scriptFile.getValue().toURI().toURL());
+            }
+        }
+
+        TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
+        tezPlanContainerUDFCollector.visit();
+        Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
+
+        for (String func : udfs) {
+            Class<?> clazz = pigContext.getClassForAlias(func);
+            if (clazz != null) {
+                String jarName = JarManager.findContainingJar(clazz);
+                if (jarName == null) {
+                    // It's possible that clazz is not registered by an external
+                    // jar. For eg, in unit tests, UDFs that are not in default
+                    // packages may be used. In that case, we should skip it to
+                    // avoid NPE.
+                    continue;
+                }
+                URL jarUrl = new File(jarName).toURI().toURL();
+                jarLists.add(jarUrl);
+            }
+        }
+
+        // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+        // resources for them yet.
+        // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+        //     for (String fileName : StreamingUDF.getResourcesForJar()) {
+        //         jarLists.add(new File(fileName).toURI().toURL());
+        //     }
+        // }
+
+        return tezResourceManager.addTezResources(jarLists);
+    }
+
+    public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+        synchronized(this) {
+            while (getRoots()!=null && !getRoots().isEmpty()) {
+                TezPlanContainerNode currentPlan = null;
+                for (TezPlanContainerNode plan : getRoots()) {
+                    if (!processedPlans.contains(plan.getNode())) {
+                        currentPlan = plan;
+                        break;
+                    }
+                }
+                if (currentPlan!=null) {
+                    return currentPlan.getNode();
+                } else {
+                    try {
+                        wait();
+                    } catch (InterruptedException e) {
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public void updatePlan(TezOperPlan plan, boolean succ) {
+        String scope = getRoots().get(0).getOperatorKey().getScope();
+        TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+        synchronized(this) {
+            if (succ) {
+                remove(tezPlanContainerNode);
+            } else {
+                // job fail
+                trimBelow(tezPlanContainerNode);
+                remove(tezPlanContainerNode);
+            }
+            notify();
+        }
+    }
+
+    public void split(TezPlanContainerNode planNode) throws PlanException {
+        TezOperPlan tezOperPlan = planNode.getNode();
+        TezOperator operToSegment = null;
+        List<TezOperator> succs = new ArrayList<TezOperator>();
+        for (TezOperator tezOper : tezOperPlan) {
+            if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) {
+                operToSegment = tezOper;
+                succs.addAll(tezOperPlan.getSuccessors(tezOper));
+                break;
+            }
+        }
+        if (operToSegment != null) {
+            for (TezOperator succ : succs) {
+                tezOperPlan.disconnect(operToSegment, succ);
+                TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager);
+                List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
+                if (getSuccessors(planNode)!=null) {
+                    containerSuccs.addAll(getSuccessors(planNode));
+                }
+                tezOperPlan.moveTree(succ, newOperPlan);
+                String scope = operToSegment.getOperatorKey().getScope();
+                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), newOperPlan);
+                add(newPlanNode);
+                for (TezPlanContainerNode containerNodeSucc : containerSuccs) {
+                    disconnect(planNode, containerNodeSucc);
+                    connect(newPlanNode, containerNodeSucc);
+                }
+                connect(planNode, newPlanNode);
+                split(newPlanNode);
+            }
+            split(planNode);
+        }
+    }
+
+    @Override
+    public String toString() {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(baos);
+        TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, this);
+        printer.setVerbose(true);
+        try {
+            printer.visit();
+        } catch (VisitorException e) {
+            throw new RuntimeException(e);
+        }
+        return baos.toString();
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java Fri May 30 19:07:23 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+    private static final long serialVersionUID = 1L;
+    TezOperPlan node;
+    public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+        super(k);
+        this.node = node;
+    }
+
+    @Override
+    public void visit(TezPlanContainerVisitor v) throws VisitorException {
+        v.visitTezPlanContainerNode(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        return "DAG " + mKey;
+    }
+
+    public TezOperPlan getNode() {
+        return node;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o != null && o instanceof TezPlanContainerNode) {
+            return ((TezPlanContainerNode)o).getNode().equals(getNode());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getNode().hashCode();
+    }
+    
+    @Override
+    public String toString() {
+        return getNode().toString();
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java Fri May 30 19:07:23 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPrinter.TezGraphPrinter;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Prints every DAG of a {@link TezPlanContainer}, in dependency order. For each DAG it
+ * prints a header, the vertex graph, and the per-vertex plans.
+ */
+public class TezPlanContainerPrinter extends TezPlanContainerVisitor {
+    private static final String SEPARATOR = "#--------------------------------------------------";
+
+    private final PrintStream mStream;
+    private boolean isVerbose = true;
+
+    /**
+     * Creates the printer and immediately writes a header with the DAG count to {@code ps}.
+     *
+     * @param ps PrintStream to output plan information to
+     * @param planContainer container of Tez DAGs to print
+     */
+    public TezPlanContainerPrinter(PrintStream ps, TezPlanContainer planContainer) {
+        super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+        mStream = ps;
+        mStream.println(SEPARATOR);
+        mStream.println("# There are " + planContainer.size() + " DAGs in the session");
+        mStream.println(SEPARATOR);
+    }
+
+    /** Controls whether the per-vertex physical plans are printed verbosely. */
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
+    @Override
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        TezOperPlan dag = tezPlanContainerNode.getNode();
+
+        mStream.println(SEPARATOR);
+        mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey());
+        mStream.println(SEPARATOR);
+
+        // Vertex graph first, then the detailed per-vertex plans.
+        TezGraphPrinter graphPrinter = new TezGraphPrinter(dag);
+        graphPrinter.visit();
+        mStream.print(graphPrinter.toString());
+
+        TezPrinter vertexPrinter = new TezPrinter(mStream, dag);
+        vertexPrinter.setVerbose(isVerbose);
+        vertexPrinter.visit();
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java Fri May 30 19:07:23 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Gathers the UDF names recorded on every {@link TezOperator} (its {@code UDFs} field)
+ * across all DAGs of a {@link TezPlanContainer}.
+ */
+public class TezPlanContainerUDFCollector extends TezPlanContainerVisitor {
+    private final Set<String> udfs = Sets.newHashSet();
+
+    /**
+     * @param planContainer container whose DAGs are scanned for UDF references
+     */
+    public TezPlanContainerUDFCollector(TezPlanContainer planContainer) {
+        super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+    }
+
+    @Override
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        for (TezOperator vertex : tezPlanContainerNode.getNode()) {
+            udfs.addAll(vertex.UDFs);
+        }
+    }
+
+    /** @return the UDF names collected so far (the live internal set) */
+    public Set<String> getUdfs() {
+        return udfs;
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Base visitor over a {@link TezPlanContainer}. The supplied walker decides the order in
+ * which {@link TezPlanContainerNode}s are visited.
+ */
+public class TezPlanContainerVisitor extends PlanVisitor<TezPlanContainerNode, TezPlanContainer> {
+
+    public TezPlanContainerVisitor(TezPlanContainer plan, PlanWalker<TezPlanContainerNode, TezPlanContainer> walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Hook invoked for each container node; does nothing by default.
+     */
+    public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+        // Intentionally empty: subclasses override this to act on each DAG node.
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Fri May 30 19:07:23 2014
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to print out the Tez plan: for each vertex (in dependency order) its key,
+ * the combine plans on its incoming edges, and its physical plan.
+ */
+public class TezPrinter extends TezOpPlanVisitor {
+
+    private PrintStream mStream = null;
+    private boolean isVerbose = true;
+
+    /**
+     * @param ps PrintStream to output plan information to
+     * @param plan tez plan to print
+     */
+    public TezPrinter(PrintStream ps, TezOperPlan plan) {
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        mStream = ps;
+    }
+
+    /** Controls whether physical plans are printed verbosely. */
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOper) throws VisitorException {
+        if (tezOper.isVertexGroup()) {
+            VertexGroupInfo info = tezOper.getVertexGroupInfo();
+            mStream.println("Tez vertex group "
+                    + tezOper.getOperatorKey().toString() + "\t<-\t "
+                    + info.getInputs() + "\t->\t " + info.getOutput());
+            mStream.println("# No plan on vertex group");
+        } else {
+            mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+        }
+        for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+            //TODO: Print other edge properties like custom partitioner
+            if (!inEdge.getValue().combinePlan.isEmpty()) {
+                mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
+                printPhysicalPlan(inEdge.getValue().combinePlan);
+            }
+        }
+        if (tezOper.plan != null && tezOper.plan.size() > 0) {
+            mStream.println("# Plan on vertex");
+            printPhysicalPlan(tezOper.plan);
+        }
+    }
+
+    // Prints one physical plan with the configured verbosity, followed by a blank line.
+    private void printPhysicalPlan(PhysicalPlan plan) throws VisitorException {
+        PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
+                new PlanPrinter<PhysicalOperator, PhysicalPlan>(plan, mStream);
+        printer.setVerbose(isVerbose);
+        printer.visit();
+        mStream.println();
+    }
+
+    /**
+     * This class prints the Tez Vertex Graph: one line per vertex listing its successors.
+     * Call {@link #visit()} first, then {@link #toString()} to obtain the text.
+     */
+    public static class TezGraphPrinter extends TezOpPlanVisitor {
+
+        StringBuffer buf;
+
+        public TezGraphPrinter(TezOperPlan plan) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            buf = new StringBuffer();
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOper) throws VisitorException {
+            buf.append("Tez vertex ").append(tezOper.getOperatorKey().toString());
+            List<TezOperator> succs = mPlan.getSuccessors(tezOper);
+            if (succs != null) {
+                buf.append("\t->\t");
+                for (TezOperator op : succs) {
+                    buf.append("Tez vertex ").append(op.getOperatorKey().toString()).append(",");
+                }
+            }
+            buf.append("\n");
+        }
+
+        /**
+         * Returns the accumulated graph text followed by a trailing blank line. This does
+         * not modify {@code buf}, so repeated calls return the same text.
+         */
+        @Override
+        public String toString() {
+            return buf.toString() + "\n";
+        }
+    }
+}