You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/11 20:28:18 UTC
svn commit: r1540818 - in /pig/branches/tez: ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/util/
test/org/apache/pig/test/data/Golde...
Author: cheolsoo
Date: Mon Nov 11 19:28:17 2013
New Revision: 1540818
URL: http://svn.apache.org/r1540818
Log:
PIG-3539: Pig should be able to submit multiple DAG (daijy via cheolsoo)
Added:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
Modified:
pig/branches/tez/ivy/libraries.properties
pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java
pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
Modified: pig/branches/tez/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/ivy/libraries.properties?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/ivy/libraries.properties (original)
+++ pig/branches/tez/ivy/libraries.properties Mon Nov 11 19:28:17 2013
@@ -19,7 +19,7 @@ apacherat.version=0.8
automaton.version=1.11-8
avro.version=1.7.4
commons-beanutils.version=1.7.0
-commons-cli.version=1.0
+commons-cli.version=1.2
commons-codec.version=1.4
commons-io.version=2.3
commons-el.version=1.0
Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop23/PigJobControl.java Mon Nov 11 19:28:17 2013
@@ -77,7 +77,7 @@ public class PigJobControl extends JobCo
}
}
- private int timeToSleep;
+ protected int timeToSleep;
/**
* Construct a job control for a group of jobs.
@@ -174,6 +174,9 @@ public class PigJobControl extends JobCo
synchronized(this) {
Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+ if (!it.hasNext()) {
+ stop();
+ }
while(it.hasNext()) {
ControlledJob j = it.next();
log.debug("Checking state of job "+j);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Nov 11 19:28:17 2013
@@ -120,11 +120,8 @@ public class TezCompiler extends PhyPlan
private int fileConcatenationThreshold = 100;
private boolean optimisticFileConcatenation = false;
- public TezCompiler(PhysicalPlan plan) throws TezCompilerException {
- this(plan, null);
- }
-
- public TezCompiler(PhysicalPlan plan, PigContext pigContext) throws TezCompilerException {
+ public TezCompiler(PhysicalPlan plan, PigContext pigContext)
+ throws TezCompilerException {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.plan = plan;
this.pigContext = pigContext;
@@ -154,6 +151,15 @@ public class TezCompiler extends PhyPlan
public TezOperPlan getTezPlan() {
return tezPlan;
}
+
+ // Segment a single DAG into a graph of DAGs
+ public TezPlanContainer getPlanContainer() throws PlanException {
+ TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
+ TezPlanContainerNode node = new TezPlanContainerNode(new OperatorKey(scope, nig.getNextNodeId(scope)), tezPlan);
+ tezPlanContainer.add(node);
+ tezPlanContainer.split(node);
+ return tezPlanContainer;
+ }
/**
* The front-end method that the user calls to compile the plan. Assumes
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Mon Nov 11 19:28:17 2013
@@ -22,11 +22,10 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -39,24 +38,19 @@ import org.apache.tez.dag.api.client.DAG
*/
public class TezJob extends ControlledJob {
private static final Log log = LogFactory.getLog(TezJob.class);
- private AMConfiguration amConfig;
- private ApplicationId appId;
- private TezClient tezClient;
- private DAGClient dagClient;
private DAGStatus dagStatus;
+ private Configuration conf;
private DAG dag;
+ private DAGClient dagClient;
+ private Map<String, LocalResource> requestAMResources;
+ private TezSession tezSession;
- public TezJob(TezConfiguration conf, ApplicationId appId, DAG dag,
- Map<String, LocalResource> localResources) throws IOException {
+ public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
+ throws IOException {
super(conf);
- this.amConfig = new AMConfiguration(null, null, localResources, conf, null);
- this.tezClient = new TezClient(conf);
- this.appId = appId;
+ this.conf = conf;
this.dag = dag;
- }
-
- public ApplicationId getAppId() {
- return appId;
+ this.requestAMResources = requestAMResources;
}
public DAG getDag() {
@@ -70,17 +64,20 @@ public class TezJob extends ControlledJo
@Override
public void submit() {
try {
- log.info("Submitting DAG - Application id: " + appId);
- dagClient = tezClient.submitDAGApplication(appId, dag, amConfig);
+ tezSession = TezSessionManager.getSession(conf, requestAMResources);
+ log.info("Submitting DAG - Application id: " + tezSession.getApplicationId());
+ dagClient = tezSession.submitDAG(dag);
} catch (Exception e) {
- log.info("Cannot submit DAG - Application id: " + appId, e);
+ if (tezSession!=null) {
+ log.info("Cannot submit DAG - Application id: " + tezSession.getApplicationId(), e);
+ }
setJobState(ControlledJob.State.FAILED);
return;
}
while (true) {
try {
- dagStatus = dagClient.getDAGStatus();
+ dagStatus = dagClient.getDAGStatus(null);
} catch (Exception e) {
log.info("Cannot retrieve DAG status", e);
setJobState(ControlledJob.State.FAILED);
@@ -96,6 +93,9 @@ public class TezJob extends ControlledJo
sb.append("\n");
}
setMessage(sb.toString());
+ TezSessionManager.freeSession(tezSession);
+ tezSession = null;
+ dagClient = null;
break;
}
@@ -111,8 +111,9 @@ public class TezJob extends ControlledJo
public void killJob() throws IOException {
try {
dagClient.tryKillDAG();
+ tezSession.stop();
} catch (TezException e) {
- throw new IOException("Cannot kill DAG - Application Id: " + appId, e);
+ throw new IOException("Cannot kill DAG - Application Id: " + tezSession.getApplicationId(), e);
}
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControl.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.hadoop23.PigJobControl;
+import org.apache.pig.tools.pigstats.tez.TezStats;
+
+public class TezJobControl extends PigJobControl {
+ // JobControl for Tez jobs: once its jobs finish it stops, then reports to the optional TezStats and TezJobNotifier.
+ private TezJobNotifier notifier = null; // optional; given this JobControl when run() finishes
+ private TezStats stats = null; // optional; accumulates stats from this JobControl when run() finishes
+ // timeToSleep is the polling interval in ms (it is passed to Thread.sleep in run()).
+ public TezJobControl(String groupName, int timeToSleep) {
+ super(groupName, timeToSleep);
+ }
+ // Registers the completion callback invoked at the end of run(); may be left unset (null).
+ public void setJobNotifier(TezJobNotifier notifier) {
+ this.notifier = notifier;
+ }
+ // Registers the stats collector updated at the end of run(); may be left unset (null).
+ public void setTezStats(TezStats stats) {
+ this.stats = stats;
+ }
+ // Runs the PigJobControl loop, waits for all jobs to finish, then stops and reports stats and completion.
+ @Override
+ public void run() {
+ try {
+ super.run(); // PigJobControl loop; presumably returns after stop() (it calls stop() when no job is in progress)
+ try {
+ // Poll until all jobs have finished, sleeping timeToSleep ms between checks.
+ while (!allFinished()) {
+ try {
+ Thread.sleep(timeToSleep);
+ } catch (InterruptedException e) {
+ // Interrupt ignored; keep polling. NOTE(review): interrupt status is lost - consider re-interrupting.
+ }
+ }
+ } catch (Exception e) {
+ throw e;
+ } finally { // runs even if waiting throws, so stats and notifier are still updated
+ stop();
+ if (stats!=null) {
+ stats.accumulateStats(this);
+ }
+ if (notifier!=null) {
+ notifier.complete(this);
+ }
+ }
+ } catch (Exception e) {
+ // Wrap any exception (checked or unchecked) in a RuntimeException so it escapes run(); not expected to happen.
+ throw new RuntimeException(e);
+ }
+ }
+
+}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java Mon Nov 11 19:28:17 2013
@@ -17,39 +17,20 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.net.URL;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.JarManager;
-import org.apache.pig.impl.util.Utils;
-import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
-import org.python.google.common.collect.Sets;
-
-import com.google.common.collect.Maps;
/**
* This is compiler class that takes a TezOperPlan and converts it into a
@@ -58,28 +39,27 @@ import com.google.common.collect.Maps;
*/
public class TezJobControlCompiler {
private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
- private static final String DAG_JAR_NAME = "dag_job.jar";
private PigContext pigContext;
- private TezClient tezClient;
private TezConfiguration tezConf;
public TezJobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
this.pigContext = pigContext;
this.tezConf = new TezConfiguration(conf);
- this.tezClient = new TezClient(tezConf);
}
public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
throws IOException, YarnException {
- String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, DAG_JAR_NAME);
+ String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
DAG tezDag = new DAG(jobName);
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
dagBuilder.visit();
return tezDag;
}
- public JobControl compile(TezOperPlan tezPlan, String grpName) throws JobCreationException {
+ public TezJobControl compile(TezOperPlan tezPlan, String grpName,
+ Configuration conf, TezPlanContainer planContainer)
+ throws JobCreationException {
int timeToSleep;
String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
String pigJobControlSleep = tezConf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
@@ -96,14 +76,12 @@ public class TezJobControlCompiler {
" should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
- JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+ TezJobControl jobCtrl = new TezJobControl(grpName, timeToSleep);
try {
- // TODO: for now, we assume that the whole Tez plan can be always
- // packaged into a single Tez job. But that may be not always true.
- tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
- TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT);
- TezJob job = getJob(tezPlan);
+ // A single Tez job always packs exactly one Tez plan. We track
+ // Tez jobs asynchronously to exploit parallel execution opportunities.
+ TezJob job = getJob(tezPlan, conf, planContainer);
jobCtrl.addJob(job);
} catch (JobCreationException jce) {
throw jce;
@@ -116,59 +94,14 @@ public class TezJobControlCompiler {
return jobCtrl;
}
- private TezJob getJob(TezOperPlan tezPlan) throws JobCreationException {
+ private TezJob getJob(TezOperPlan tezPlan, Configuration conf, TezPlanContainer planContainer)
+ throws JobCreationException {
try {
- ApplicationId appId = tezClient.createApplication();
- Map<String, LocalResource> localResources = Maps.newHashMap();
- FileSystem remoteFs = FileSystem.get(tezConf);
- Path remoteStagingDir = remoteFs.makeQualified(new Path(
- tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR), appId.toString()));
-
- for (URL extraJar : pigContext.extraJars) {
- Path pathInHDFS = Utils.shipToHDFS(pigContext, tezConf, extraJar);
- FileStatus fstat = remoteFs.getFileStatus(pathInHDFS);
- LocalResource extraJarRsrc = LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
- LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION,
- fstat.getLen(),
- fstat.getModificationTime());
- localResources.put(pathInHDFS.getName(), extraJarRsrc);
- pigContext.skipJars.add(extraJar.getPath());
- }
-
- // Collect all the UDFs registered in tezPlan
- Set<String> udfs = Sets.newHashSet();
- Iterator<TezOperator> it = tezPlan.iterator();
- while (it.hasNext()) {
- udfs.addAll(it.next().UDFs);
- }
-
- // Create the jar of all functions and classes required
- File jobJar = File.createTempFile("Job", ".jar");
- jobJar.deleteOnExit();
- FileOutputStream fos = new FileOutputStream(jobJar);
- try {
- JarManager.createJar(fos, udfs, pigContext);
- } catch (ClassNotFoundException e) {
- throw new JobCreationException("UDF is not found in classpath: ", e);
- }
-
- // Ship the job jar to the staging directory on hdfs
- Path remoteJarPath = remoteFs.makeQualified(new Path(remoteStagingDir, DAG_JAR_NAME));
- remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
- FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
-
- LocalResource dagJarLocalRsrc = LocalResource.newInstance(
- ConverterUtils.getYarnUrlFromPath(remoteJarPath),
- LocalResourceType.FILE,
- LocalResourceVisibility.APPLICATION,
- jarFileStatus.getLen(),
- jarFileStatus.getModificationTime());
- localResources.put(DAG_JAR_NAME, dagJarLocalRsrc);
-
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.putAll(planContainer.getLocalResources());
+ localResources.putAll(tezPlan.getLocalExtraResources());
DAG tezDag = buildDAG(tezPlan, localResources);
- return new TezJob(tezConf, appId, tezDag, localResources);
+ return new TezJob(tezConf, tezDag, planContainer.getLocalResources());
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobNotifier.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
+
+public class TezJobNotifier { // Reports whether a TezOperPlan's jobs succeeded back to its TezPlanContainer.
+ private TezPlanContainer planContainer; // container updated when the plan's jobs complete
+ private TezOperPlan plan; // the plan this notifier reports on
+ public TezJobNotifier(TezPlanContainer planContainer, TezOperPlan plan) {
+ this.planContainer = planContainer;
+ this.plan = plan;
+ }
+ // Called with the finished JobControl (TezJobControl.run() invokes it after stopping).
+ public void complete(JobControl jobControl) {
+ boolean succ = true;
+ if (jobControl.getFailedJobList().size()!=0) {
+ succ = false;
+ }
+ // The plan counts as successful only if no job in jobControl failed.
+ planContainer.updatePlan(plan, succ);
+ }
+}
+
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Nov 11 19:28:17 2013
@@ -19,11 +19,15 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.PrintStream;
+import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.pig.backend.BackendException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -35,82 +39,86 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.tez.TezStats;
-
+import org.apache.tez.dag.api.TezConfiguration;
/**
* Main class that launches pig for Tez
*/
public class TezLauncher extends Launcher {
+
private static final Log log = LogFactory.getLog(TezLauncher.class);
-
+
@Override
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+ Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+ FileSystem fs = FileSystem.get(conf);
+ Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+
+ TezResourceManager.initialize(stagingDir, pc, conf);
+
+ conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+
+ List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
+
TezStats tezStats = new TezStats(pc);
PigStats.start(tezStats);
-
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
TezJobControlCompiler jcc = new TezJobControlCompiler(pc, conf);
- TezOperPlan tezPlan = compile(php, pc);
-
- TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
- pkgAnnotator.visit();
-
- tezStats.initialize(tezPlan);
-
- jc = jcc.compile(tezPlan, grpName);
-
- // Initially, all jobs are in wait state.
- List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
- log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
-
- // TODO: MapReduceLauncher does a couple of things here. For example,
- // notify PPNL of job submission, update PigStas, etc. We will worry
- // about them later.
-
- // Set the thread UDFContext so registered classes are available.
- final UDFContext udfContext = UDFContext.getUDFContext();
- Thread jcThread = new Thread(jc, "JobControl") {
- @Override
- public void run() {
- UDFContext.setUdfContext(udfContext.clone());
- super.run();
- }
- };
-
- JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
- jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
- jcThread.setContextClassLoader(PigContext.getClassLoader());
-
- // Mark the times that the jobs were submitted so it's reflected in job
- // history props
- long scriptSubmittedTimestamp = System.currentTimeMillis();
- for (ControlledJob job : jobsWithoutIds) {
- // Job.getConfiguration returns the shared configuration object
- Configuration jobConf = job.getJob().getConfiguration();
- jobConf.set("pig.script.submitted.timestamp",
- Long.toString(scriptSubmittedTimestamp));
- jobConf.set("pig.job.submitted.timestamp",
- Long.toString(System.currentTimeMillis()));
- }
-
- // All the setup done, now lets launch the jobs. DAG is submitted to
- // YARN cluster by TezJob.submit().
- jcThread.start();
-
- try {
- // Wait for all the jobs are finished.
- while (!jc.allFinished()) {
- try {
- jcThread.join(500);
- } catch (InterruptedException e) {
- // Do nothing
+ TezPlanContainer tezPlanContainer = compile(php, pc);
+
+ TezOperPlan tezPlan;
+
+ while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans))!=null) {
+ processedPlans.add(tezPlan);
+
+ TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
+ pkgAnnotator.visit();
+
+ tezStats.initialize(tezPlan);
+
+ jc = jcc.compile(tezPlan, grpName, conf, tezPlanContainer);
+ TezJobNotifier notifier = new TezJobNotifier(tezPlanContainer, tezPlan);
+ ((TezJobControl)jc).setJobNotifier(notifier);
+ ((TezJobControl)jc).setTezStats(tezStats);
+
+ // Initially, all jobs are in wait state.
+ List<ControlledJob> jobsWithoutIds = jc.getWaitingJobList();
+ log.info(jobsWithoutIds.size() + " tez job(s) waiting for submission.");
+
+ // TODO: MapReduceLauncher does a couple of things here. For example,
+ // notify PPNL of job submission, update PigStats, etc. We will worry
+ // about them later.
+
+ // Set the thread UDFContext so registered classes are available.
+ final UDFContext udfContext = UDFContext.getUDFContext();
+ Thread jcThread = new Thread(jc, "JobControl") {
+ @Override
+ public void run() {
+ UDFContext.setUdfContext(udfContext.clone());
+ super.run();
}
+ };
+
+ JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
+ jcThread.setUncaughtExceptionHandler(jctExceptionHandler);
+ jcThread.setContextClassLoader(PigContext.getClassLoader());
+
+ // Mark the times that the jobs were submitted so it's reflected in job
+ // history props
+ long scriptSubmittedTimestamp = System.currentTimeMillis();
+ for (ControlledJob job : jobsWithoutIds) {
+ // Job.getConfiguration returns the shared configuration object
+ Configuration jobConf = job.getJob().getConfiguration();
+ jobConf.set("pig.script.submitted.timestamp",
+ Long.toString(scriptSubmittedTimestamp));
+ jobConf.set("pig.job.submitted.timestamp",
+ Long.toString(System.currentTimeMillis()));
}
- } catch (Exception e) {
- throw e;
- } finally {
- tezStats.accumulateStats(jc);
- jc.stop();
+
+ // All the setup done, now lets launch the jobs. DAG is submitted to
+ // YARN cluster by TezJob.submit().
+ jcThread.start();
}
tezStats.finish();
@@ -122,10 +130,10 @@ public class TezLauncher extends Launche
String format, boolean verbose) throws PlanException,
VisitorException, IOException {
log.debug("Entering TezLauncher.explain");
- TezOperPlan tezp = compile(php, pc);
+ TezPlanContainer tezPlanContainer = compile(php, pc);
if (format.equals("text")) {
- TezPrinter printer = new TezPrinter(ps, tezp);
+ TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
printer.setVerbose(verbose);
printer.visit();
} else {
@@ -134,12 +142,13 @@ public class TezLauncher extends Launche
}
}
- public TezOperPlan compile(PhysicalPlan php, PigContext pc)
+ public TezPlanContainer compile(PhysicalPlan php, PigContext pc)
throws PlanException, IOException, VisitorException {
TezCompiler comp = new TezCompiler(php, pc);
comp.compile();
+ TezOperPlan plan = comp.getTezPlan();
// TODO: Run optimizations here
- return comp.getTezPlan();
+ return comp.getPlanContainer();
}
@Override
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Mon Nov 11 19:28:17 2013
@@ -18,8 +18,14 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
@@ -33,7 +39,6 @@ public class TezOperPlan extends Operato
private static final long serialVersionUID = 1L;
public TezOperPlan() {
- // TODO Auto-generated constructor stub
}
@Override
@@ -49,5 +54,11 @@ public class TezOperPlan extends Operato
}
return baos.toString();
}
+
+ public Map<String, LocalResource> getLocalExtraResources() throws IOException {
+ Set<URL> jarLists = new HashSet<URL>();
+ // TODO: Add script jars/pig-misc jars
+ return TezResourceManager.getTezResources(jarLists);
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Mon Nov 11 19:28:17 2013
@@ -57,6 +57,8 @@ public class TezOperator extends Operato
// Indicates that the plan creation is complete
boolean closed = false;
+
+ boolean segmentBelow = false;
// Types of blocking operators. For now, we only support the following ones.
private static enum OPER_FEATURE {
@@ -173,5 +175,9 @@ public class TezOperator extends Operato
sb.delete(sb.length() - "\n".length(), sb.length());
return sb.toString();
}
+
+ public boolean needSegmentBelow() {
+ return segmentBelow;
+ }
}
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.JarManager;
+
+public class TezPlanContainer extends OperatorPlan<TezPlanContainerNode> {
+ private static final long serialVersionUID = 1L;
+ private PigContext pigContext;
+
+ public TezPlanContainer(PigContext pigContext) {
+ this.pigContext = pigContext;
+ }
+
+ // Use pig.jar and udf jars for the AM resources (all DAG in the planContainer will
+ // use it for simplicity)
+ public Map<String, LocalResource> getLocalResources() throws IOException {
+ Set<URL> jarLists = new HashSet<URL>();
+
+ jarLists.add(TezResourceManager.getBootStrapJar());
+
+ jarLists.addAll(pigContext.extraJars);
+
+ TezPlanContainerUDFCollector tezPlanContainerUDFCollector = new TezPlanContainerUDFCollector(this);
+ tezPlanContainerUDFCollector.visit();
+ Set<String> udfs = tezPlanContainerUDFCollector.getUdfs();
+
+ for (String func: udfs) {
+ Class clazz = pigContext.getClassForAlias(func);
+ if (clazz != null) {
+ String jarName = JarManager.findContainingJar(clazz);
+ URL jarUrl = new File(jarName).toURI().toURL();
+ jarLists.add(jarUrl);
+ }
+ }
+
+ return TezResourceManager.getTezResources(jarLists);
+ }
+
+ public TezOperPlan getNextPlan(List<TezOperPlan> processedPlans) {
+ synchronized(this) {
+ while (getRoots()!=null && !getRoots().isEmpty()) {
+ TezPlanContainerNode currentPlan = null;
+ for (TezPlanContainerNode plan : getRoots()) {
+ if (!processedPlans.contains(plan.getNode())) {
+ currentPlan = plan;
+ break;
+ }
+ }
+ if (currentPlan!=null) {
+ return currentPlan.getNode();
+ } else {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public void updatePlan(TezOperPlan plan, boolean succ) {
+ String scope = getRoots().get(0).getOperatorKey().getScope();
+ TezPlanContainerNode tezPlanContainerNode = new TezPlanContainerNode(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)), plan);
+ synchronized(this) {
+ if (succ) {
+ remove(tezPlanContainerNode);
+ } else {
+ // job fail
+ trimBelow(tezPlanContainerNode);
+ remove(tezPlanContainerNode);
+ }
+ notify();
+ }
+ }
+
+ public void split(TezPlanContainerNode planNode) throws PlanException {
+ TezOperPlan tezOperPlan = planNode.getNode();
+ for (TezOperator tezOper : tezOperPlan) {
+ if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) {
+ for (TezOperator succ : tezOperPlan.getSuccessors(tezOper)) {
+ tezOperPlan.disconnect(tezOper, succ);
+ TezOperPlan newOperPlan = new TezOperPlan();
+ tezOperPlan.moveTree(succ, newOperPlan);
+ TezPlanContainerNode newPlanNode = new TezPlanContainerNode(tezOper.getOperatorKey(), newOperPlan);
+ add(newPlanNode);
+ connect(planNode, newPlanNode);
+ split(planNode);
+ split(newPlanNode);
+ }
+ }
+ }
+ }
+}
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerNode.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class TezPlanContainerNode extends Operator<TezPlanContainerVisitor>{
+ private static final long serialVersionUID = 1L;
+ TezOperPlan node;
+ public TezPlanContainerNode(OperatorKey k, TezOperPlan node) {
+ super(k);
+ this.node = node;
+ }
+
+ @Override
+ public void visit(TezPlanContainerVisitor v) throws VisitorException {
+ v.visitTezPlanContainerNode(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public String name() {
+ return "DAG " + mKey;
+ }
+
+ public TezOperPlan getNode() {
+ return node;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o != null && o instanceof TezPlanContainerNode) {
+ return ((TezPlanContainerNode)o).getNode().equals(getNode());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getNode().hashCode();
+ }
+}
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerPrinter.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.PrintStream;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+ /**
+  * Prints every DAG of a {@link TezPlanContainer} in dependency order. Each DAG is
+  * preceded by a banner with its operator key and printed with a {@link TezPrinter}.
+  */
+ public class TezPlanContainerPrinter extends TezPlanContainerVisitor {
+     private final PrintStream mStream;
+     private boolean isVerbose = true;
+
+     /**
+      * Creates the printer and immediately writes a banner with the number of DAGs in
+      * {@code planContainer}.
+      *
+      * @param ps PrintStream to output plan information to
+      * @param planContainer container of tez plans to print
+      */
+     public TezPlanContainerPrinter(PrintStream ps, TezPlanContainer planContainer) {
+         super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+         mStream = ps;
+         mStream.println("#--------------------------------------------------");
+         mStream.println("# There are " + planContainer.size() + " DAGs in the session");
+         mStream.println("#--------------------------------------------------");
+     }
+
+     /** @param verbose passed on to the {@link TezPrinter} used for each DAG */
+     public void setVerbose(boolean verbose) {
+         isVerbose = verbose;
+     }
+
+     @Override
+     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+         mStream.println("#--------------------------------------------------");
+         mStream.println("# TEZ plan: " + tezPlanContainerNode.getOperatorKey());
+         mStream.println("#--------------------------------------------------");
+         TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getNode());
+         printer.setVerbose(isVerbose);
+         printer.visit();
+     }
+ }
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerUDFCollector.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+ import java.util.HashSet;
+ import java.util.Set;
+
+ import org.apache.pig.impl.plan.DependencyOrderWalker;
+ import org.apache.pig.impl.plan.VisitorException;
+
+ /**
+  * Collects the UDF names (as recorded in {@code TezOperator.UDFs}) of every operator of
+  * every DAG in a {@link TezPlanContainer}.
+  */
+ public class TezPlanContainerUDFCollector extends TezPlanContainerVisitor {
+     // Plain JDK set: no need for Jython's repackaged Guava just to build a HashSet.
+     private final Set<String> udfs = new HashSet<String>();
+
+     /**
+      * @param planContainer container whose DAGs are scanned, in dependency order
+      */
+     public TezPlanContainerUDFCollector(TezPlanContainer planContainer) {
+         super(planContainer, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(planContainer));
+     }
+
+     @Override
+     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+         for (TezOperator tezOper : tezPlanContainerNode.getNode()) {
+             udfs.addAll(tezOper.UDFs);
+         }
+     }
+
+     /** @return the UDF names collected so far (the collector's own mutable set) */
+     public Set<String> getUdfs() {
+         return udfs;
+     }
+ }
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainerVisitor.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+ /**
+  * Base visitor over the DAG nodes of a {@link TezPlanContainer}. Subclasses override
+  * {@link #visitTezPlanContainerNode(TezPlanContainerNode)}; the default does nothing.
+  */
+ public class TezPlanContainerVisitor extends PlanVisitor<TezPlanContainerNode, TezPlanContainer> {
+     /**
+      * @param plan container to visit
+      * @param walker decides the order in which container nodes are visited
+      */
+     public TezPlanContainerVisitor(TezPlanContainer plan, PlanWalker<TezPlanContainerNode, TezPlanContainer> walker) {
+         super(plan, walker);
+     }
+
+     /**
+      * Invoked by the walker for a container node. No-op by default.
+      *
+      * @param tezPlanContainerNode node being visited
+      * @throws VisitorException declared so subclasses can abort the walk
+      */
+     public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+         // intentionally empty
+     }
+ }
+
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Mon Nov 11 19:28:17 2013
@@ -40,9 +40,6 @@ public class TezPrinter extends TezOpPla
public TezPrinter(PrintStream ps, TezOperPlan plan) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
mStream = ps;
- mStream.println("#--------------------------------------------------");
- mStream.println("# TEZ plan:");
- mStream.println("#--------------------------------------------------");
}
public void setVerbose(boolean verbose) {
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.Utils;
+
+ /**
+  * Ships the jars needed by Tez DAGs to the cluster and turns them into YARN
+  * {@link LocalResource}s. The remote path of every shipped URL is cached in
+  * {@link #resources}, which is never cleared, so a URL is shipped at most once per JVM.
+  * All state is static and no synchronization is done here.
+  */
+ public class TezResourceManager {
+     private static Path stagingDir;
+     private static PigContext pigContext;
+     private static Configuration conf;
+     private static URL bootStrapJar;
+     private static FileSystem remoteFs;
+
+     /** Local URL -> remote path of every resource shipped so far. */
+     public static Map<URL, Path> resources = new HashMap<URL, Path>();
+
+     /** @return URL of the jar containing {@code org.apache.pig.Main}, set by initialize() */
+     public static URL getBootStrapJar() {
+         return bootStrapJar;
+     }
+
+     /**
+      * Records the staging directory, context and configuration. Locates the jar holding
+      * {@code org.apache.pig.Main} and uploads a freshly built bootstrap jar under its name.
+      */
+     public static void initialize(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+         TezResourceManager.stagingDir = stagingDir;
+         TezResourceManager.pigContext = pigContext;
+         TezResourceManager.conf = conf;
+         String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
+         TezResourceManager.bootStrapJar = new File(jar).toURI().toURL();
+         remoteFs = FileSystem.get(conf);
+         addBootStrapJar();
+     }
+
+     /** Ships {@code url} to HDFS unless it has already been shipped. */
+     public static void addLocalResource(URL url) throws IOException {
+         if (resources.containsKey(url)) {
+             return;
+         }
+
+         Path pathInHDFS = Utils.shipToHDFS(pigContext, conf, url);
+         resources.put(url, pathInHDFS);
+     }
+
+     /**
+      * Builds the bootstrap jar in a local temp file and copies it into the staging
+      * directory, unless it has already been shipped.
+      */
+     public static void addBootStrapJar() throws IOException {
+         if (resources.containsKey(bootStrapJar)) {
+             return;
+         }
+
+         File jobJar = File.createTempFile("Job", ".jar");
+         jobJar.deleteOnExit();
+         try {
+             FileOutputStream fos = new FileOutputStream(jobJar);
+             try {
+                 JarManager.createBootStrapJar(fos, pigContext);
+             } finally {
+                 // The jar stream closes fos on success; closing again is harmless and
+                 // prevents a leaked descriptor when building the jar fails.
+                 fos.close();
+             }
+
+             // Ship the job.jar to the staging directory on hdfs
+             Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, new Path(bootStrapJar.getFile()).getName()));
+             remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
+
+             resources.put(bootStrapJar, remoteJarPath);
+         } finally {
+             // The local copy is not needed after the upload (or after a failure).
+             jobJar.delete();
+         }
+     }
+
+     /** @return the remote path shipped for {@code url}, or {@code null} */
+     public static Path get(URL url) {
+         return resources.get(url);
+     }
+
+     /**
+      * Ships any URL not shipped yet and describes each one as an APPLICATION-visible FILE
+      * local resource.
+      *
+      * @return resources keyed by remote file name
+      */
+     public static Map<String, LocalResource> getTezResources(Set<URL> urls) throws IOException {
+         Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
+         for (URL url : urls) {
+             if (!resources.containsKey(url)) {
+                 addLocalResource(url);
+             }
+             Path remotePath = resources.get(url);
+             FileStatus fstat = remoteFs.getFileStatus(remotePath);
+             LocalResource tezResource = LocalResource.newInstance(
+                     ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
+                     LocalResourceType.FILE,
+                     LocalResourceVisibility.APPLICATION,
+                     fstat.getLen(),
+                     fstat.getModificationTime());
+             // NOTE(review): keyed by file name only; same-named jars from different
+             // directories overwrite each other here.
+             tezResources.put(remotePath.getName(), tezResource);
+         }
+         return tezResources;
+     }
+ }
+
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1540818&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Mon Nov 11 19:28:17 2013
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+
+ /**
+  * JVM-wide pool of Tez sessions. A DAG reuses an idle READY session whose resources
+  * contain every requested AM resource; otherwise a new session is started and pooled.
+  */
+ public class TezSessionManager {
+     public static class SessionInfo {
+         SessionInfo(TezSession session, Map<String, LocalResource> resources) {
+             this.session = session;
+             this.resources = resources;
+         }
+         public Map<String, LocalResource> getResources() {
+             return resources;
+         }
+         public TezSession getTezSession() {
+             return session;
+         }
+         public void setInUse(boolean inUse) {
+             this.inUse = inUse;
+         }
+         private TezSession session;
+         private Map<String, LocalResource> resources;
+         private boolean inUse = false;
+     }
+
+     // Guarded by synchronizing on the list itself.
+     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
+
+     /** Polls every 100 ms until the session is READY; fails if it is SHUTDOWN. */
+     private static void waitForTezSessionReady(TezSession tezSession)
+             throws IOException, TezException {
+         while (true) {
+             TezSessionStatus status = tezSession.getSessionStatus();
+             if (status.equals(TezSessionStatus.SHUTDOWN)) {
+                 throw new RuntimeException("TezSession has already shutdown");
+             }
+             if (status.equals(TezSessionStatus.READY)) {
+                 return;
+             }
+             try {
+                 Thread.sleep(100);
+             } catch (InterruptedException e) {
+                 // Restore the flag so callers further up can still see the interrupt.
+                 Thread.currentThread().interrupt();
+                 throw new IOException("Interrupted while trying to check session status", e);
+             }
+         }
+     }
+
+     /** Starts a new session with the requested AM resources and waits until it is READY. */
+     private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+         TezConfiguration tezConf = new TezConfiguration(conf);
+         TezClient tezClient = new TezClient(tezConf);
+         ApplicationId appId = tezClient.createApplication();
+
+         Map<String, LocalResource> resources = new HashMap<String, LocalResource>();
+         resources.putAll(requestedAMResources);
+
+         String jobName = conf.get(PigContext.JOB_NAME, "pig");
+         AMConfiguration amConfig = new AMConfiguration(null, resources, tezConf, null);
+         TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+         TezSession tezSession = new TezSession(jobName, appId, sessionConfig);
+         tezSession.start();
+         waitForTezSessionReady(tezSession);
+         return new SessionInfo(tezSession, resources);
+     }
+
+     /** @return true if every requested (name, resource) pair is present in the session */
+     private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) {
+         for (Map.Entry<String, LocalResource> entry : requestedAMResources.entrySet()) {
+             if (!currentSession.resources.entrySet().contains(entry)) {
+                 return false;
+             }
+         }
+         return true;
+     }
+
+     /**
+      * Returns an idle READY session that has all requested resources, or starts a new one.
+      * SHUTDOWN sessions seen while scanning are dropped from the pool. The returned
+      * session is marked in use until {@link #freeSession(TezSession)} is called.
+      */
+     static TezSession getSession(Configuration conf, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+         synchronized (sessionPool) {
+             SessionInfo chosen = null;
+             List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
+             for (SessionInfo sessionInfo : sessionPool) {
+                 // Ask for the status once per session; it may involve a remote call.
+                 TezSessionStatus status = sessionInfo.session.getSessionStatus();
+                 if (status == TezSessionStatus.SHUTDOWN) {
+                     sessionsToRemove.add(sessionInfo);
+                 } else if (!sessionInfo.inUse && status == TezSessionStatus.READY
+                         && validateSessionResources(sessionInfo, requestedAMResources)) {
+                     chosen = sessionInfo;
+                     break;
+                 }
+             }
+             sessionPool.removeAll(sessionsToRemove);
+
+             if (chosen == null) {
+                 // We cannot find available AM, create new one
+                 chosen = createSession(conf, requestedAMResources);
+                 sessionPool.add(chosen);
+             }
+             chosen.inUse = true;
+             return chosen.session;
+         }
+     }
+
+     /** Marks {@code session} as idle again; no-op if it is not pooled. */
+     static void freeSession(TezSession session) {
+         synchronized (sessionPool) {
+             for (SessionInfo sessionInfo : sessionPool) {
+                 if (sessionInfo.session == session) {
+                     sessionInfo.inUse = false;
+                     break;
+                 }
+             }
+         }
+     }
+ }
+
Modified: pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Nov 11 19:28:17 2013
@@ -281,6 +281,24 @@ public abstract class OperatorPlan<E ext
}
}
}
+
+ /**
+  * Move everything below a given operator to the new operator plan. The specified operator will
+  * be moved and will be the root of the new operator plan. Edges between the moved operators
+  * are recreated in {@code newPlan}. A leaf operator (no successors) is moved on its own.
+  * @param root Operator to move everything after
+  * @param newPlan new operator plan to move things into
+  * @throws PlanException if an edge cannot be recreated in {@code newPlan}
+  */
+ public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
+     // getSuccessors returns null for a leaf, and may hand back a list that remove() edits
+     // while we recurse, so work on a private copy.
+     List<E> directSuccs = getSuccessors(root);
+     List<E> succs = (directSuccs == null)
+             ? new java.util.ArrayList<E>()
+             : new java.util.ArrayList<E>(directSuccs);
+     // root must be a member of newPlan before connect() can link it to its successors.
+     newPlan.add(root);
+     for (E succ : succs) {
+         moveTree(succ, newPlan);
+     }
+     remove(root);
+     for (E succ : succs) {
+         newPlan.connect(root, succ);
+     }
+ }
/**
* Trim everything above a given operator. The specified operator will
Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Nov 11 19:28:17 2013
@@ -128,6 +128,23 @@ public class JarManager {
return pkgClass;
}
}
+
+ /**
+  * Writes the Pig bootstrap jar to {@code os}: the merged contents of the jars containing
+  * each {@link DefaultPigPackages} class, restricted to that package's prefix.
+  *
+  * @param os destination of the jar bytes; always closed by this method
+  * @param pigContext passed through to {@code addContainingJar}
+  * @throws IOException if a source jar cannot be read or the output cannot be written
+  */
+ public static void createBootStrapJar(OutputStream os, PigContext pigContext) throws IOException {
+     JarOutputStream jarFile = new JarOutputStream(os);
+     try {
+         HashMap<String, String> contents = new HashMap<String, String>();
+         Vector<JarListEntry> jarList = new Vector<JarListEntry>();
+
+         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+             addContainingJar(jarList, pkgToSend.getPkgClass(), pkgToSend.getPkgPrefix(), pigContext);
+         }
+
+         for (JarListEntry jarEntry : jarList) {
+             mergeJar(jarFile, jarEntry.jar, jarEntry.prefix, contents);
+         }
+     } finally {
+         // Close (and thereby close os) on failure too, so the stream is never leaked.
+         jarFile.close();
+     }
+ }
/**
* Create a jarfile in a temporary path, that is a merge of all the jarfiles containing the
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC1.gld Mon Nov 11 19:28:17 2013
@@ -1,5 +1,8 @@
#--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ plan: scope-17
#--------------------------------------------------
Tez vertex scope-16
c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-15
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC2.gld Mon Nov 11 19:28:17 2013
@@ -1,29 +1,32 @@
#--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
#--------------------------------------------------
-Tez vertex scope-35
-b: Local Rearrange[tuple]{int}(false) - scope-27
+#--------------------------------------------------
+# TEZ plan: scope-38
+#--------------------------------------------------
+Tez vertex scope-36
+b: Local Rearrange[tuple]{int}(false) - scope-28
| |
-| Project[int][0] - scope-28
+| Project[int][0] - scope-29
|
-|---a: New For Each(false,false)[bag] - scope-24
+|---a: New For Each(false,false)[bag] - scope-25
| |
- | Cast[int] - scope-19
+ | Cast[int] - scope-20
| |
- | |---Project[bytearray][0] - scope-18
+ | |---Project[bytearray][0] - scope-19
| |
- | Cast[int] - scope-22
+ | Cast[int] - scope-23
| |
- | |---Project[bytearray][1] - scope-21
+ | |---Project[bytearray][1] - scope-22
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-17
-Tez vertex scope-36
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-34
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-18
+Tez vertex scope-37
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-35
|
-|---c: New For Each(false,false)[bag] - scope-33
+|---c: New For Each(false,false)[bag] - scope-34
| |
- | Project[int][0] - scope-29
+ | Project[int][0] - scope-30
| |
- | Project[bag][1] - scope-31
+ | Project[bag][1] - scope-32
|
- |---b: Package[tuple]{int} - scope-26
+ |---b: Package[tuple]{int} - scope-27
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC3.gld Mon Nov 11 19:28:17 2013
@@ -1,53 +1,56 @@
#--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
#--------------------------------------------------
-Tez vertex scope-72
-c: Local Rearrange[tuple]{int}(false) - scope-57
+#--------------------------------------------------
+# TEZ plan: scope-77
+#--------------------------------------------------
+Tez vertex scope-74
+c: Local Rearrange[tuple]{int}(false) - scope-59
| |
-| Project[int][0] - scope-58
+| Project[int][0] - scope-60
|
-|---a: New For Each(false,false)[bag] - scope-44
+|---a: New For Each(false,false)[bag] - scope-46
| |
- | Cast[int] - scope-39
+ | Cast[int] - scope-41
| |
- | |---Project[bytearray][0] - scope-38
+ | |---Project[bytearray][0] - scope-40
| |
- | Cast[int] - scope-42
+ | Cast[int] - scope-44
| |
- | |---Project[bytearray][1] - scope-41
+ | |---Project[bytearray][1] - scope-43
|
- |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-37
-Tez vertex scope-73
-c: Local Rearrange[tuple]{int}(false) - scope-59
+ |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-39
+Tez vertex scope-75
+c: Local Rearrange[tuple]{int}(false) - scope-61
| |
-| Project[int][0] - scope-60
+| Project[int][0] - scope-62
|
-|---b: New For Each(false,false)[bag] - scope-52
+|---b: New For Each(false,false)[bag] - scope-54
| |
- | Cast[int] - scope-47
+ | Cast[int] - scope-49
| |
- | |---Project[bytearray][0] - scope-46
+ | |---Project[bytearray][0] - scope-48
| |
- | Cast[int] - scope-50
+ | Cast[int] - scope-52
| |
- | |---Project[bytearray][1] - scope-49
+ | |---Project[bytearray][1] - scope-51
|
- |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-45
-Tez vertex scope-74
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-71
+ |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-47
+Tez vertex scope-76
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-73
|
-|---d: New For Each(false,false,false)[bag] - scope-70
+|---d: New For Each(false,false,false)[bag] - scope-72
| |
- | Project[int][0] - scope-64
+ | Project[int][0] - scope-66
| |
- | Project[int][1] - scope-66
+ | Project[int][1] - scope-68
| |
- | Project[int][3] - scope-68
+ | Project[int][3] - scope-70
|
- |---c: New For Each(true,true)[tuple] - scope-63
+ |---c: New For Each(true,true)[tuple] - scope-65
| |
- | Project[bag][1] - scope-61
+ | Project[bag][1] - scope-63
| |
- | Project[bag][2] - scope-62
+ | Project[bag][2] - scope-64
|
- |---c: Package[tuple]{int} - scope-56
+ |---c: Package[tuple]{int} - scope-58
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld Mon Nov 11 19:28:17 2013
@@ -1,31 +1,34 @@
#--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
#--------------------------------------------------
-Tez vertex scope-85
-Local Rearrange[tuple]{tuple}(false) - scope-87
+#--------------------------------------------------
+# TEZ plan: scope-96
+#--------------------------------------------------
+Tez vertex scope-88
+Local Rearrange[tuple]{tuple}(false) - scope-90
| |
-| Project[tuple][*] - scope-86
+| Project[tuple][*] - scope-89
|
-|---b: Limit - scope-76
+|---b: Limit - scope-79
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-75
-Tez vertex scope-88
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-84
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-78
+Tez vertex scope-91
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-87
|
-|---c: New For Each(false)[bag] - scope-83
+|---c: New For Each(false)[bag] - scope-86
| |
- | Project[int][0] - scope-81
+ | Project[int][0] - scope-84
|
- |---a: New For Each(false)[bag] - scope-80
+ |---a: New For Each(false)[bag] - scope-83
| |
- | Cast[int] - scope-78
+ | Cast[int] - scope-81
| |
- | |---Project[bytearray][0] - scope-77
+ | |---Project[bytearray][0] - scope-80
|
- |---Limit - scope-92
+ |---Limit - scope-95
|
- |---New For Each(true)[bag] - scope-91
+ |---New For Each(true)[bag] - scope-94
| |
- | Project[tuple][1] - scope-90
+ | Project[tuple][1] - scope-93
|
- |---Package[tuple]{tuple} - scope-89
+ |---Package[tuple]{tuple} - scope-92
Modified: pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld (original)
+++ pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld Mon Nov 11 19:28:17 2013
@@ -1,31 +1,34 @@
#--------------------------------------------------
-# TEZ plan:
+# There are 1 DAGs in the session
#--------------------------------------------------
-Tez vertex scope-106
-Local Rearrange[tuple]{tuple}(true) - scope-108
+#--------------------------------------------------
+# TEZ plan: scope-117
+#--------------------------------------------------
+Tez vertex scope-110
+Local Rearrange[tuple]{tuple}(true) - scope-112
| |
-| Project[tuple][*] - scope-107
+| Project[tuple][*] - scope-111
|
-|---a: New For Each(false,false)[bag] - scope-100
+|---a: New For Each(false,false)[bag] - scope-104
| |
- | Cast[int] - scope-95
+ | Cast[int] - scope-99
| |
- | |---Project[bytearray][0] - scope-94
+ | |---Project[bytearray][0] - scope-98
| |
- | Cast[int] - scope-98
+ | Cast[int] - scope-102
| |
- | |---Project[bytearray][1] - scope-97
+ | |---Project[bytearray][1] - scope-101
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-93
-Tez vertex scope-109
-c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-105
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-97
+Tez vertex scope-113
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-109
|
-|---c: New For Each(false)[bag] - scope-104
+|---c: New For Each(false)[bag] - scope-108
| |
- | Project[int][1] - scope-102
+ | Project[int][1] - scope-106
|
- |---New For Each(true)[bag] - scope-112
+ |---New For Each(true)[bag] - scope-116
| |
- | Project[tuple][0] - scope-111
+ | Project[tuple][0] - scope-115
|
- |---Package[tuple]{tuple} - scope-110
+ |---Package[tuple]{tuple} - scope-114
Modified: pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java?rev=1540818&r1=1540817&r2=1540818&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java Mon Nov 11 19:28:17 2013
@@ -29,8 +29,8 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezCompiler;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLocalExecType;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainer;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezPlanContainerPrinter;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.Util;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
@@ -141,11 +141,12 @@ public class TestTezCompiler {
private void run(PhysicalPlan pp, String expectedFile) throws Exception {
TezCompiler comp = new TezCompiler(pp, pc);
- TezOperPlan tezPlan = comp.compile();
+ comp.compile();
+ TezPlanContainer tezPlanContainer = comp.getPlanContainer();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
- TezPrinter printer = new TezPrinter(ps, tezPlan);
+ TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer);
printer.visit();
String compiledPlan = baos.toString();