You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [11/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/ba...
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Fri May 30 19:07:23 2014
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * This is compiler class that takes a TezOperPlan and converts it into a
+ * JobControl object with the relevant dependency info maintained. The
+ * JobControl object is made up of TezJobs each of which has a JobConf.
+ */
+public class TezJobControlCompiler {
+ private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
+ private static int dagIdentifier = 0;
+
+ private PigContext pigContext;
+ private TezConfiguration tezConf;
+
+ public TezJobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+ this.pigContext = pigContext;
+ this.tezConf = new TezConfiguration(conf);
+ }
+
+ public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
+ throws IOException, YarnException {
+ String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
+ DAG tezDag = new DAG(jobName + "-" + dagIdentifier);
+ dagIdentifier++;
+ tezDag.setCredentials(new Credentials());
+ TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
+ dagBuilder.visit();
+ return tezDag;
+ }
+
+ public TezJobControl compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer)
+ throws JobCreationException {
+ int timeToSleep;
+ String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "1000";
+ String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
+ if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
+ log.info("overriding default JobControl sleep (" +
+ defaultPigJobControlSleep + ") to " + pigJobControlSleep);
+ }
+
+ try {
+ timeToSleep = Integer.parseInt(pigJobControlSleep);
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid configuration " +
+ "pig.jobcontrol.sleep=" + pigJobControlSleep +
+ " should be a time in ms. default=" + defaultPigJobControlSleep, e);
+ }
+
+ TezJobControl jobCtrl = new TezJobControl(grpName, timeToSleep);
+
+ try {
+ // A single Tez job always pack only 1 Tez plan. We will track
+ // Tez job asynchronously to exploit parallel execution opportunities.
+ TezJob job = getJob(tezPlan, planContainer);
+ jobCtrl.addJob(job);
+ } catch (JobCreationException jce) {
+ throw jce;
+ } catch(Exception e) {
+ int errCode = 2017;
+ String msg = "Internal error creating job configuration.";
+ throw new JobCreationException(msg, errCode, PigException.BUG, e);
+ }
+
+ return jobCtrl;
+ }
+
+ private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer)
+ throws JobCreationException {
+ try {
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.putAll(planContainer.getLocalResources());
+ localResources.putAll(tezPlan.getExtraResources());
+ DAG tezDag = buildDAG(tezPlan, localResources);
+ return new TezJob(tezConf, tezDag, localResources);
+ } catch (Exception e) {
+ int errCode = 2017;
+ String msg = "Internal error creating job configuration.";
+ throw new JobCreationException(msg, errCode, PigException.BUG, e);
+ }
+ }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java Fri May 30 19:07:23 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+
+/**
+ * Reports the outcome of a finished job control back to the plan container
+ * for the plan this notifier was created with.
+ */
+public class TezJobNotifier {
+    private TezPlanContainer planContainer;
+    private TezOperPlan plan;
+
+    /**
+     * @param planContainer container to update when the job control completes
+     * @param plan plan whose outcome is reported
+     */
+    public TezJobNotifier(TezPlanContainer planContainer, TezOperPlan plan) {
+        this.plan = plan;
+        this.planContainer = planContainer;
+    }
+
+    /**
+     * Updates the plan container with the plan and a success flag; the flag
+     * is true only when the job control has no failed jobs.
+     *
+     * @param jobControl job control whose failed-job list is inspected
+     */
+    public void complete(JobControl jobControl) {
+        boolean noFailedJobs = jobControl.getFailedJobList().isEmpty();
+        planContainer.updatePlan(plan, noFailedJobs);
+    }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri May 30 19:07:23 2014
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover;
+import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezStats;
+import org.apache.pig.tools.pigstats.tez.TezTaskStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.TezConfiguration;
+
+/**
+ * Launcher that compiles a Pig physical plan into Tez plans and runs them
+ * through a job control, emitting script-state notifications and collecting
+ * statistics along the way.
+ */
+public class TezLauncher extends Launcher {
+
+    private static final Log log = LogFactory.getLog(TezLauncher.class);
+    private boolean aggregateWarning = false;
+    private TezScriptState tezScriptState;
+    private TezStats tezStats;
+
+    /**
+     * Compiles the physical plan and executes the resulting Tez plans one at
+     * a time until the plan container has no further plan to hand out.
+     *
+     * @param php physical plan to run
+     * @param grpName group name passed to the job control
+     * @param pc Pig context supplying the properties
+     * @return the Tez statistics gathered over all executed plans
+     */
+    @Override
+    public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+        aggregateWarning = getBooleanProperty(pc, "aggregate.warning", false);
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
+
+        TezResourceManager tezResourceManager = new TezResourceManager(stagingDir, pc, conf);
+
+        log.info("Tez staging directory is " + stagingDir.toString());
+        conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+        List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
+
+        tezScriptState = TezScriptState.get();
+        tezStats = new TezStats(pc);
+        PigStats.start(tezStats);
+
+        TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
+        TezPlanContainer tezPlanContainer = compile(php, pc, tezResourceManager);
+
+        TezOperPlan tezPlan;
+
+        while ((tezPlan = tezPlanContainer.getNextPlan(processedPlans)) != null) {
+            processedPlans.add(tezPlan);
+
+            TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
+            pkgAnnotator.visit();
+
+            tezStats.initialize(tezPlan);
+
+            // jc is not declared in this class; presumably it is the job
+            // control field inherited from Launcher.
+            jc = jcc.compile(tezPlan, grpName, tezPlanContainer);
+            TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
+            ((TezJobControl)jc).setJobNotifier(notifier);
+
+            // Initially, all jobs are in wait state.
+            List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
+            log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
+
+            tezScriptState.emitInitialPlanNotification(tezPlan);
+            tezScriptState.emitLaunchStartedNotification(tezPlan.size());
+
+            // Set the thread UDFContext so registered classes are available.
+            final UDFContext udfContext = UDFContext.getUDFContext();
+            Thread jcThread = new Thread(jc, "JobControl") {
+                @Override
+                public void run() {
+                    UDFContext.setUdfContext(udfContext.clone());
+                    super.run();
+                }
+            };
+
+            JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+            jcThread.setContextClassLoader(PigContext.getClassLoader());
+
+            // TezJobControl always holds a single TezJob. We use JobControl
+            // only because it is convenient to launch the job via
+            // ControlledJob.submit().
+            TezJob job = (TezJob)jobsWithoutIds.get(0);
+            tezStats.setTezJob(job);
+
+            // Mark the times that the jobs were submitted so it's reflected in job
+            // history props
+            long scriptSubmittedTimestamp = System.currentTimeMillis();
+            // Job.getConfiguration returns the shared configuration object
+            Configuration jobConf = job.getJob().getConfiguration();
+            jobConf.set("pig.script.submitted.timestamp",
+                    Long.toString(scriptSubmittedTimestamp));
+            jobConf.set("pig.job.submitted.timestamp",
+                    Long.toString(System.currentTimeMillis()));
+
+            // Inform ppnl of jobs submission
+            tezScriptState.emitJobsSubmittedNotification(jobsWithoutIds.size());
+
+            // All the setup done, now lets launch the jobs. DAG is submitted to
+            // YARN cluster by TezJob.submit().
+            jcThread.start();
+
+            // NOTE(review): this loop polls without sleeping and calls
+            // notifyStarted on every pass, so job-started notifications may be
+            // emitted repeatedly - confirm whether listeners rely on that
+            // before changing it.
+            double prevProgress = 0.0;
+            while (!jc.allFinished()) {
+                List<ControlledJob> jobsAssignedIdInThisRun = new ArrayList<ControlledJob>();
+                if (job.getApplicationId() != null) {
+                    jobsAssignedIdInThisRun.add(job);
+                }
+                notifyStarted(job);
+                jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
+                prevProgress = notifyProgress(job, prevProgress);
+            }
+
+            notifyFinishedOrFailed(job);
+            tezStats.accumulateStats(job);
+            tezScriptState.emitProgressUpdatedNotification(100);
+        }
+
+        tezStats.finish();
+        tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());
+
+        return tezStats;
+    }
+
+    /**
+     * Reads a boolean property from the Pig context, falling back to the
+     * given default when the property is not set. Parsing follows
+     * {@link Boolean#parseBoolean(String)}, i.e. only a case-insensitive
+     * "true" yields true.
+     */
+    private static boolean getBooleanProperty(PigContext pc, String key, boolean defaultValue) {
+        return Boolean.parseBoolean(
+                pc.getProperties().getProperty(key, Boolean.toString(defaultValue)));
+    }
+
+    /**
+     * For every vertex of the job's DAG, stores the configuration decoded
+     * from the vertex's processor payload and the vertex name on the matching
+     * task stats, then emits a job-started notification for that vertex.
+     */
+    private void notifyStarted(TezJob job) throws IOException {
+        for (Vertex v : job.getDAG().getVertices()) {
+            TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+            byte[] bb = v.getProcessorDescriptor().getUserPayload();
+            Configuration conf = TezUtils.createConfFromUserPayload(bb);
+            tts.setConf(conf);
+            tts.setId(v.getVertexName());
+            tezScriptState.emitJobStartedNotification(v.getVertexName());
+        }
+    }
+
+    /**
+     * Emits a progress notification when the overall percentage has advanced
+     * by at least 4 points since the last reported value.
+     *
+     * @param job job whose DAG progress is read while it is RUNNING
+     * @param prevProgress last percentage that was reported
+     * @return the newly reported percentage, or prevProgress if nothing was emitted
+     */
+    private double notifyProgress(TezJob job, double prevProgress) {
+        int numberOfJobs = tezStats.getNumberJobs();
+        double perCom = 0.0;
+        if (job.getJobState() == ControlledJob.State.RUNNING) {
+            perCom += job.getDAGProgress();
+        }
+        perCom = (perCom / (double) numberOfJobs) * 100;
+        if (perCom >= (prevProgress + 4.0)) {
+            tezScriptState.emitProgressUpdatedNotification((int) perCom);
+            return perCom;
+        }
+        return prevProgress;
+    }
+
+    /**
+     * Emits one finished or failed notification per DAG vertex, depending on
+     * whether the job ended in SUCCESS or FAILED. Other states emit nothing.
+     */
+    private void notifyFinishedOrFailed(TezJob job) {
+        ControlledJob.State state = job.getJobState();
+        if (state == ControlledJob.State.SUCCESS) {
+            for (Vertex v : job.getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitjobFinishedNotification(tts);
+            }
+        } else if (state == ControlledJob.State.FAILED) {
+            for (Vertex v : job.getDAG().getVertices()) {
+                TezTaskStats tts = tezStats.getVertexStats(v.getVertexName());
+                tezScriptState.emitJobFailedNotification(tts);
+            }
+        }
+    }
+
+    /**
+     * Compiles the plan without a resource manager and prints the resulting
+     * plan container. Only the "text" format is supported.
+     *
+     * @throws IOException if format is anything other than "text" (including null)
+     */
+    @Override
+    public void explain(PhysicalPlan php, PigContext pc, PrintStream ps,
+            String format, boolean verbose) throws PlanException,
+            VisitorException, IOException {
+        log.debug("Entering TezLauncher.explain");
+        TezPlanContainer tezPlanContainer = compile(php, pc, null);
+
+        // Constant-first comparison so a null format is reported as
+        // unsupported instead of failing with a NullPointerException.
+        if ("text".equals(format)) {
+            TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
+            printer.setVerbose(verbose);
+            printer.visit();
+        } else {
+            // TODO: add support for other file format
+            throw new IOException("Non-text output of explain is not supported.");
+        }
+    }
+
+    /**
+     * Compiles the physical plan into a Tez plan and runs the optimizers over
+     * it in a fixed order; each optimizer except the no-op filter remover is
+     * gated by a property.
+     *
+     * @param php physical plan to compile
+     * @param pc Pig context supplying the optimizer switches
+     * @param tezResourceManager resource manager handed to the compiler (explain passes null)
+     * @return the plan container produced by the compiler
+     */
+    public TezPlanContainer compile(PhysicalPlan php, PigContext pc, TezResourceManager tezResourceManager)
+            throws PlanException, IOException, VisitorException {
+        TezCompiler comp = new TezCompiler(php, pc, tezResourceManager);
+        TezOperPlan tezPlan = comp.compile();
+
+        NoopFilterRemover filter = new NoopFilterRemover(tezPlan);
+        filter.visit();
+
+        boolean nocombiner = getBooleanProperty(pc, PigConfiguration.PROP_NO_COMBINER, false);
+
+        // Run CombinerOptimizer on Tez plan
+        if (!pc.inIllustrator && !nocombiner) {
+            boolean doMapAgg = getBooleanProperty(pc, PigConfiguration.PROP_EXEC_MAP_PARTAGG, false);
+            CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg);
+            co.visit();
+            co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
+        }
+
+        // Run optimizer to make use of secondary sort key when possible for nested foreach
+        // order by and distinct. Should be done before AccumulatorOptimizer
+        boolean noSecKeySort = getBooleanProperty(pc, PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, false);
+        if (!pc.inIllustrator && !noSecKeySort) {
+            SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
+            skOptimizer.visit();
+        }
+
+        if (getBooleanProperty(pc, PigConfiguration.OPT_MULTIQUERY, true)) {
+            // reduces the number of TezOpers in the Tez plan generated
+            // by multi-query (multi-store) script.
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(tezPlan);
+            mqOptimizer.visit();
+        }
+
+        // Run AccumulatorOptimizer on Tez plan
+        if (getBooleanProperty(pc, PigConfiguration.OPT_ACCUMULATOR, true)) {
+            AccumulatorOptimizer accum = new AccumulatorOptimizer(tezPlan);
+            accum.visit();
+        }
+
+        // Use VertexGroup in Tez
+        if (getBooleanProperty(pc, PigConfiguration.TEZ_OPT_UNION, true)) {
+            UnionOptimizer uo = new UnionOptimizer(tezPlan);
+            uo.visit();
+        }
+
+        return comp.getPlanContainer();
+    }
+
+    /**
+     * Kills the first running job of the current job control, if any. Does
+     * nothing when no job control has been created yet.
+     */
+    @Override
+    public void kill() throws BackendException {
+        if (jc == null) return;
+        for (ControlledJob job : jc.getRunningJobs()) {
+            try {
+                job.killJob();
+                break;
+            } catch (Exception e) {
+                throw new BackendException(e);
+            }
+        }
+    }
+
+    /**
+     * Kills the running job with the given id. The "Cannot find job" message
+     * is logged only when no running job matches, including the case where
+     * no job control exists yet.
+     *
+     * @param jobID id of the job to kill
+     * @param conf not used by this implementation
+     * @throws BackendException if killing the matching job fails
+     */
+    @Override
+    public void killJob(String jobID, Configuration conf) throws BackendException {
+        if (jc != null) {
+            for (ControlledJob job : jc.getRunningJobs()) {
+                if (job.getJobID().equals(jobID)) {
+                    try {
+                        job.killJob();
+                    } catch (Exception e) {
+                        throw new BackendException(e);
+                    }
+                    // Job found and killed: do not fall through to the
+                    // "Cannot find job" message.
+                    return;
+                }
+            }
+        }
+        log.info("Cannot find job: " + jobID);
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLocalExecType.java Fri May 30 19:07:23 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Properties;
+
+/**
+ * TezExecType is the ExecType for distributed mode in Tez.
+ */
+public class TezLocalExecType extends TezExecType {
+
+ private static final long serialVersionUID = 1L;
+ private static final String[] modes = { "TEZ_LOCAL" };
+
+ @Override
+ public boolean accepts(Properties properties) {
+ String execTypeSpecified =
+ properties.getProperty("exectype", "").toUpperCase();
+ for (String mode : modes) {
+ if (execTypeSpecified.equals(mode)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
+
+ @Override
+ public String name() {
+ return "TEZ_LOCAL";
+ }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOpPlanVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+
+/**
+ * Base visitor over a {@link TezOperPlan}. Subclasses override
+ * {@link #visitTezOp(TezOperator)} to act on each operator.
+ */
+public class TezOpPlanVisitor extends PlanVisitor<TezOperator, TezOperPlan> {
+
+    /**
+     * @param plan plan to visit
+     * @param walker walker passed through to the PlanVisitor constructor
+     */
+    public TezOpPlanVisitor(TezOperPlan plan, PlanWalker<TezOperator, TezOperPlan> walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * Hook invoked for a Tez operator; the default implementation does nothing.
+     *
+     * @param tezOperator operator being visited
+     * @throws VisitorException declared for overriding visitors
+     */
+    public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+        // Intentionally empty.
+    }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Fri May 30 19:07:23 2014
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Operator plan made of {@link TezOperator}s. Besides the graph itself it
+ * keeps a map of plan-specific extra resources (resource name to path) that
+ * are registered with the {@link TezResourceManager}.
+ */
+public class TezOperPlan extends OperatorPlan<TezOperator> {
+
+    private static final long serialVersionUID = 1L;
+
+    private TezResourceManager tezResourceManager;
+
+    private Map<String, Path> extraResources = new HashMap<String, Path>();
+
+    /**
+     * @param tezResourceManager manager through which extra resources are registered
+     */
+    public TezOperPlan(TezResourceManager tezResourceManager) {
+        this.tezResourceManager = tezResourceManager;
+    }
+
+    /**
+     * Renders the plan with a verbose {@link TezPrinter}.
+     *
+     * @throws RuntimeException wrapping any VisitorException raised while printing
+     */
+    @Override
+    public String toString() {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        PrintStream out = new PrintStream(buffer);
+        TezPrinter planPrinter = new TezPrinter(out, this);
+        planPrinter.setVerbose(true);
+        try {
+            planPrinter.visit();
+        } catch (VisitorException e) {
+            throw new RuntimeException(e);
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Adds an extra plan-specific local resource from the source FS. The
+     * resource is keyed by the last path component of the URL's file part;
+     * a name that is already registered is left untouched.
+     */
+    public void addExtraResource(URL url) throws IOException {
+        Path resourcePath = new Path(url.getFile());
+        String resourceName = resourcePath.getName();
+
+        if (extraResources.containsKey(resourceName)) {
+            return;
+        }
+        Path remoteFsPath = tezResourceManager.addTezResource(url);
+        extraResources.put(resourceName, remoteFsPath);
+    }
+
+    /**
+     * Adds an extra plan-specific local resource already present in the
+     * remote FS. A name that is already registered is left untouched.
+     */
+    public void addExtraResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (extraResources.containsKey(resourceName)) {
+            return;
+        }
+        tezResourceManager.addTezResource(resourceName, remoteFsPath);
+        extraResources.put(resourceName, remoteFsPath);
+    }
+
+    /**
+     * Returns the plan-specific resources. The plan is first walked with a
+     * {@link TezPOStreamVisitor}, and the SHIP and CACHE files it collects
+     * are registered as extra resources before the lookup.
+     */
+    public Map<String, LocalResource> getExtraResources() throws Exception {
+        TezPOStreamVisitor streamVisitor = new TezPOStreamVisitor(this);
+        streamVisitor.visit();
+
+        // In a STREAM add the files specified in SHIP and CACHE
+        // as local resources for the plan.
+        addShipResources(streamVisitor.getShipFiles());
+        addCacheResources(streamVisitor.getCacheFiles());
+
+        return tezResourceManager.getTezResources(extraResources.keySet());
+    }
+
+    // In the statement "SHIP('/home/foo')" we'll map the resource name foo to
+    // the file that has been copied to the staging directory in the remote FS.
+    // Entries that are blank after trimming are skipped.
+    private void addShipResources(Set<String> fileNames) throws IOException {
+        for (String rawName : fileNames) {
+            String trimmed = rawName.trim();
+            if (trimmed.length() == 0) {
+                continue;
+            }
+            URL url = new File(trimmed).toURI().toURL();
+            addExtraResource(url);
+        }
+    }
+
+    // In the statement "CACHE('/input/data.txt#alias.txt')" we'll map the
+    // resource name alias.txt to the actual resource path in the remote FS
+    // at '/input/data.txt'. Without a fragment the file name itself is used.
+    // Entries that are blank after trimming are skipped.
+    private void addCacheResources(Set<String> fileNames) throws Exception {
+        for (String rawName : fileNames) {
+            String trimmed = rawName.trim();
+            if (trimmed.length() == 0) {
+                continue;
+            }
+            URI resourceURI = new URI(trimmed);
+            String fragment = resourceURI.getFragment();
+
+            Path remoteFsPath = new Path(resourceURI.getPath());
+            String resourceName;
+            if (fragment != null && fragment.length() > 0) {
+                resourceName = fragment;
+            } else {
+                resourceName = remoteFsPath.getName();
+            }
+
+            addExtraResource(resourceName, remoteFsPath);
+        }
+    }
+
+    /**
+     * Removes the operator from the plan after dropping the edge entries
+     * that its successors and predecessors hold for it.
+     */
+    @Override
+    public void remove(TezOperator op) {
+        // The remove method does not replace output and input keys in TezInput
+        // and TezOutput. That has to be handled separately.
+        for (OperatorKey successorKey : op.outEdges.keySet()) {
+            TezOperator successor = getOperator(successorKey);
+            successor.inEdges.remove(op.getOperatorKey());
+        }
+        for (OperatorKey predecessorKey : op.inEdges.keySet()) {
+            TezOperator predecessor = getOperator(predecessorKey);
+            predecessor.outEdges.remove(op.getOperatorKey());
+        }
+        super.remove(op);
+    }
+
+    /**
+     * Drops the edge entries both operators hold for each other, then
+     * disconnects them in the underlying plan.
+     *
+     * @return the result of the superclass disconnect
+     */
+    @Override
+    public boolean disconnect(TezOperator from, TezOperator to) {
+        from.outEdges.remove(to.getOperatorKey());
+        to.inEdges.remove(from.getOperatorKey());
+        return super.disconnect(from, to);
+    }
+
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Fri May 30 19:07:23 2014
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexGroup;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An operator model for a Tez job. Acts as a host to the plans that will
+ * execute in Tez vertices.
+ */
+public class TezOperator extends Operator<TezOpPlanVisitor> {
+    private static final long serialVersionUID = 1L;
+
+    // Processor pipeline
+    public PhysicalPlan plan;
+
+    // Descriptors for out-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> outEdges;
+    // Descriptors for in-bound edges.
+    public Map<OperatorKey, TezEdgeDescriptor> inEdges;
+
+    public Set<String> UDFs;
+    public Set<PhysicalOperator> scalars;
+
+    // Use AtomicInteger for access by reference and being able to reset in
+    // TezDAGBuilder based on number of input splits.
+    // We just need mutability and not concurrency
+    // This is to ensure that vertexes with 1-1 edge have same parallelism
+    // even when parallelism of source vertex changes.
+    // Can change to int and set to -1 if TEZ-800 gets fixed.
+    private AtomicInteger requestedParallelism = new AtomicInteger(-1);
+
+    // TODO: When constructing Tez vertex, we have to specify how much resource
+    // the vertex will need. So we need to estimate these values while compiling
+    // physical plan into tez plan. For now, we're using default values - 1G mem
+    // and 1 core.
+    //int requestedMemory = 1024;
+    //int requestedCpu = 1;
+
+    // This indicates that this TezOper is a split operator
+    private boolean splitter;
+
+    // This indicates that this TezOper has POSplit as a predecessor.
+    private OperatorKey splitParent = null;
+
+    // Indicates that the plan creation is complete
+    boolean closed = false;
+
+    // Indicates whether we need to split the DAG below the operator.
+    // The result is two or more connected DAGs inside the same plan container.
+    boolean segmentBelow = false;
+
+    // Indicates if this is a limit after a sort
+    boolean limitAfterSort = false;
+
+    // Indicates if this job is an order by job
+    boolean globalSort = false;
+
+    // The sort order of the columns;
+    // asc is true and desc is false
+    boolean[] sortOrder;
+
+    // Last POLimit value in this map reduce operator, needed by LimitAdjuster
+    // to add additional map reduce operator with 1 reducer after this
+    long limit = -1;
+
+    private boolean skewedJoin = false;
+
+    // Flag to indicate if the small input splits need to be combined to form a larger
+    // one in order to reduce the number of mappers. For merge join, both tables
+    // are NOT combinable for correctness.
+    private boolean combineSmallSplits = true;
+
+    // If not null, need to collect sample sent from predecessor
+    TezOperator sampleOperator = null;
+
+    // If true, we will use secondary key sort in the job
+    private boolean useSecondaryKey = false;
+
+    // Types of blocking operators. For now, we only support the following ones.
+    private static enum OPER_FEATURE {
+        NONE,
+        // Indicate if this job is a union job
+        UNION,
+        // Indicate if this job is a merge indexer
+        INDEXER,
+        // Indicate if this job is a sampling job
+        SAMPLER,
+        // Indicate if this job is a group by job
+        GROUPBY,
+        // Indicate if this job is a cogroup job
+        COGROUP,
+        // Indicate if this job is a regular join job
+        HASHJOIN;
+    }
+
+    // An operator carries exactly one feature; each markXxx() call overwrites
+    // the previous one.
+    OPER_FEATURE feature = OPER_FEATURE.NONE;
+
+    // Created lazily by addUnionPredecessor(); null until then unless set explicitly.
+    private List<OperatorKey> vertexGroupMembers;
+    // For union
+    private VertexGroupInfo vertexGroupInfo;
+    // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
+    private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
+
+    /**
+     * Creates an operator with an empty physical plan, no edges, no UDFs and
+     * no scalars.
+     *
+     * @param k key identifying this operator
+     */
+    public TezOperator(OperatorKey k) {
+        super(k);
+        plan = new PhysicalPlan();
+        outEdges = Maps.newHashMap();
+        inEdges = Maps.newHashMap();
+        UDFs = Sets.newHashSet();
+        scalars = Sets.newHashSet();
+    }
+
+    /** @return the class name of {@code PigProcessor} */
+    public String getProcessorName() {
+        return PigProcessor.class.getName();
+    }
+
+    @Override
+    public void visit(TezOpPlanVisitor v) throws VisitorException {
+        v.visitTezOp(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    /** @return the requested parallelism; -1 is the initial value */
+    public int getRequestedParallelism() {
+        return requestedParallelism.get();
+    }
+
+    /** Sets the value held by this operator's (possibly shared) parallelism holder. */
+    public void setRequestedParallelism(int requestedParallelism) {
+        this.requestedParallelism.set(requestedParallelism);
+    }
+
+    /**
+     * Makes this operator share the parallelism holder of {@code oper}, so a
+     * later {@link #setRequestedParallelism(int)} on either one is seen by both.
+     */
+    public void setRequestedParallelismByReference(TezOperator oper) {
+        this.requestedParallelism = oper.requestedParallelism;
+    }
+
+    public OperatorKey getSplitParent() {
+        return splitParent;
+    }
+
+    public void setSplitParent(OperatorKey splitParent) {
+        this.splitParent = splitParent;
+    }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+
+    public void setClosed(boolean closed) {
+        this.closed = closed;
+    }
+
+    public boolean isGroupBy() {
+        return (feature == OPER_FEATURE.GROUPBY);
+    }
+
+    public void markGroupBy() {
+        feature = OPER_FEATURE.GROUPBY;
+    }
+
+    public boolean isCogroup() {
+        return (feature == OPER_FEATURE.COGROUP);
+    }
+
+    public void markCogroup() {
+        feature = OPER_FEATURE.COGROUP;
+    }
+
+    public boolean isRegularJoin() {
+        return (feature == OPER_FEATURE.HASHJOIN);
+    }
+
+    public void markRegularJoin() {
+        feature = OPER_FEATURE.HASHJOIN;
+    }
+
+    public boolean isUnion() {
+        return (feature == OPER_FEATURE.UNION);
+    }
+
+    public void markUnion() {
+        feature = OPER_FEATURE.UNION;
+    }
+
+    public boolean isIndexer() {
+        return (feature == OPER_FEATURE.INDEXER);
+    }
+
+    public void markIndexer() {
+        feature = OPER_FEATURE.INDEXER;
+    }
+
+    public boolean isSampler() {
+        return (feature == OPER_FEATURE.SAMPLER);
+    }
+
+    public void markSampler() {
+        feature = OPER_FEATURE.SAMPLER;
+    }
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    /**
+     * Same list as {@link #getVertexGroupMembers()}.
+     *
+     * @return the vertex group members, or null if none were added or set
+     */
+    public List<OperatorKey> getUnionPredecessors() {
+        return getVertexGroupMembers();
+    }
+
+    /** @return the vertex group members, or null if none were added or set */
+    public List<OperatorKey> getVertexGroupMembers() {
+        return vertexGroupMembers;
+    }
+
+    /** Appends a member, creating the member list on first use. */
+    public void addUnionPredecessor(OperatorKey unionPredecessor) {
+        if (vertexGroupMembers == null) {
+            vertexGroupMembers = new ArrayList<OperatorKey>();
+        }
+        this.vertexGroupMembers.add(unionPredecessor);
+    }
+
+    public void setVertexGroupMembers(List<OperatorKey> vertexGroupMembers) {
+        this.vertexGroupMembers = vertexGroupMembers;
+    }
+
+    // Union is the only operator that uses alias vertex (VertexGroup) now. But
+    // more operators could be added to the list in the future.
+    public boolean isVertexGroup() {
+        return vertexGroupInfo != null;
+    }
+
+    public VertexGroupInfo getVertexGroupInfo() {
+        return vertexGroupInfo;
+    }
+
+    public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
+        this.vertexGroupInfo = vertexGroup;
+    }
+
+    /** Records a store-key to vertex-group-key mapping, creating the map on first use. */
+    public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
+        if (this.vertexGroupStores == null) {
+            this.vertexGroupStores = new HashMap<OperatorKey, OperatorKey>();
+        }
+        this.vertexGroupStores.put(storeKey, vertexGroupKey);
+    }
+
+    /** @return the store-key to vertex-group-key mapping, or null if nothing was added */
+    public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
+        return this.vertexGroupStores;
+    }
+
+    /**
+     * Builds the display name: {@code Tez(<parallelism>[,<udf>,...]) - <key>}.
+     */
+    @Override
+    public String name() {
+        String udfStr = getUDFsAsStr();
+        StringBuilder sb = new StringBuilder("Tez(");
+        sb.append(requestedParallelism);
+        if (!udfStr.equals("")) {
+            sb.append(',').append(udfStr);
+        }
+        sb.append(") - ").append(mKey.toString());
+        return sb.toString();
+    }
+
+    /**
+     * Returns the name followed by the explained physical plan, each plan
+     * line prefixed with "| ", or by "Plan Empty" when the plan is empty.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(name() + ":\n");
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        if (!plan.isEmpty()) {
+            plan.explain(baos);
+            String mp = new String(baos.toByteArray());
+            sb.append(shiftStringByTabs(mp, "| "));
+        } else {
+            sb.append("Plan Empty");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Joins the simple names of the UDFs (the text after the last '.') with
+     * commas; returns an empty string when there are no UDFs.
+     */
+    private String getUDFsAsStr() {
+        StringBuilder sb = new StringBuilder();
+        if (UDFs != null) {
+            for (String str : UDFs) {
+                if (sb.length() > 0) {
+                    sb.append(',');
+                }
+                sb.append(str.substring(str.lastIndexOf('.') + 1));
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Prefixes every line of {@code DFStr} with {@code tab}. Lines are
+     * separated by "\n" and the result has no trailing newline.
+     */
+    private String shiftStringByTabs(String DFStr, String tab) {
+        StringBuilder sb = new StringBuilder();
+        String[] spl = DFStr.split("\n");
+        // split() returns an empty array for input made only of newlines, so
+        // emit the separator between lines instead of trimming it afterwards
+        // (trimming an empty builder would throw).
+        for (int i = 0; i < spl.length; i++) {
+            if (i > 0) {
+                sb.append("\n");
+            }
+            sb.append(tab);
+            sb.append(spl[i]);
+        }
+        return sb.toString();
+    }
+
+    public boolean needSegmentBelow() {
+        return segmentBelow;
+    }
+
+    /**
+     * Stores a private copy of {@code sortOrder}. A null argument is ignored
+     * and leaves the current value untouched.
+     */
+    public void setSortOrder(boolean[] sortOrder) {
+        if (null == sortOrder) {
+            return;
+        }
+        this.sortOrder = sortOrder.clone();
+    }
+
+    public boolean[] getSortOrder() {
+        return sortOrder;
+    }
+
+    public void setGlobalSort(boolean globalSort) {
+        this.globalSort = globalSort;
+    }
+
+    public boolean isGlobalSort() {
+        return globalSort;
+    }
+
+    public void setLimitAfterSort(boolean limitAfterSort) {
+        this.limitAfterSort = limitAfterSort;
+    }
+
+    public boolean isLimitAfterSort() {
+        return limitAfterSort;
+    }
+
+    public void setSkewedJoin(boolean skewedJoin) {
+        this.skewedJoin = skewedJoin;
+    }
+
+    public boolean isSkewedJoin() {
+        return skewedJoin;
+    }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
+
+    /**
+     * Holder for the data attached to a vertex group operator: its input
+     * keys, output key, optional store and the Tez objects built for it.
+     */
+    public static class VertexGroupInfo {
+
+        // Created lazily by addInput(); null until the first input is added.
+        private List<OperatorKey> inputKeys;
+        private String outputKey;
+        private POStore store;
+        private OutputDescriptor storeOutDescriptor;
+        private VertexGroup vertexGroup;
+
+        public VertexGroupInfo() {
+        }
+
+        public VertexGroupInfo(POStore store) {
+            this.store = store;
+        }
+
+        /** @return the input keys, or null if no input was ever added */
+        public List<OperatorKey> getInputs() {
+            return inputKeys;
+        }
+
+        public void addInput(OperatorKey input) {
+            if (inputKeys == null) {
+                inputKeys = new ArrayList<OperatorKey>();
+            }
+            this.inputKeys.add(input);
+        }
+
+        /**
+         * @return true if {@code input} was present and has been removed;
+         *         false otherwise, including when no input was ever added
+         */
+        public boolean removeInput(OperatorKey input) {
+            return inputKeys != null && inputKeys.remove(input);
+        }
+
+        public String getOutput() {
+            return outputKey;
+        }
+
+        public void setOutput(String output) {
+            this.outputKey = output;
+        }
+
+        public POStore getStore() {
+            return store;
+        }
+
+        public OutputDescriptor getStoreOutputDescriptor() {
+            return storeOutDescriptor;
+        }
+
+        public void setStoreOutputDescriptor(OutputDescriptor storeOutDescriptor) {
+            this.storeOutDescriptor = storeOutDescriptor;
+        }
+
+        public VertexGroup getVertexGroup() {
+            return vertexGroup;
+        }
+
+        public void setVertexGroup(VertexGroup vertexGroup) {
+            this.vertexGroup = vertexGroup;
+        }
+
+    }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOutput.java Fri May 30 19:07:23 2014
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.LogicalOutput;
+
+/**
+ * This interface is implemented by PhysicalOperators that can have Tez outputs
+ * attached directly to the operator.
+ */
+
+public interface TezOutput {
+
+ /**
+  * @return the Tez outputs of this operator, as strings.
+  *         NOTE(review): presumably these are the output keys that
+  *         replaceOutput and attachOutputs refer to - confirm against the
+  *         implementing operators.
+  */
+ public String[] getTezOutputs();
+
+ /**
+  * Replaces one output key of this operator with another.
+  *
+  * @param oldOutputKey the key currently in use
+  * @param newOutputKey the key to use instead
+  */
+ public void replaceOutput(String oldOutputKey, String newOutputKey);
+
+ /**
+  * Hands the logical outputs to the operator.
+  *
+  * @param outputs logical outputs by name.
+  *        NOTE(review): presumably keyed by the strings returned from
+  *        getTezOutputs() - confirm.
+  * @param conf configuration made available to the operator
+  * @throws ExecException declared for implementations to report failures
+  */
+ public void attachOutputs(Map<String, LogicalOutput> outputs,
+ Configuration conf) throws ExecException;
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Fri May 30 19:07:23 2014
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A port of the POPackageAnnotator from MR to Tez.
+ *
+ */
+public class TezPOPackageAnnotator extends TezOpPlanVisitor {
+
+    /**
+     * @param plan Tez plan to visit
+     */
+    public TezPOPackageAnnotator(TezOperPlan plan) {
+        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * Looks for a POPackage in the operator's physical plan and, if one is
+     * found, annotates it from the matching LocalRearranges of the
+     * predecessor operators.
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (tezOp.plan.isEmpty()) {
+            return;
+        }
+        PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(tezOp.plan);
+        pkgDiscoverer.visit();
+        POPackage pkg = pkgDiscoverer.getPkg();
+        if (pkg != null) {
+            handlePackage(tezOp, pkg);
+        }
+    }
+
+    /**
+     * Patches {@code pkg} from the LocalRearranges found in the predecessors
+     * of {@code pkgTezOp}, stopping as soon as as many have been found as
+     * the package has inputs.
+     *
+     * @throws VisitorException (an OptimizerException with error code 2086)
+     *         when fewer LocalRearranges are found than the package has
+     *         inputs, including when the operator has no predecessors at all
+     */
+    private void handlePackage(TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
+        // the LocalRearrange(s) must be in the plan of a predecessor tez op
+        int lrFound = 0;
+        List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
+        // The plan API may hand back null instead of an empty list; treat it
+        // as "no predecessors" so that the check below reports error 2086
+        // rather than failing with a NullPointerException here.
+        if (preds != null) {
+            for (TezOperator pred : preds) {
+                TezOperator predTezOp = pred;
+                if (predTezOp.isVertexGroup()) {
+                    // Just get one of the inputs to vertex group
+                    predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
+                }
+                lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
+                if (lrFound == pkg.getNumInps()) {
+                    break;
+                }
+            }
+        }
+
+        if (lrFound != pkg.getNumInps()) {
+            int errCode = 2086;
+            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+            throw new OptimizerException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    /**
+     * Visits the plan of {@code predTezOp} and annotates {@code pkg} from
+     * every LocalRearrange whose output key targets {@code pkgTezOp}.
+     *
+     * @return the number of matching LocalRearranges, so the caller knows
+     *         whether the package was patched
+     */
+    private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg);
+        lrDiscoverer.visit();
+        return lrDiscoverer.getLoRearrangeFound();
+    }
+
+    /**
+     * Simple visitor of the "Reduce" physical plan
+     * which will get a reference to the POPackage
+     * present in the plan
+     */
+    static class PackageDiscoverer extends PhyPlanVisitor {
+
+        // The package visited last; null if the plan contains none.
+        private POPackage pkg;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+        }
+
+        /**
+         * @return the pkg
+         */
+        public POPackage getPkg() {
+            return pkg;
+        }
+
+    }
+
+    /**
+     * Physical Plan visitor which tries to get the
+     * LocalRearrange(s) present in the plan (if any) and
+     * annotate the POPackage given to it with the information
+     * in the LocalRearrange (regarding columns in the "value"
+     * present in the "key")
+     */
+    static class LoRearrangeDiscoverer extends PhyPlanVisitor {
+
+        private int loRearrangeFound = 0;
+        private TezOperator pkgTezOp;
+        private POPackage pkg;
+
+        public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp, POPackage pkg) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.pkgTezOp = pkgTezOp;
+            this.pkg = pkg;
+        }
+
+        /**
+         * Annotates the package from {@code lrearrange} when its output key
+         * equals the key of the operator that hosts the package; other
+         * LocalRearranges are ignored.
+         */
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
+            POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
+            if (!lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString())) {
+                return;
+            }
+            // Counted before validation, as a match on the output key alone.
+            loRearrangeFound++;
+
+            if (pkg.getPkgr() instanceof LitePackager) {
+                if (lrearrange.getIndex() != 0) {
+                    // Throw some exception here
+                    throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
+                }
+            }
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the POPackage
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg.getPkgr().getKeyInfo();
+            if (keyInfo == null) {
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+            }
+
+            Integer index = Integer.valueOf(lrearrange.getIndex());
+            if (keyInfo.get(index) != null) {
+                // something is wrong - we should not be getting key info
+                // for the same index from two different Local Rearranges
+                int errCode = 2087;
+                String msg = "Unexpected problem during optimization." +
+                        " Found index:" + lrearrange.getIndex() +
+                        " in multiple LocalRearrange operators.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+            }
+            keyInfo.put(index,
+                    new Pair<Boolean, Map<Integer, Integer>>(
+                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+        }
+
+        /**
+         * @return the loRearrangeFound
+         */
+        public int getLoRearrangeFound() {
+            return loRearrangeFound;
+        }
+
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+public class TezPOStreamVisitor extends TezOpPlanVisitor {
+
+    // Ship and cache specs accumulated over every POStream seen during the visit.
+    private Set<String> shipFiles = new HashSet<String>();
+    private Set<String> cacheFiles = new HashSet<String>();
+
+    /**
+     * @param plan Tez plan to walk depth-first
+     */
+    public TezPOStreamVisitor(TezOperPlan plan) {
+        super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /** @return the ship specs collected so far */
+    public Set<String> getShipFiles() {
+        return shipFiles;
+    }
+
+    /** @return the cache specs collected so far */
+    public Set<String> getCacheFiles() {
+        return cacheFiles;
+    }
+
+    /**
+     * Walks the operator's physical plan, if it has one, and collects the
+     * file specs of the stream operators in it.
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (tezOp.plan.isEmpty()) {
+            return;
+        }
+        new StreamFileVisitor(tezOp.plan).visit();
+    }
+
+    /**
+     * Physical plan visitor that adds the cache and ship specs of every
+     * POStream command to the sets of the enclosing visitor.
+     */
+    class StreamFileVisitor extends PhyPlanVisitor {
+
+        public StreamFileVisitor(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        public void visitStream(POStream stream) throws VisitorException {
+            StreamingCommand streamingCommand = stream.getCommand();
+            TezPOStreamVisitor.this.cacheFiles.addAll(streamingCommand.getCacheSpecs());
+            TezPOStreamVisitor.this.shipFiles.addAll(streamingCommand.getShipSpecs());
+        }
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.JarManager;
+
+public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
+ private static final long serialVersionUID = 1L;
+ private TezResourceManager tezResourceManager;
+ private PigContext pigContext;
+
+ public TezPlanContainer(PigContext pigContext, TezResourceManager tezResourceManager) {
+ this.pigContext = pigContext;
+ this.tezResourceManager = tezResourceManager;
+ }
+
+ // Add the Pig jar and the UDF jars as AM resources (all DAG's in the planContainer
+ // will use them for simplicity). This differs from MR Pig, where they are added to
+ // the job jar.
+ public Map<String, LocalResource> getLocalResources() throws Exception {
+ Set<URL> jarLists = new HashSet<URL>();
+
+ jarLists.add(tezResourceManager.getBootStrapJar());
+
+ // In MR Pig the extra jars and script jars get put in Distributed Cache, but
+ // in Tez we'll add them as local resources.
+ for (URL jarUrl : pigContext.extraJars) {
+ jarLists.add(jarUrl);
+ }
+
+ for (String jarFile : pigContext.scriptJars) {
+ jarLists.add(new File(jarFile).toURI().toURL());
+ }
+
+ // Script files for non-Java UDF's are added to the Job.jar by the JarManager class,
+ // except for Groovy files, which need to be explicitly added as local resources due
+ // to the GroovyScriptEngine (see JarManager.java for comments).
+ for (Map.Entry<String, File> scriptFile : pigContext.getScriptFiles().entrySet()) {
+ if (scriptFile.getKey().endsWith(".groovy")) {
+ jarLists.add(scriptFile.getValue().toURI().toURL());
+ }
+ }
+
+ TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
+ tezPlanContainerUDFCollector.visit();
+ Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
+
+ for (String func : udfs) {
+ Class<?> clazz = pigContext.getClassForAlias(func);
+ if (clazz != null) {
+ String jarName = JarManager.findContainingJar(clazz);
+ if (jarName == null) {
+ // It's possible that clazz is not registered by an external
+ // jar. For eg, in unit tests, UDFs that are not in default
+ // packages may be used. In that case, we should skip it to
+ // avoid NPE.
+ continue;
+ }
+ URL jarUrl = new File(jarName).toURI().toURL();
+ jarLists.add(jarUrl);
+ }
+ }
+
+ // Streaming UDF's are not working under Hadoop 2 (PIG-3478), so don't bother adding
+ // resources for them yet.
+ // if ("StreamingUDF".equals(clazz.getSimpleName())) {
+ // for (String fileName : StreamingUDF.getResourcesForJar()) {
+ // jarLists.add(new File(fileName).toURI().toURL());
+ // }
+ // }
+
+ return tezResourceManager.addTezResources(jarLists);
+ }
+
+ public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+ synchronized(this) {
+ while (getRoots()!=null && !getRoots().isEmpty()) {
+ TezPlanContainerNode currentPlan = null;
+ for (TezPlanContainerNode plan : getRoots()) {
+ if (!processedPlans.contains(plan.getNode())) {
+ currentPlan = plan;
+ break;
+ }
+ }
+ if (currentPlan!=null) {
+ return currentPlan.getNode();
+ } else {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public void updatePlan(TezOperPlan plan, boolean succ) {
+ String scope = getRoots().get(0).getOperatorKey().getScope();
+ TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+ synchronized(this) {
+ if (succ) {
+ remove(tezPlanContainerNode);
+ } else {
+ // job fail
+ trimBelow(tezPlanContainerNode);
+ remove(tezPlanContainerNode);
+ }
+ notify();
+ }
+ }
+
+ public void split(TezPlanContainerNode planNode) throws PlanException {
+ TezOperPlan tezOperPlan = planNode.getNode();
+ TezOperator operToSegment = null;
+ List<TezOperator> succs = new ArrayList<TezOperator>();
+ for (TezOperator tezOper : tezOperPlan) {
+ if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) {
+ operToSegment = tezOper;
+ succs.addAll(tezOperPlan.getSuccessors(tezOper));
+ break;
+ }
+ }
+ if (operToSegment != null) {
+ for (TezOperator succ : succs) {
+ tezOperPlan.disconnect(operToSegment, succ);
+ TezOperPlan newOperPlan = new TezOperPlan(tezResourceManager);
+ List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
+ if (getSuccessors(planNode)!=null) {
+ containerSuccs.addAll(getSuccessors(planNode));
+ }
+ tezOperPlan.moveTree(succ, newOperPlan);
+ String scope = operToSegment.getOperatorKey().getScope();
+ TezPlanContainerNode newPlanNode = new TezPlanContainerNode(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)), newOperPlan);
+ add(newPlanNode);
+ for (TezPlanContainerNode containerNodeSucc : containerSuccs) {
+ disconnect(planNode, containerNodeSucc);
+ connect(newPlanNode, containerNodeSucc);
+ }
+ connect(planNode, newPlanNode);
+ split(newPlanNode);
+ }
+ split(planNode);
+ }
+ }
+
+ @Override
+ public String toString() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, this);
+ printer.setVerbose(true);
+ try {
+ printer.visit();
+ } catch (VisitorException e) {
+ throw new RuntimeException(e);
+ }
+ return baos.toString();
+ }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java Fri May 30 19:07:23 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+ private static final long serialVersionUID = 1L;
+ TezOperPlan node;
+ public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+ super(k);
+ this.node = node;
+ }
+
+ @Override
+ public void visit(TezPlanContainerVisitor v) throws VisitorException {
+ v.visitTezPlanContainerNode(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public String name() {
+ return "DAG " + mKey;
+ }
+
+ public TezOperPlan getNode() {
+ return node;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o != null && o instanceof TezPlanContainerNode) {
+ return ((TezPlanContainerNode)o).getNode().equals(getNode());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getNode().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getNode().toString();
+ }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java Fri May 30 19:07:23 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPrinter.TezGraphPrinter;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+ /**
+  * Visitor that writes a textual description of every node of a
+  * {@link TezPlanContainer} to a {@link PrintStream}. For each node it prints
+  * a banner, the vertex graph produced by {@code TezGraphPrinter}, and then
+  * the per-vertex output of {@code TezPrinter}.
+  */
+ public class TezPlanContainerPrinter extends TezPlanContainerVisitor {
+     /** Horizontal rule printed above and below each banner line. */
+     private static final String RULE = "#--------------------------------------------------";
+
+     private final PrintStream mStream;
+     private boolean isVerbose = true;
+
+     /**
+      * Creates the printer and immediately writes a banner reporting how many
+      * DAGs the container holds.
+      *
+      * @param ps PrintStream to output plan information to
+      * @param planContainer container whose nodes will be printed
+      */
+     public TezPlanContainerPrinter(PrintStream ps, TezPlanContainer planContainer) {
+         super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+         mStream = ps;
+         mStream.println(RULE);
+         mStream.println("# There are " + planContainer.size() + " DAGs in the session");
+         mStream.println(RULE);
+     }
+
+     /**
+      * @param verbose verbosity flag forwarded to the {@code TezPrinter} used for each node
+      */
+     public void setVerbose(boolean verbose) {
+         isVerbose = verbose;
+     }
+
+     /**
+      * Prints one container node: a banner carrying its operator key, the
+      * vertex graph, and then the detailed plan.
+      */
+     @Override
+     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+         mStream.println(RULE);
+         mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey());
+         mStream.println(RULE);
+
+         // Vertex-to-vertex overview first ...
+         TezGraphPrinter graphView = new TezGraphPrinter(tezPlanContainerNode.getNode());
+         graphView.visit();
+         mStream.print(graphView.toString());
+
+         // ... followed by the detailed per-vertex plans.
+         TezPrinter detailView = new TezPrinter(mStream, tezPlanContainerNode.getNode());
+         detailView.setVerbose(isVerbose);
+         detailView.visit();
+     }
+ }
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java Fri May 30 19:07:23 2014
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import com.google.common.collect.Sets;
+
+ /**
+  * Visitor that accumulates, into a single set, the {@code UDFs} entries of
+  * every {@code TezOperator} found in every node of a {@link TezPlanContainer}.
+  */
+ public class TezPlanContainerUDFCollector extends TezPlanContainerVisitor {
+     /** Union of the UDF strings seen so far. */
+     private Set<String> udfs = Sets.newHashSet();
+
+     /**
+      * @param planContainer container whose nodes will be scanned for UDFs
+      */
+     public TezPlanContainerUDFCollector(TezPlanContainer planContainer) {
+         super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+     }
+
+     /**
+      * Adds the {@code UDFs} of each operator in the node's plan to the
+      * collected set.
+      */
+     @Override
+     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+         for (TezOperator tezOp : tezPlanContainerNode.getNode()) {
+             udfs.addAll(tezOp.UDFs);
+         }
+     }
+
+     /**
+      * @return the live set of collected UDF strings (not a copy)
+      */
+     public Set<String> getUdfs() {
+         return udfs;
+     }
+ }
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java Fri May 30 19:07:23 2014
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+ /**
+  * Base visitor over a {@link TezPlanContainer}. The visit callback here does
+  * nothing; subclasses override {@link #visitTezPlanContainerNode} to act on
+  * each node.
+  */
+ public class TezPlanContainerVisitor extends PlanVisitor<TezPlanContainerNode, TezPlanContainer> {
+ /**
+  * @param plan the plan container to visit
+  * @param walker the walker passed to the {@code PlanVisitor} superclass
+  */
+ public TezPlanContainerVisitor(TezPlanContainer plan, PlanWalker<TezPlanContainerNode, TezPlanContainer> walker) {
+ super(plan, walker);
+ // No state of its own; everything is held by the superclass.
+ }
+
+ /**
+  * Callback invoked by {@code TezPlanContainerNode.visit}. The default
+  * implementation is a no-op.
+  *
+  * @param tezPlanContainerNode the node being visited
+  * @throws VisitorException never thrown here; declared so overrides may throw it
+  */
+ public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+ // Intentionally empty: subclasses supply the behaviour.
+ }
+ }
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Fri May 30 19:07:23 2014
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to print out the Tez plan.
+ */
+ public class TezPrinter extends TezOpPlanVisitor {
+
+     private PrintStream mStream = null;
+     private boolean isVerbose = true;
+
+     /**
+      * @param ps PrintStream to output plan information to
+      * @param plan tez plan to print
+      */
+     public TezPrinter(PrintStream ps, TezOperPlan plan) {
+         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+         mStream = ps;
+     }
+
+     /**
+      * @param verbose verbosity flag forwarded to every {@code PlanPrinter} this visitor creates
+      */
+     public void setVerbose(boolean verbose) {
+         isVerbose = verbose;
+     }
+
+     /**
+      * Prints one Tez operator: a header line (with inputs and output for a
+      * vertex group), the non-empty combine plan of each incoming edge, and
+      * finally the operator's own physical plan when it has one.
+      *
+      * @param tezOper the operator to print
+      * @throws VisitorException if printing one of the physical plans fails
+      */
+     @Override
+     public void visitTezOp(TezOperator tezOper) throws VisitorException {
+         if (tezOper.isVertexGroup()) {
+             VertexGroupInfo info = tezOper.getVertexGroupInfo();
+             mStream.println("Tez vertex group "
+                     + tezOper.getOperatorKey().toString() + "\t<-\t "
+                     + info.getInputs() + "\t->\t " + info.getOutput());
+             mStream.println("# No plan on vertex group");
+         } else {
+             mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
+         }
+         // Iterating an empty map is a no-op, so no separate size check is needed.
+         for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+             //TODO: Print other edge properties like custom partitioner
+             if (!inEdge.getValue().combinePlan.isEmpty()) {
+                 mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
+                 printPhysicalPlan(inEdge.getValue().combinePlan);
+             }
+         }
+         if (tezOper.plan != null && tezOper.plan.size() > 0) {
+             mStream.println("# Plan on vertex");
+             printPhysicalPlan(tezOper.plan);
+         }
+     }
+
+     /**
+      * Prints a physical plan to the output stream with the current verbosity,
+      * followed by an empty line.
+      */
+     private void printPhysicalPlan(PhysicalPlan physicalPlan) throws VisitorException {
+         PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
+                 new PlanPrinter<PhysicalOperator, PhysicalPlan>(physicalPlan, mStream);
+         printer.setVerbose(isVerbose);
+         printer.visit();
+         mStream.println();
+     }
+
+     /**
+      * This class prints the Tez Vertex Graph: one line per vertex, listing
+      * the vertex and, when it has successors, each successor after an arrow.
+      * The text is accumulated in a buffer and obtained via {@link #toString()}.
+      */
+     public static class TezGraphPrinter extends TezOpPlanVisitor {
+
+         /** Accumulates one line per visited vertex. */
+         StringBuffer buf;
+
+         /**
+          * @param plan tez plan whose vertex graph is to be rendered
+          */
+         public TezGraphPrinter(TezOperPlan plan) {
+             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+             buf = new StringBuffer();
+         }
+
+         /**
+          * Appends a line for {@code tezOper}: its key and, if the plan
+          * reports successors, a comma-terminated list of them.
+          */
+         @Override
+         public void visitTezOp(TezOperator tezOper) throws VisitorException {
+             buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
+             List<TezOperator> succs = mPlan.getSuccessors(tezOper);
+             if (succs != null) {
+                 buf.append("\t->\t");
+                 for (TezOperator op : succs) {
+                     buf.append("Tez vertex " + op.getOperatorKey().toString()).append(",");
+                 }
+             }
+             buf.append("\n");
+         }
+
+         /**
+          * Returns the accumulated graph text followed by one blank line.
+          * The trailing newline is added to the returned string only, not to
+          * the buffer, so calling this method repeatedly yields the same
+          * result and does not disturb later visits.
+          */
+         @Override
+         public String toString() {
+             return buf.toString() + "\n";
+         }
+     }
+ }