You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/03/05 02:04:07 UTC
svn commit: r750264 - in /hadoop/pig/branches/multiquery: ./
src/org/apache/pig/ src/org/apache/pig/backend/executionengine/util/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physic...
Author: pradeepkth
Date: Thu Mar 5 01:04:06 2009
New Revision: 750264
URL: http://svn.apache.org/viewvc?rev=750264&view=rev
Log:
PIG-627: multiquery support M2 (hagleitn via pradeepkth)
Modified:
hadoop/pig/branches/multiquery/CHANGES.txt
hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: hadoop/pig/branches/multiquery/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/CHANGES.txt?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/CHANGES.txt (original)
+++ hadoop/pig/branches/multiquery/CHANGES.txt Thu Mar 5 01:04:06 2009
@@ -408,3 +408,5 @@
PIG-642: Limit after FRJ causes problems (daijy)
PIG-627: multiquery support M1 (hagleitn via olgan)
+
+ PIG-627: multiquery support M2 (hagleitn via pradeepkth)
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/PigServer.java Thu Mar 5 01:04:06 2009
@@ -751,11 +751,9 @@
if(leaves.size() == 1) {
leaf = leaves.get(0);
} else {
- for (Iterator<LogicalOperator> it = leaves.iterator(); it.hasNext();) {
- LogicalOperator leafOp = it.next();
- if(leafOp.getAlias().equals(alias))
- leaf = leafOp;
- }
+ // should have exactly one leaf since we asked for a
+ // specific alias.
+ throw new AssertionError("Ceci n'est pas un bug.");
}
lp = QueryParser.generateStorePlan(scope, lp, "fakefile",
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/executionengine/util/ExecTools.java Thu Mar 5 01:04:06 2009
@@ -47,7 +47,6 @@
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- str.setPc(pigContext);
spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
pigContext).toString(),
new FuncSpec(BinStorage.class.getName()));
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Mar 5 01:04:06 2009
@@ -22,12 +22,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
@@ -53,6 +49,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TupleFactory;
@@ -104,85 +101,121 @@
public static final String LOG_DIR = "_logs";
+ // Temporary output directories of multi-store jobs that were compiled
+ // earlier and whose contents moveResults() still has to relocate.
+ private List<Path> tmpPath;
+ // Temporary output directory created by the latest makeTmpPath() call;
+ // reset to null once moveResults() has queued it.
+ private Path curTmpPath;
+
+ /**
+ * Creates a compiler that uses the given context and configuration for
+ * every job it builds, starting with an empty list of temporary
+ * directories.
+ *
+ * @param pigContext context stored for later job compilation
+ * @param conf configuration stored for later job compilation
+ * @throws IOException declared by the signature; nothing in this body
+ * throws it
+ */
+ public JobControlCompiler(PigContext pigContext, Configuration conf) throws IOException {
+ this.pigContext = pigContext;
+ this.conf = conf;
+ tmpPath = new LinkedList<Path>();
+ }
+
+    /**
+     * Relocates the output of the compiled jobs from their temporary
+     * directories to the final destinations. Multi-store jobs write into
+     * a temp location because a single map reduce job has restrictions
+     * on multiple outputs.
+     *
+     * Each temp directory may hold an "abs" subtree and a "rel" subtree.
+     * For "abs" the subtree's own path is stripped from every entry; for
+     * "rel" the subtree's path plus a trailing "/" is stripped.
+     *
+     * Call this every time job execution has completed. The list of
+     * pending temp directories is reset afterwards.
+     *
+     * @throws IOException if the file system cannot be obtained or the
+     *         recursive move fails
+     */
+    public void moveResults() throws IOException {
+        // Queue the directory of the most recent multi-store job, if any.
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+            curTmpPath = null;
+        }
+
+        for (Path tmpDir : tmpPath) {
+            FileSystem tmpFs = tmpDir.getFileSystem(conf);
+
+            Path absRoot = new Path(tmpDir, "abs");
+            if (tmpFs.exists(absRoot)) {
+                String absPrefix = absRoot.toUri().getPath();
+                moveResults(absRoot, absPrefix, tmpFs);
+            }
+
+            Path relRoot = new Path(tmpDir, "rel");
+            if (tmpFs.exists(relRoot)) {
+                // the trailing slash is stripped too, so what remains
+                // does not start with "/"
+                String relPrefix = relRoot.toUri().getPath() + "/";
+                moveResults(relRoot, relPrefix, tmpFs);
+            }
+        }
+        tmpPath = new LinkedList<Path>();
+    }
+
+    /**
+     * Walks the temporary directory structure below {@code p} and moves
+     * (renames) every file to its final location. The final location of
+     * an entry is its own path with {@code rem} removed. Directories are
+     * created at the destination before their contents are processed.
+     *
+     * @param p   directory in the temporary tree to process
+     * @param rem path text that is removed from each entry to obtain
+     *            the destination
+     * @param fs  file system holding the temporary tree
+     * @throws IOException if listing or directory creation fails, or if
+     *         the file system reports that a rename did not succeed
+     */
+    private void moveResults(Path p, String rem, FileSystem fs) throws IOException {
+        for (FileStatus fstat : fs.listStatus(p)) {
+            Path src = fstat.getPath();
+            Path dst = removePart(src, rem);
+            if (fstat.isDir()) {
+                fs.mkdirs(dst);
+                moveResults(src, rem, fs);
+            } else if (!fs.rename(src, dst)) {
+                // rename() signals failure through its return value;
+                // ignoring it would leave results behind in the temp
+                // directory without anyone noticing.
+                throw new IOException("Failed to move " + src + " to " + dst);
+            }
+        }
+    }
+
+    /**
+     * Builds the destination path for {@code src} by removing the first
+     * occurrence of {@code part} from the path component of its URI.
+     * Only the first occurrence is removed, so a destination that
+     * happens to contain the same text again stays intact. If
+     * {@code part} does not occur, the path is returned unchanged.
+     *
+     * @param src  path inside the temporary tree
+     * @param part text to remove from the path
+     * @return the path with {@code part} removed once
+     */
+    private Path removePart(Path src, String part) {
+        String pathStr = src.toUri().getPath();
+        int at = pathStr.indexOf(part);
+        if (at >= 0) {
+            pathStr = pathStr.substring(0, at)
+                + pathStr.substring(at + part.length());
+        }
+        return new Path(pathStr);
+    }
+
+    /**
+     * Allocates and creates a fresh temporary directory and stores its
+     * qualified path in {@code curTmpPath}. A previously allocated
+     * directory is first queued in {@code tmpPath}.
+     *
+     * An IOException during allocation or creation is retried; the
+     * 100th consecutive failure is rethrown to the caller.
+     *
+     * @throws IOException the last failure after 100 attempts
+     */
+    private void makeTmpPath() throws IOException {
+        if (curTmpPath != null) {
+            tmpPath.add(curTmpPath);
+        }
+
+        int failures = 0;
+        while (true) {
+            try {
+                String tmpName =
+                    FileLocalizer.getTemporaryPath(null, pigContext).toString();
+                curTmpPath = new Path(tmpName);
+                FileSystem tmpFs = curTmpPath.getFileSystem(conf);
+                curTmpPath = curTmpPath.makeQualified(tmpFs);
+                tmpFs.mkdirs(curTmpPath);
+                return;
+            } catch (IOException ioe) {
+                failures++;
+                if (failures == 100) {
+                    throw ioe;
+                }
+            }
+        }
+    }
+
/**
* The map between MapReduceOpers and their corresponding Jobs
*/
Map<OperatorKey, Job> seen = new Hashtable<OperatorKey, Job>();
/**
- * Top level compile method that issues a call to the recursive
- * compile method.
+ * Compiles all jobs that have no dependencies, removes them from
+ * the plan, and returns. Should be called with the same plan until
+ * exhausted.
* @param plan - The MROperPlan to be compiled
* @param grpName - The name given to the JobControl
- * @param conf - The Configuration object having the various properties
- * @param pigContext - PigContext passed on from the execution engine
- * @return JobControl object
+ * @return JobControl object - null if no more jobs in plan
* @throws JobCreationException
*/
- public JobControl compile(MROperPlan plan, String grpName, Configuration conf, PigContext pigContext) throws JobCreationException{
+ public JobControl compile(MROperPlan plan, String grpName) throws JobCreationException{
this.plan = plan;
- this.conf = conf;
- this.pigContext = pigContext;
- JobControl jobCtrl = new JobControl(grpName);
-
- List<MapReduceOper> leaves ;
- leaves = plan.getLeaves();
-
- for (MapReduceOper mro : leaves) {
- jobCtrl.addJob(compile(mro,jobCtrl));
+
+ if (plan.size() == 0) {
+ return null;
}
- return jobCtrl;
- }
-
- /**
- * The recursive compilation method that works by doing a depth first
- * traversal of the MROperPlan. Compiles a Job for the input MapReduceOper
- * with the dependencies maintained in jobCtrl
- * @param mro - Input MapReduceOper for which a Job needs to be compiled
- * @param jobCtrl - The running JobCtrl object to maintain dependencies b/w jobs
- * @return Job corresponding to the input mro
- * @throws JobCreationException
- */
- private Job compile(MapReduceOper mro, JobControl jobCtrl) throws JobCreationException {
- List<MapReduceOper> pred = plan.getPredecessors(mro);
-
- JobConf currJC = null;
-
- try{
- if(pred==null || pred.size()<=0){
- //No dependencies! Create the JobConf
- //Construct the Job object with it and return
- Job ret = null;
- if(seen.containsKey(mro.getOperatorKey()))
- ret = seen.get(mro.getOperatorKey());
- else{
- currJC = getJobConf(mro, conf, pigContext);
- ret = new Job(currJC,null);
- seen.put(mro.getOperatorKey(), ret);
- }
- return ret;
- }
-
- //Has dependencies. So compile all the inputs
- List<Job> compiledInputs = new ArrayList<Job>(pred.size());
-
- for (MapReduceOper oper : pred) {
- Job ret = null;
- if(seen.containsKey(oper.getOperatorKey()))
- ret = seen.get(oper.getOperatorKey());
- else{
- ret = compile(oper, jobCtrl);
- jobCtrl.addJob(ret);
- seen.put(oper.getOperatorKey(),ret);
- }
- compiledInputs.add(ret);
+
+ JobControl jobCtrl = new JobControl(grpName);
+
+ try {
+ List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
+ roots.addAll(plan.getRoots());
+ for (MapReduceOper mro: roots) {
+ jobCtrl.addJob(new Job(getJobConf(mro, conf, pigContext)));
+ plan.remove(mro);
}
- //Get JobConf for the current MapReduceOper
- currJC = getJobConf(mro, conf, pigContext);
-
- //Create a new Job with the obtained JobConf
- //and the compiled inputs as dependent jobs
- return new Job(currJC,(ArrayList<Job>)compiledInputs);
} catch (JobCreationException jce) {
throw jce;
} catch(Exception e) {
@@ -190,8 +223,10 @@
String msg = "Internal error creating job configuration.";
throw new JobCreationException(msg, errCode, PigException.BUG, e);
}
+
+ return jobCtrl;
}
-
+
/**
* The method that creates the JobConf corresponding to a MapReduceOper.
* The assumption is that
@@ -225,10 +260,10 @@
jobConf.setUser(user != null ? user : "Pigster");
//Process the POLoads
- List<PhysicalOperator> lds = getRoots(mro.mapPlan);
+ List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+
if(lds!=null && lds.size()>0){
- for (PhysicalOperator operator : lds) {
- POLoad ld = (POLoad)operator;
+ for (POLoad ld : lds) {
Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
//Store the inp filespecs
@@ -275,27 +310,49 @@
jobConf.setOutputFormat(PigOutputFormat.class);
//Process POStore and remove it from the plan
- POStore st = null;
- if(mro.reducePlan.isEmpty()){
- st = (POStore) mro.mapPlan.getLeaves().get(0);
- mro.mapPlan.remove(st);
- }
- else{
- st = (POStore) mro.reducePlan.getLeaves().get(0);
- mro.reducePlan.remove(st);
- }
- //set out filespecs
- String outputPath = st.getSFile().getFileName();
- FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
- FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
- jobConf.set("pig.storeFunc", outputFuncSpec.toString());
-
- // Setup the logs directory for streaming jobs
- jobConf.set("pig.streaming.log.dir",
- new Path(new Path(outputPath), LOG_DIR).toString());
- jobConf.set("pig.streaming.task.output.dir", outputPath);
+ List<POStore> mapStores = PlanHelper.getStores(mro.mapPlan);
+ List<POStore> reduceStores = PlanHelper.getStores(mro.reducePlan);
+
+ if (mapStores.size() + reduceStores.size() == 1) { // single store case
+ log.info("Setting up single store job");
+
+ POStore st;
+ if (reduceStores.isEmpty()) {
+ st = mapStores.remove(0);
+ mro.mapPlan.remove(st);
+ }
+ else {
+ st = reduceStores.remove(0);
+ mro.reducePlan.remove(st);
+ }
+ String outputPath = st.getSFile().getFileName();
+ FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
+ FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
+ jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+
+ jobConf.set("pig.streaming.log.dir",
+ new Path(outputPath, LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", outputPath);
+ }
+ else { // multi store case
+ log.info("Setting up multi store job");
+
+ makeTmpPath();
+ FileSystem fs = curTmpPath.getFileSystem(conf);
+ for (POStore st: mapStores) {
+ Path tmpOut = new Path(
+ curTmpPath,
+ PlanHelper.makeStoreTmpPath(st.getSFile().getFileName()));
+ fs.mkdirs(tmpOut);
+ }
+
+ FileOutputFormat.setOutputPath(jobConf, curTmpPath);
+
+ jobConf.set("pig.streaming.log.dir",
+ new Path(curTmpPath, LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", curTmpPath.toString());
+ }
-
// store map key type
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/LocalLauncher.java Thu Mar 5 01:04:06 2009
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
-
-import java.io.IOException;
-import java.io.PrintStream;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ConfigurationValidator;
-
-
-public class LocalLauncher extends Launcher{
- private static final Log log = LogFactory.getLog(LocalLauncher.class);
-
- @Override
- public boolean launchPig(
- PhysicalPlan php,
- String grpName,
- PigContext pc) throws PlanException, VisitorException,
- IOException, ExecException,
- JobCreationException {
- long sleepTime = 500;
- MROperPlan mrp = compile(php, pc);
-
- ExecutionEngine exe = pc.getExecutionEngine();
- Properties validatedProperties = ConfigurationValidator.getValidatedProperties(exe.getConfiguration());
- Configuration conf = ConfigurationUtil.toConfiguration(validatedProperties);
- conf.set("mapred.job.tracker", "local");
- JobClient jobClient = new JobClient(new JobConf(conf));
-
- JobControlCompiler jcc = new JobControlCompiler();
-
- JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
-
- int numMRJobs = jc.getWaitingJobs().size();
-
- new Thread(jc).start();
-
- double lastProg = -1;
- int perCom = 0;
- while(!jc.allFinished()){
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {}
- double prog = calculateProgress(jc, jobClient)/numMRJobs;
- if(prog>=(lastProg+0.01)){
- perCom = (int)(prog * 100);
- if(perCom!=100)
- log.info( perCom + "% complete");
- }
- lastProg = prog;
- }
- // Look to see if any jobs failed. If so, we need to report that.
- List<Job> failedJobs = jc.getFailedJobs();
- if (failedJobs != null && failedJobs.size() > 0) {
- log.error("Map reduce job failed");
- for (Job fj : failedJobs) {
- log.error(fj.getMessage());
- getStats(fj, jobClient, true);
- }
- jc.stop();
- return false;
- }
-
- List<Job> succJobs = jc.getSuccessfulJobs();
- if(succJobs!=null)
- for(Job job : succJobs){
- getStats(job,jobClient, false);
- }
-
- jc.stop();
- log.info( "100% complete");
- log.info("Success!");
- return true;
- }
-
- @Override
- public void explain(
- PhysicalPlan php,
- PigContext pc,
- PrintStream ps,
- String format,
- boolean verbose) throws PlanException, VisitorException,
- IOException {
- log.trace("Entering LocalLauncher.explain");
- MROperPlan mrp = compile(php, pc);
-
- if (format.equals("text")) {
- MRPrinter printer = new MRPrinter(ps, mrp);
- printer.setVerbose(verbose);
- printer.visit();
- } else {
- DotMRPrinter printer =new DotMRPrinter(mrp, ps);
- printer.setVerbose(verbose);
- printer.dump();
- }
- }
-
- private MROperPlan compile(
- PhysicalPlan php,
- PigContext pc) throws PlanException, IOException, VisitorException {
- MRCompiler comp = new MRCompiler(php, pc);
- comp.randomizeFileLocalizer();
- comp.compile();
- MROperPlan plan = comp.getMRPlan();
- String lastInputChunkSize =
- pc.getProperties().getProperty(
- "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
- String prop = System.getProperty("pig.exec.nocombiner");
- if (!("true".equals(prop))) {
- CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
- co.visit();
- }
-
- // optimize key - value handling in package
- POPackageAnnotator pkgAnnotator = new POPackageAnnotator(plan);
- pkgAnnotator.visit();
-
- // check whether stream operator is present
- MRStreamHandler checker = new MRStreamHandler(plan);
- checker.visit();
-
- // optimize joins
- LastInputStreamingOptimizer liso =
- new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
- liso.visit();
-
- // figure out the type of the key for the map plan
- // this is needed when the key is null to create
- // an appropriate NullableXXXWritable object
- KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
- kdv.visit();
- return plan;
- }
-
- //A purely testing method. Not to be used elsewhere
- public boolean launchPigWithCombinePlan(PhysicalPlan php,
- String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
- VisitorException, IOException, ExecException, JobCreationException {
- long sleepTime = 500;
- MRCompiler comp = new MRCompiler(php, pc);
- comp.compile();
-
- Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "local");
- JobClient jobClient = new JobClient(new JobConf(conf));
-
- MROperPlan mrp = comp.getMRPlan();
- if(mrp.getLeaves().get(0)!=mrp.getRoots().get(0))
- throw new PlanException("Unsupported configuration to test combine plan");
-
- MapReduceOper mro = mrp.getLeaves().get(0);
- mro.combinePlan = combinePlan;
-
- JobControlCompiler jcc = new JobControlCompiler();
-
- JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
- int numMRJobs = jc.getWaitingJobs().size();
-
- new Thread(jc).start();
-
- double lastProg = -1;
- while (!jc.allFinished()) {
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- }
- double prog = calculateProgress(jc, jobClient) / numMRJobs;
- if (prog > lastProg)
- log.info((int)(prog * 100) + "% complete");
- lastProg = prog;
- }
- lastProg = calculateProgress(jc, jobClient) / numMRJobs;
- if (isComplete(lastProg))
- log.info("Completed Successfully");
- else {
- log.info("Unsuccessful attempt. Completed " + lastProg * 100
- + "% of the job");
- List<Job> failedJobs = jc.getFailedJobs();
- if (failedJobs == null)
- throw new ExecException(
- "Something terribly wrong with Job Control.");
- for (Job job : failedJobs) {
- getStats(job, jobClient, true);
- }
- }
- List<Job> succJobs = jc.getSuccessfulJobs();
- if (succJobs != null)
- for (Job job : succJobs) {
- getStats(job, jobClient, false);
- }
-
- jc.stop();
-
- return isComplete(lastProg);
- }
-}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Mar 5 01:04:06 2009
@@ -302,7 +302,6 @@
private POStore getStore(){
POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
- st.setPc(pigContext);
return st;
}
@@ -577,14 +576,6 @@
}
}
- /*private void addUDFs(PhysicalPlan plan) throws VisitorException{
- if(plan!=null){
- udfFinderForExpr.setPlan(plan);
- udfFinderForExpr.visit();
- curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
- }
- }*/
-
private void addUDFs(PhysicalPlan plan) throws VisitorException{
if(plan!=null){
udfFinder.setPlan(plan);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Mar 5 01:04:06 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
+import java.util.LinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +59,7 @@
IOException,
ExecException,
JobCreationException {
- long sleepTime = 5000;
+ long sleepTime = 500;
MROperPlan mrp = compile(php, pc);
ExecutionEngine exe = pc.getExecutionEngine();
@@ -66,30 +67,38 @@
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
- JobControlCompiler jcc = new JobControlCompiler();
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
- JobControl jc = jcc.compile(mrp, grpName, conf, pc);
-
- int numMRJobs = jc.getWaitingJobs().size();
-
- new Thread(jc).start();
-
+ List<Job> failedJobs = new LinkedList<Job>();
+ List<Job> succJobs = new LinkedList<Job>();
+ JobControl jc;
+ int numMRJobs = mrp.size();
double lastProg = -1;
- int perCom = 0;
- while(!jc.allFinished()){
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {}
- double prog = calculateProgress(jc, jobClient)/numMRJobs;
- if(prog>=(lastProg+0.01)){
- perCom = (int)(prog * 100);
- if(perCom!=100)
- log.info( perCom + "% complete");
+
+ while((jc = jcc.compile(mrp, grpName)) != null) {
+ numMRJobs += jc.getWaitingJobs().size();
+
+ new Thread(jc).start();
+
+ while(!jc.allFinished()){
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {}
+ double prog = calculateProgress(jc, jobClient)/numMRJobs;
+ if(prog>=(lastProg+0.01)){
+ int perCom = (int)(prog * 100);
+ if(perCom!=100)
+ log.info( perCom + "% complete");
+ }
+ lastProg = prog;
}
- lastProg = prog;
+ failedJobs.addAll(jc.getFailedJobs());
+ succJobs.addAll(jc.getSuccessfulJobs());
+ jcc.moveResults();
+ jc.stop();
}
+
// Look to see if any jobs failed. If so, we need to report that.
- List<Job> failedJobs = jc.getFailedJobs();
if (failedJobs != null && failedJobs.size() > 0) {
log.error("Map reduce job failed");
for (Job fj : failedJobs) {
@@ -100,13 +109,12 @@
return false;
}
- List<Job> succJobs = jc.getSuccessfulJobs();
- if(succJobs!=null)
+ if(succJobs!=null) {
for(Job job : succJobs){
getStats(job,jobClient, false);
}
+ }
- jc.stop();
log.info( "100% complete");
log.info("Success!");
return true;
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Mar 5 01:04:06 2009
@@ -39,8 +39,11 @@
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
@@ -54,6 +57,10 @@
//Map Plan
protected PhysicalPlan mp;
+
+ // Store operators
+ protected List<POStore> stores;
+
protected TupleFactory tf = TupleFactory.getInstance();
OutputCollector<PigNullableWritable, Writable> outputCollector;
@@ -77,7 +84,7 @@
@Override
public void close() throws IOException {
super.close();
- PhysicalOperator.setReporter(null);
+
if(errorInMap) {
//error in map - returning
return;
@@ -100,7 +107,19 @@
}
}
mp = null;
-
+
+ for (POStore store: stores) {
+ if (!initialized) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ store.tearDown();
+ }
+
+ PhysicalOperator.setReporter(null);
+ initialized = false;
}
/**
@@ -113,8 +132,9 @@
SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
PigMapReduce.sJobConf = job;
try {
- mp = (PhysicalPlan) ObjectSerializer.deserialize(job
- .get("pig.mapPlan"));
+ mp = (PhysicalPlan) ObjectSerializer.deserialize(
+ job.get("pig.mapPlan"));
+ stores = PlanHelper.getStores(mp);
// To be removed
if(mp.isEmpty())
@@ -166,6 +186,13 @@
this.outputCollector = oc;
pigReporter.setRep(reporter);
PhysicalOperator.setReporter(pigReporter);
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ impl.setReporter(reporter);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
}
if(mp.isEmpty()){
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Mar 5 01:04:06 2009
@@ -42,6 +42,8 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.DataType;
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
@@ -140,6 +142,9 @@
//The reduce plan
protected PhysicalPlan rp;
+
+ // Store operators
+ protected List<POStore> stores;
//The POPackage operator which is the
//root of every Map Reduce plan is
@@ -155,7 +160,11 @@
protected boolean errorInReduce = false;
PhysicalOperator[] roots;
+
private PhysicalOperator leaf;
+
+ protected boolean initialized = false;
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
@@ -168,6 +177,8 @@
try {
rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
.get("pig.reducePlan"));
+ stores = PlanHelper.getStores(rp);
+
pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
// To be removed
if(rp.isEmpty())
@@ -203,11 +214,23 @@
OutputCollector<PigNullableWritable, Writable> oc,
Reporter reporter) throws IOException {
- // cache the collector for use in runPipeline()
- // which could additionally be called from close()
- this.outputCollector = oc;
- pigReporter.setRep(reporter);
- PhysicalOperator.setReporter(pigReporter);
+ if (!initialized) {
+ initialized = true;
+
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = oc;
+ pigReporter.setRep(reporter);
+ PhysicalOperator.setReporter(pigReporter);
+
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ impl.setReporter(reporter);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ }
// In the case we optimize the join, we combine
// POPackage and POForeach - so we could get many
@@ -322,9 +345,6 @@
@Override
public void close() throws IOException {
super.close();
- /*if(runnableReporter!=null)
- runnableReporter.setDone(true);*/
- PhysicalOperator.setReporter(null);
if(errorInReduce) {
// there was an error in reduce - just return
@@ -347,6 +367,19 @@
throw ioe;
}
}
+
+ for (POStore store: stores) {
+ if (!initialized) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ store.tearDown();
+ }
+
+ PhysicalOperator.setReporter(null);
+ initialized = false;
}
}
@@ -384,11 +417,23 @@
OutputCollector<PigNullableWritable, Writable> oc,
Reporter reporter) throws IOException {
- // cache the collector for use in runPipeline()
- // which could additionally be called from close()
- this.outputCollector = oc;
- pigReporter.setRep(reporter);
- PhysicalOperator.setReporter(pigReporter);
+ if (!initialized) {
+ initialized = true;
+
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = oc;
+ pigReporter.setRep(reporter);
+ PhysicalOperator.setReporter(pigReporter);
+
+ for (POStore store: stores) {
+ MapReducePOStoreImpl impl
+ = new MapReducePOStoreImpl(PigMapReduce.sJobConf);
+ impl.setReporter(reporter);
+ store.setStoreImpl(impl);
+ store.setUp();
+ }
+ }
// If the keyType is not a tuple, the MapWithComparator.collect()
// would have wrapped the key into a tuple so that the
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Mar 5 01:04:06 2009
@@ -1169,7 +1169,6 @@
POStore store = new POStore(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)));
store.setSFile(loStore.getOutputFile());
- store.setPc(pc);
currentPlan.add(store);
List<LogicalOperator> op = loStore.getPlan().getPredecessors(loStore);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Mar 5 01:04:06 2009
@@ -45,20 +45,13 @@
*
*/
public class POStore extends PhysicalOperator {
- /**
- *
- */
+
private static final long serialVersionUID = 1L;
- // The user defined load function or a default load function
- private StoreFunc storer;
- // The filespec on which the operator is based
- FileSpec sFile;
- // The stream used to bind to by the loader
- OutputStream os;
- // PigContext passed to us by the operator creator
- PigContext pc;
-
+ private static Result empty = new Result(POStatus.STATUS_NULL, null);
+ private StoreFunc storer;
private final Log log = LogFactory.getLog(getClass());
+ private POStoreImpl impl;
+ private FileSpec sFile;
public POStore(OperatorKey k) {
this(k, -1, null);
@@ -73,87 +66,56 @@
}
/**
- * Set up the storer by
- * 1) Instantiating the store func
- * 2) Opening an output stream to the specified file and
- * 3) Binding to the output stream
+ * Set up the storer
* @throws IOException
*/
- private void setUp() throws IOException{
- storer = (StoreFunc)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
- os = FileLocalizer.create(sFile.getFileName(), pc);
- storer.bindTo(os);
+ public void setUp() throws IOException{
+ if (impl != null) {
+ storer = impl.createStoreFunc(sFile);
+ }
}
/**
- * At the end of processing, the outputstream is closed
- * using this method
+ * Called at the end of processing for clean up.
* @throws IOException
*/
- private void tearDown() throws IOException{
- os.close();
- }
+ public void tearDown() throws IOException{
+ if (impl != null) {
+ impl.tearDown();
+ }
+ }
/**
* To perform cleanup when there is an error.
- * Uses the FileLocalizer method which only
- * deletes the file but not the dirs created
- * with it.
* @throws IOException
*/
- private void cleanUp() throws IOException{
- String fName = sFile.getFileName();
- os.flush();
- if(FileLocalizer.fileExists(fName,pc))
- FileLocalizer.delete(fName,pc);
+ public void cleanUp() throws IOException{
+ if (impl != null) {
+ impl.cleanUp();
+ }
}
- /**
- * The main method used by the local execution engine
- * to store tuples into the specified file using the
- * specified store function. One call to this method
- * retrieves all tuples from its predecessor operator
- * and stores it into the file till it recieves an EOP.
- *
- * If there is an error, the cleanUp routine is called
- * and then the tearDown is called to close the OutputStream
- *
- * @return Whatever the predecessor returns
- * A null from the predecessor is ignored
- * and processing of further tuples continued
- */
- public Result store() throws ExecException{
- try{
- setUp();
- }catch (IOException e) {
- ExecException ee = new ExecException("Unable to setup the storer because of the exception: " + e.getMessage());
- ee.initCause(e);
- throw ee;
- }
- try{
- Result res;
- Tuple inpValue = null;
- while(true){
- res = processInput();
- if(res.returnStatus==POStatus.STATUS_OK)
- storer.putNext((Tuple)res.result);
- else if(res.returnStatus==POStatus.STATUS_NULL)
- continue;
- else
- break;
- }
- if(res.returnStatus==POStatus.STATUS_EOP){
- storer.finish();
- }
- else{
- cleanUp();
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ Result res = processInput();
+ try {
+ switch (res.returnStatus) {
+ case POStatus.STATUS_OK:
+ storer.putNext((Tuple)res.result);
+ res = empty;
+ break;
+ case POStatus.STATUS_EOP:
+ break;
+ case POStatus.STATUS_ERR:
+ case POStatus.STATUS_NULL:
+ default:
+ break;
}
- tearDown();
- return res;
- }catch(IOException e){
- log.error("Received error from storer function: " + e);
- return new Result();
+ } catch (IOException ioe) {
+ log.error("Received error from storer function: " + ioe);
+ throw new ExecException(ioe);
}
+ return res;
}
@Override
@@ -174,12 +136,6 @@
return false;
}
- public StoreFunc getStorer() {
- return storer;
- }
-
-
-
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitStore(this);
@@ -189,16 +145,11 @@
return sFile;
}
- public void setSFile(FileSpec file) {
- sFile = file;
+ public void setSFile(FileSpec sFile) {
+ this.sFile = sFile;
}
- public PigContext getPc() {
- return pc;
+ public void setStoreImpl(POStoreImpl impl) {
+ this.impl = impl;
}
-
- public void setPc(PigContext pc) {
- this.pc = pc;
- }
-
}
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Mar 5 01:04:06 2009
@@ -49,7 +49,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalLauncher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -151,7 +150,6 @@
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- str.setPc(pigContext);
spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
pigContext).toString(), new FuncSpec(BinStorage.class
.getName()));
@@ -161,7 +159,6 @@
spec = ((POStore) leaf).getSFile();
}
- // LocalLauncher launcher = new LocalLauncher();
LocalPigLauncher launcher = new LocalPigLauncher();
boolean success = launcher.launchPig(plan, jobName, pigContext);
if (success)
@@ -189,7 +186,6 @@
try {
ExecTools.checkLeafIsStore(plan, pigContext);
- // LocalLauncher launcher = new LocalLauncher();
LocalPigLauncher launcher = new LocalPigLauncher();
launcher.explain(plan, pigContext, stream,
format, isVerbose);
Modified: hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (original)
+++ hadoop/pig/branches/multiquery/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Thu Mar 5 01:04:06 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;
+import java.util.BitSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,13 +33,16 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
public class LocalPigLauncher extends Launcher {
-
- Log log = LogFactory.getLog(getClass());
+ private static final Tuple DUMMYTUPLE = null;
+ private Log log = LogFactory.getLog(getClass());
+ List<POStore> stores;
@Override
public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps,
@@ -53,17 +57,17 @@
throws PlanException, VisitorException, IOException, ExecException,
JobCreationException {
// TODO Auto-generated method stub
- List<PhysicalOperator> stores = php.getLeaves();
+ stores = PlanHelper.getStores(php);
int noJobs = stores.size();
int failedJobs = 0;
- for (PhysicalOperator op : stores) {
- POStore store = (POStore) op;
- Result res = store.store();
- if (res.returnStatus != POStatus.STATUS_EOP)
- failedJobs++;
+ for (POStore op : stores) {
+ op.setStoreImpl(new LocalPOStoreImpl(pc));
+ op.setUp();
}
+ failedJobs = runPipeline(stores.toArray(new POStore[0]));
+
if (failedJobs == 0) {
log.info("100% complete!");
log.info("Success!!");
@@ -76,4 +80,36 @@
}
+ private int runPipeline(POStore[] leaves) throws IOException, ExecException {
+ BitSet bs = new BitSet(leaves.length);
+ int failed = 0;
+ while(true) {
+ if (bs.cardinality() == leaves.length) {
+ break;
+ }
+ for(int i=bs.nextClearBit(0); i<leaves.length; i=bs.nextClearBit(i+1)) {
+ Result res = leaves[i].getNext(DUMMYTUPLE);
+ switch(res.returnStatus) {
+ case POStatus.STATUS_NULL:
+ // STATUS_NULL from a store means it consumed the tuple (or had nothing to write); keep pulling.
+ continue;
+ case POStatus.STATUS_OK:
+ // STATUS_OK shouldn't happen here: the store should have consumed the tuple.
+ // fallthrough
+ case POStatus.STATUS_ERR:
+ leaves[i].cleanUp();
+ leaves[i].tearDown();
+ failed++;
+ // fallthrough (NOTE: the STATUS_EOP case below calls tearDown() a second time)
+ case POStatus.STATUS_EOP:
+ leaves[i].tearDown();
+ // fallthrough
+ default:
+ bs.set(i);
+ break;
+ }
+ }
+ }
+ return failed;
+ }
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestJobSubmission.java Thu Mar 5 01:04:06 2009
@@ -468,11 +468,11 @@
ExecutionEngine exe = pc.getExecutionEngine();
ConfigurationValidator.validatePigProperties(exe.getConfiguration());
Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
- JobControlCompiler jcc = new JobControlCompiler();
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf);
try {
- jcc.compile(mrPlan, "Test", conf, pc);
+ jcc.compile(mrPlan, "Test");
} catch (JobCreationException jce) {
- assertTrue(jce.getErrorCode() == 1068);
+ assertTrue(jce.getErrorCode() == 1068);
}
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestLocalJobSubmission.java Thu Mar 5 01:04:06 2009
@@ -32,7 +32,9 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LocalLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -130,7 +132,7 @@
rmrf(outDir);
}
- private void generateInput(int numTuples) throws ExecException{
+ private void generateInput(int numTuples) throws Exception{
DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
@@ -146,13 +148,18 @@
inps.add(proj);
POStore str = new POStore(new OperatorKey("", r.nextLong()));
- str.setInputs(inps);
FileSpec fSpec = new FileSpec(ldFile, new FuncSpec(PigStorage.class.getName()));
str.setSFile(fSpec);
- str.setPc(pc);
- str.store();
+ str.setStoreImpl(new LocalPOStoreImpl(pc));
+
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(proj);
+ pp.add(str);
+ pp.connect(proj,str);
+
+ new LocalPigLauncher().launchPig(pp, "TestLocalJobSubmission", pc);
}
/*private void setUp1(boolean gen) throws Exception {
@@ -389,7 +396,6 @@
POStore st = new POStore(new OperatorKey("", r.nextLong()));
ld.setPc(pc);
ld.setLFile(LFSpec);
- st.setPc(pc);
st.setSFile(SFSpec);
Tuple sample = new DefaultTuple();
@@ -449,7 +455,7 @@
private void submit() throws Exception{
assertEquals(true, FileLocalizer.fileExists(ldFile, pc));
- LocalLauncher ll = new LocalLauncher();
+ MapReduceLauncher ll = new MapReduceLauncher();
ll.launchPig(php, grpName, pc);
}
}
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestMultiQuery.java Thu Mar 5 01:04:06 2009
@@ -72,7 +72,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
myPig.registerQuery("c = group b by gid;");
myPig.registerQuery("store c into '/tmp/output2';");
@@ -103,7 +103,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
myPig.registerQuery("c = group b by gid;");
myPig.registerQuery("store c into '/tmp/output2';");
@@ -128,11 +128,11 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
- myPig.registerQuery("c = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output2';");
- myPig.registerQuery("d = filter c by uid > 1500;");
+ myPig.registerQuery("d = filter c by uid > 15;");
myPig.registerQuery("store d into '/tmp/output3';");
LogicalPlan lp = checkLogicalPlan(1, 3, 14);
@@ -161,11 +161,11 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("store b into '/tmp/output1';");
- myPig.registerQuery("c = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output2';");
- myPig.registerQuery("d = filter c by uid > 1500;");
+ myPig.registerQuery("d = filter c by uid > 15;");
myPig.registerQuery("store d into '/tmp/output3';");
myPig.executeBatch();
@@ -190,8 +190,8 @@
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("c = filter a by uid > 500;");
- myPig.registerQuery("d = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter a by uid > 5;");
+ myPig.registerQuery("d = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output1';");
myPig.registerQuery("store d into '/tmp/output2';");
myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -225,8 +225,8 @@
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = load 'file:test/org/apache/pig/test/data/passwd2' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("c = filter a by uid > 500;");
- myPig.registerQuery("d = filter b by uid > 1000;");
+ myPig.registerQuery("c = filter a by uid > 5;");
+ myPig.registerQuery("d = filter b by uid > 10;");
myPig.registerQuery("store c into '/tmp/output1';");
myPig.registerQuery("store d into '/tmp/output2';");
myPig.registerQuery("e = cogroup c by uid, d by uid;");
@@ -252,7 +252,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("group b by gid;");
LogicalPlan lp = checkLogicalPlan(0, 0, 0);
@@ -279,7 +279,7 @@
myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
- myPig.registerQuery("b = filter a by uid > 500;");
+ myPig.registerQuery("b = filter a by uid > 5;");
myPig.registerQuery("group b by gid;");
myPig.executeBatch();
@@ -299,7 +299,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "explain b;"
+ "store b into '/tmp/output1';\n";
@@ -324,7 +324,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "dump b;"
+ "store b into '/tmp/output1';\n";
@@ -349,7 +349,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "describe b;"
+ "store b into '/tmp/output1';\n";
@@ -374,7 +374,7 @@
try {
String script = "a = load 'file:test/org/apache/pig/test/data/passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);"
- + "b = filter a by uid > 500;"
+ + "b = filter a by uid > 5;"
+ "illustrate b;"
+ "store b into '/tmp/output1';\n";
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/TestStore.java Thu Mar 5 01:04:06 2009
@@ -39,6 +39,9 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
+import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -67,30 +70,34 @@
st.setSFile(fSpec);
pc = new PigContext();
pc.connect();
- st.setPc(pc);
+ st.setStoreImpl(new LocalPOStoreImpl(pc));
proj = GenPhyOp.exprProject();
proj.setColumn(0);
proj.setResultType(DataType.TUPLE);
proj.setOverloaded(true);
List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
- inps.add(proj);
- st.setInputs(inps);
-
}
@After
public void tearDown() throws Exception {
}
+ private boolean store() throws Exception {
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(proj);
+ pp.add(st);
+ pp.connect(proj, st);
+ return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
+ }
+
@Test
- public void testStore() throws ExecException, IOException {
+ public void testStore() throws Exception {
inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
int size = 0;
BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
@@ -112,13 +119,12 @@
}
@Test
- public void testStoreComplexData() throws ExecException, IOException {
+ public void testStoreComplexData() throws Exception {
inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
PigStorage ps = new PigStorage(":");
int size = 0;
@@ -144,15 +150,14 @@
}
@Test
- public void testStoreComplexDataWithNull() throws ExecException, IOException {
+ public void testStoreComplexDataWithNull() throws Exception {
Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100);
inpDB = DefaultBagFactory.getInstance().newDefaultBag();
inpDB.add(inputTuple);
Tuple t = new DefaultTuple();
t.append(inpDB);
proj.attachInput(t);
- Result res = st.store();
- assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ assertTrue(store());
PigStorage ps = new PigStorage(":");
int size = 0;
Modified: hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java?rev=750264&r1=750263&r2=750264&view=diff
==============================================================================
--- hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/branches/multiquery/test/org/apache/pig/test/utils/GenPhyOp.java Thu Mar 5 01:04:06 2009
@@ -749,7 +749,6 @@
public static POStore topStoreOp() {
POStore ret = new POStore(new OperatorKey("", r.nextLong()));
- ret.setPc(pc);
ret.setSFile(new FileSpec("DummyFil", new FuncSpec("DummyLdr")));
return ret;
}