You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/16 03:09:32 UTC
svn commit: r656913 [1/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/mapReduceLayer...
Author: gates
Date: Thu May 15 18:09:30 2008
New Revision: 656913
URL: http://svn.apache.org/viewvc?rev=656913&view=rev
Log:
PIG-162: Shubham's addition of the MR launcher code.
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
Removed:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC5.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
incubator/pig/branches/types/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu May 15 18:09:30 2008
@@ -146,7 +146,8 @@
**/test/TestPODistinct.java, **/test/TestPOSort.java,
**/test/TestSchema.java, **/test/TestLogicalPlanBuilder.java,**/test/TestUnion.java, **/test/TestMRCompiler.java,
**/test/FakeFSInputStream.java, **/test/Util.java, **/test/TestJobSubmission.java,
- **/test/TestLocalJobSubmission.java, **/test/TestPOMapLookUp.java,
+ **/test/TestLocalJobSubmission.java, **/test/TestPOMapLookUp.java,
+ **/test/TestPOBinCond.java, **/test/TestPONegative.java, **/pig/impl/builtin/GFCross.java,
**/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
**/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
@@ -277,7 +278,8 @@
<include name="**/TestLogicalPlanBuilder.java" />
<include name="**/TestLocalJobSubmission.java" />
<include name="**/TestPOMapLookUp.java" />
-
+ <include name="**/TestPOBinCond.java" />
+ <include name="**/TestPONegative.java" />
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Thu May 15 18:09:30 2008
@@ -57,11 +57,11 @@
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
import org.apache.pig.backend.hadoop.datastorage.HFile;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.mapReduceLayer.SortPartitioner;
// compiler for mapreduce physical plans
public class MapreducePlanCompiler {
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu May 15 18:09:30 2008
@@ -1,4 +1,5 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -46,6 +47,7 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.eval.EvalSpec;
import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.mapReduceLayer.SortPartitioner;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.WrappedIOException;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu May 15 18:09:30 2008
@@ -21,6 +21,7 @@
import java.util.Iterator;
import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -36,8 +37,16 @@
@Override
public DataBag exec(Tuple input) throws IOException {
- Integer numQuantiles = (Integer)input.get(0);
- DataBag samples = (DataBag)input.get(1);
+ Integer numQuantiles = null;
+ DataBag samples = null;
+ try{
+ numQuantiles = (Integer)input.get(0);
+ samples = (DataBag)input.get(1);
+ }catch(ExecException e){
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ throw ioe;
+ }
DataBag output = mBagFactory.newDefaultBag();
long numSamples = samples.size();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java Thu May 15 18:09:30 2008
@@ -21,6 +21,7 @@
import java.util.Random;
import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -35,33 +36,40 @@
public static int DEFAULT_PARALLELISM = 96;
@Override
- public DataBag exec(Tuple input) throws IOException {;
- numInputs = (Integer)input.get(0);
- myNumber = (Integer)input.get(1);
- DataBag output = mBagFactory.newDefaultBag();
+ public DataBag exec(Tuple input) throws IOException {
+ try{
+ numInputs = (Integer)input.get(0);
+ myNumber = (Integer)input.get(1);
- numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
- int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+ DataBag output = mBagFactory.newDefaultBag();
- int[] digits = new int[numInputs];
- for (int i=0; i<numInputs; i++){
- if (i == myNumber){
- Random r = new Random(System.currentTimeMillis());
- digits[i] = r.nextInt(numGroupsPerInput);
- }else{
- digits[i] = 0;
+ numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
+ int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+
+ int[] digits = new int[numInputs];
+ for (int i=0; i<numInputs; i++){
+ if (i == myNumber){
+ Random r = new Random(System.currentTimeMillis());
+ digits[i] = r.nextInt(numGroupsPerInput);
+ }else{
+ digits[i] = 0;
+ }
}
+
+ for (int i=0; i<numGroupsGoingTo; i++){
+ output.add(toTuple(digits));
+ next(digits);
+ }
+
+ return output;
+ }catch(ExecException e){
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ throw ioe;
}
-
- for (int i=0; i<numGroupsGoingTo; i++){
- output.add(toTuple(digits));
- next(digits);
- }
-
- return output;
}
- private Tuple toTuple(int[] digits) throws IOException{
+ private Tuple toTuple(int[] digits) throws IOException, ExecException{
Tuple t = mTupleFactory.newTuple(numInputs);
for (int i=0; i<numInputs; i++){
t.set(i, digits[i]);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Thu May 15 18:09:30 2008
@@ -242,12 +242,18 @@
mro.reducePlan.remove(pack);
jobConf.setMapperClass(PigMapReduce.Map.class);
jobConf.setReducerClass(PigMapReduce.Reduce.class);
+ jobConf.setNumReduceTasks((mro.requestedParallelism>0)?mro.requestedParallelism:1);
jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
jobConf.setOutputKeyClass(DataType.getWritableComparableTypes(pack.getKeyType()).getClass());
jobConf.setOutputValueClass(IndexedTuple.class);
}
+
+ if(mro.isGlobalSort()){
+ jobConf.set("pig.quantilesFile", mro.getQuantFile());
+ jobConf.setPartitionerClass(SortPartitioner.class);
+ }
return jobConf;
}catch(Exception e){
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,214 @@
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Base launcher that runs a Pig physical plan as a set of dependent
+ * Hadoop map-reduce jobs and monitors them until they finish.
+ */
+public class Launcher {
+    private static final Log log = LogFactory.getLog(Launcher.class);
+
+    /** Milliseconds to sleep between two polls of the jobs' progress. */
+    private static final long SLEEP_TIME_MS = 500;
+
+    protected Launcher(){
+
+    }
+
+    /**
+     * Method to launch pig for hadoop either for a cluster's
+     * job tracker or for a local job runner. The only difference
+     * between the two is the job client. Depending on the pig context
+     * the job client will be initialized to one of the two.
+     * Launchers for other frameworks can override these methods.
+     * Given an input PhysicalPlan, it compiles it
+     * to get a MapReduce Plan. The MapReduce plan
+     * has multiple MapReduce operators, each one of which
+     * has to be run as a map reduce job with dependency
+     * information stored in the plan. It compiles the
+     * MROperPlan into a JobControl object. Each Map Reduce
+     * operator is converted into a Job and added to the JobControl
+     * object. Each Job also has a set of dependent Jobs that
+     * are created using the MROperPlan.
+     * The JobControl object is obtained from the JobControlCompiler.
+     * Then a new thread is spawned that submits these jobs
+     * while respecting the dependency information.
+     * The parent thread monitors the submitted jobs' progress and,
+     * once they are all finished (or if monitoring throws), stops
+     * the JobControl thread. An interrupt of the calling thread does
+     * not abort the wait; the interrupt status is restored on return.
+     * @param php - the physical plan to run
+     * @param grpName - group name passed on to the JobControlCompiler
+     * @param pc - the pig context providing the execution engine
+     * @throws PlanException
+     * @throws VisitorException
+     * @throws IOException
+     * @throws ExecException
+     * @throws JobCreationException
+     */
+    protected void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc)
+            throws PlanException, VisitorException, IOException, ExecException,
+            JobCreationException {
+        MRCompiler comp = new MRCompiler(php, pc);
+        comp.compile();
+
+        ExecutionEngine exe = pc.getExecutionEngine();
+        Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+
+        MROperPlan mrp = comp.getMRPlan();
+        JobControlCompiler jcc = new JobControlCompiler();
+
+        JobControl jc = jcc.compile(mrp, grpName, conf, pc);
+
+        // Count the jobs before the JobControl thread starts: once it runs,
+        // jobs leave the waiting list, so a later count can be too small
+        // (even zero, which turned every progress value into NaN/Infinity).
+        int numMRJobs = jc.getWaitingJobs().size();
+
+        new Thread(jc).start();
+
+        boolean interrupted = false;
+        try {
+            double lastProg = -1;
+            while(!jc.allFinished()){
+                try {
+                    Thread.sleep(SLEEP_TIME_MS);
+                } catch (InterruptedException e) {
+                    // Keep waiting for the jobs, but remember the interrupt
+                    // so it can be re-asserted before returning.
+                    interrupted = true;
+                }
+                double prog = overallProgress(jc, jobClient, numMRJobs);
+                if(prog>lastProg)
+                    log.info(prog * 100 + "% complete");
+                lastProg = prog;
+            }
+
+            // Decide success from the failed-job list instead of comparing
+            // a floating-point progress value with 1.0.
+            List<Job> failedJobs = jc.getFailedJobs();
+            if(failedJobs.isEmpty())
+                log.info("Completed Successfully");
+            else{
+                log.info("Unsuccessful attempt. Completed "
+                        + overallProgress(jc, jobClient, numMRJobs) * 100 + "% of the job");
+                for (Job job : failedJobs) {
+                    String MRJobID = job.getMapredJobID();
+                    if (MRJobID == null) {
+                        // Never submitted to Hadoop (presumably a job it depends on
+                        // failed), so there are no task reports to fetch.
+                        log.error("Job " + job.getJobID() + " was not run: " + job.getMessage());
+                        continue;
+                    }
+                    getErrorMessages(jobClient.getMapTaskReports(MRJobID), "map");
+                    getErrorMessages(jobClient.getReduceTaskReports(MRJobID), "reduce");
+                }
+            }
+        } finally {
+            // Stop the JobControl thread even if monitoring threw.
+            jc.stop();
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Overall fraction of the work in jc that is complete. A JobControl
+     * without jobs counts as fully complete.
+     * @param jc - The JobControl object that has been submitted
+     * @param jobClient - The JobClient to which it has been submitted
+     * @param numMRJobs - The number of jobs originally added to jc
+     * @return calculateProgress(jc, jobClient) / numMRJobs, or 1.0 when numMRJobs is not positive
+     * @throws IOException
+     */
+    private static double overallProgress(JobControl jc, JobClient jobClient, int numMRJobs)
+            throws IOException{
+        if(numMRJobs <= 0)
+            return 1.0;
+        return calculateProgress(jc, jobClient)/numMRJobs;
+    }
+
+    /**
+     * Logs, at error level, one line per task report containing the
+     * task's id and all of its diagnostic messages.
+     * @param reports - The task reports of a single job
+     * @param type - "map" or "reduce"; only used in the log message
+     */
+    protected static void getErrorMessages(TaskReport[] reports, String type)
+    {
+        for (TaskReport report : reports) {
+            StringBuilder sb = new StringBuilder("Error message from task (" + type + ") " +
+                    report.getTaskId());
+            for (String msg : report.getDiagnostics()) {
+                sb.append(' ').append(msg);
+            }
+            log.error(sb.toString());
+        }
+    }
+
+    /**
+     * Compute the progress of the jobs submitted through the JobControl
+     * object jc to the JobClient jobClient: 1 for every successful job
+     * plus the fractional progress of every running job. Divide by the
+     * number of jobs to get the overall fraction complete.
+     * @param jc - The JobControl object that has been submitted
+     * @param jobClient - The JobClient to which it has been submitted
+     * @return The number of completed jobs, counting running jobs fractionally
+     * @throws IOException
+     */
+    protected static double calculateProgress(JobControl jc, JobClient jobClient) throws IOException{
+        double prog = jc.getSuccessfulJobs().size();
+        List<Job> runnJobs = jc.getRunningJobs();
+        for (Job j : runnJobs) {
+            prog += progressOfRunningJob(j, jobClient);
+        }
+        return prog;
+    }
+
+    /**
+     * Returns the progress of a Job j which is part of a submitted
+     * JobControl object. The progress is for this Job only, so it has to
+     * be scaled down by the number of jobs that are present in the
+     * JobControl.
+     * @param j - The Job for which progress is required
+     * @param jobClient - the JobClient to which it has been submitted
+     * @return The mean of the job's map and reduce progress or, when the
+     *         JobClient returns no RunningJob, 1 if it succeeded and 0 otherwise
+     * @throws IOException
+     */
+    protected static double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
+        String mrJobID = j.getMapredJobID();
+        RunningJob rj = jobClient.getJob(mrJobID);
+        if(rj==null && j.getState()==Job.SUCCESS)
+            return 1;
+        else if(rj==null)
+            return 0;
+        else{
+            double mapProg = rj.mapProgress();
+            double redProg = rj.reduceProgress();
+            return (mapProg + redProg)/2;
+        }
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,30 @@
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Launcher intended (judging by its name) for the local job runner. It adds
+ * no behavior of its own: it re-declares Launcher#launchPig as public (the
+ * base method is protected) and delegates straight to it.
+ */
+public class LocalLauncher extends Launcher {
+    /**
+     * Launches the plan by delegating to the base Launcher implementation.
+     * @param plan - the physical plan to execute
+     * @param groupName - group name passed on to the base launcher
+     * @param pigContext - the pig context to run under
+     */
+    @Override
+    public void launchPig(PhysicalPlan<PhysicalOperator> plan, String groupName,
+            PigContext pigContext) throws PlanException, VisitorException,
+            IOException, ExecException, JobCreationException {
+        super.launchPig(plan, groupName, pigContext);
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Thu May 15 18:09:30 2008
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -30,25 +31,39 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.UDFFinder;
+import org.apache.pig.impl.mapReduceLayer.plans.UDFFinderForExpr;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.PlanException;
@@ -73,13 +88,19 @@
* being to keep the number of MROpers to a minimum.
*
* It also merges multiple Map jobs, created by compiling
- * the inputs individually, into a single job.
+ * the inputs individually, into a single job. Here a new
+ * map job is created and then the contents of the previous
+ * map plans are added. However, any other state that was in
+ * the previous map plans, should be manually moved over. So,
+ * if you are adding something new take care about this.
+ * Ex of this is in requestedParallelism
*
* Only in case of blocking operators and splits, a new
* MapReduce operator is started using a store-load combination
* to connect the two operators. Whenever this happens
* care is taken to add the MROper into the MRPlan and connect it
* appropriately.
+ *
*
*/
public class MRCompiler extends PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>> {
@@ -117,6 +138,10 @@
private Random r;
+ private UDFFinderForExpr udfFinderForExpr;
+
+ private UDFFinder udfFinder;
+
public MRCompiler(PhysicalPlan<PhysicalOperator> plan) {
this(plan,null);
}
@@ -132,6 +157,8 @@
scope = "MRCompiler";
r = new Random(1331);
FileLocalizer.setR(r);
+ udfFinderForExpr = new UDFFinderForExpr();
+ udfFinder = new UDFFinder();
}
/**
@@ -218,6 +245,8 @@
//Now we have the inputs compiled. Do something
//with the input oper op.
op.visit(this);
+ if(op.getRequestedParallelism() > curMROp.requestedParallelism)
+ curMROp.requestedParallelism = op.getRequestedParallelism();
compiledInputs = prevCompInp;
}
@@ -378,21 +407,9 @@
}
}
- /**
- * Used to compile a split operator. The logic is to
- * close the split job by replacing the split oper by
- * a store and creating a new Map MRoper and return
- * that as the current MROper to which other operators
- * would be compiled into. The new MROper would be connected
- * to the split job by load-store. Also add the split oper
- * to the splitsSeen map.
- * @param op
- * @throws IOException
- * @throws PlanException
- */
- private void split(POSplit op) throws PlanException{
+
+ private MapReduceOper endSingleInputPlanWithStr(FileSpec fSpec) throws PlanException{
MapReduceOper mro = compiledInputs[0];
- FileSpec fSpec = op.getSplitStore();
POStore str = getStore();
str.setSFile(fSpec);
if (!mro.isMapDone()) {
@@ -404,8 +421,7 @@
} else {
log.warn("Both map and reduce phases have been done. This is unexpected while compiling!");
}
- splitsSeen.put(op.getOperatorKey(), mro);
- curMROp = startNew(fSpec, mro);
+ return mro;
}
/**
@@ -486,11 +502,15 @@
}
}
merge(ret.get(0).mapPlan, mpLst);
+
Iterator<MapReduceOper> it = toBeConnected.iterator();
while(it.hasNext())
MRPlan.connect(it.next(), mergedMap);
- for(MapReduceOper rmro : remLst)
+ for(MapReduceOper rmro : remLst){
+ if(rmro.requestedParallelism > mergedMap.requestedParallelism)
+ mergedMap.requestedParallelism = rmro.requestedParallelism;
MRPlan.remove(rmro);
+ }
return ret;
}
@@ -508,14 +528,43 @@
finPlan.merge(e);
}
}
+
+    /**
+     * Adds whatever the expression-plan UDF finder reports for the given
+     * plan to the current MROper's UDF list. A null plan contributes nothing.
+     * @param plan - the expression plan to scan, may be null
+     * @throws VisitorException propagated from the finder's visit
+     */
+    private void addUDFs(ExprPlan plan) throws VisitorException{
+        if(plan==null)
+            return;
+        udfFinderForExpr.setPlan(plan);
+        udfFinderForExpr.visit();
+        curMROp.UDFs.addAll(udfFinderForExpr.getUDFs());
+    }
+
+    /**
+     * Adds whatever the physical-plan UDF finder reports for the given
+     * plan to the current MROper's UDF list. A null plan contributes nothing.
+     * @param plan - the physical plan to scan, may be null
+     * @throws VisitorException propagated from the finder's visit
+     */
+    private void addUDFs(PhysicalPlan<PhysicalOperator> plan) throws VisitorException{
+        if(plan==null)
+            return;
+        udfFinder.setPlan(plan);
+        udfFinder.visit();
+        curMROp.UDFs.addAll(udfFinder.getUDFs());
+    }
+
+
+ /* The visitOp methods that decide what to do with the current operator */
/**
- * The visitOp methods that decide what to do with the current operator
+ * Compiles a split operator. The logic is to
+ * close the split job by replacing the split oper by
+ * a store and creating a new Map MRoper and return
+ * that as the current MROper to which other operators
+ * would be compiled into. The new MROper would be connected
+ * to the split job by load-store. Also add the split oper
+ * to the splitsSeen map.
+ * @param op - The split operator
+ * @throws VisitorException
*/
-
public void visitSplit(POSplit op) throws VisitorException{
try{
- split(op);
+ FileSpec fSpec = op.getSplitStore();
+ MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
+ splitsSeen.put(op.getOperatorKey(), mro);
+ curMROp = startNew(fSpec, mro);
}catch(Exception e){
VisitorException pe = new VisitorException(e.getMessage());
pe.initCause(e);
@@ -546,6 +595,7 @@
public void visitFilter(POFilter op) throws VisitorException{
try{
nonBlocking(op);
+ addUDFs(op.getPlan());
}catch(Exception e){
VisitorException pe = new VisitorException(e.getMessage());
pe.initCause(e);
@@ -556,6 +606,10 @@
public void visitLocalRearrange(POLocalRearrange op) throws VisitorException {
try{
nonBlocking(op);
+ List<ExprPlan> plans = op.getPlans();
+ if(plans!=null)
+ for(ExprPlan ep : plans)
+ addUDFs(ep);
}catch(Exception e){
VisitorException pe = new VisitorException(e.getMessage());
pe.initCause(e);
@@ -566,6 +620,7 @@
public void visitForEach(POForEach op) throws VisitorException{
try{
nonBlocking(op);
+ addUDFs(op.getPlan());
}catch(Exception e){
VisitorException pe = new VisitorException(e.getMessage());
pe.initCause(e);
@@ -602,4 +657,273 @@
throw pe;
}
}
+
+    /**
+     * Compiles an order-by into map-reduce jobs. The current job is closed by
+     * storing its output to a temporary file. A quantile job is then built over
+     * that file, followed by a global-sort job that is given the quantiles file.
+     * The global-sort job becomes the current MROper.
+     *
+     * @param op the sort operator being compiled
+     * @throws VisitorException wrapping any failure while building the jobs
+     */
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        try {
+            FileSpec sortInput = getTempFileSpec();
+            MapReduceOper upstream = endSingleInputPlanWithStr(sortInput);
+            FileSpec quantiles = getTempFileSpec();
+            int parallelism = op.getRequestedParallelism();
+            // Read the key columns before getQuantileJob, which replaces op's sort plans.
+            int[] keyCols = getSortCols(op);
+            MapReduceOper sampler =
+                    getQuantileJob(op, upstream, sortInput, quantiles, parallelism, keyCols);
+            curMROp = getSortJob(sampler, sortInput, quantiles, parallelism, keyCols);
+        } catch (Exception e) {
+            VisitorException wrapped = new VisitorException(e.getMessage());
+            wrapped.initCause(e);
+            throw wrapped;
+        }
+    }
+
+    /**
+     * Extracts the column positions that the sort is keyed on. Each sort plan
+     * must end in a {@link POProject}; its projected column is taken as the key.
+     *
+     * @param sort the sort operator
+     * @return one column index per sort plan, in plan order, or {@code null}
+     *         when the sort has no sort plans
+     * @throws IllegalArgumentException if a sort plan has no leaf, or its leaf is
+     *         not a projection (only simple column keys are supported here)
+     */
+    private int[] getSortCols(POSort sort) {
+        List<ExprPlan> plans = sort.getSortPlans();
+        if (plans == null) {
+            return null;
+        }
+        int[] cols = new int[plans.size()];
+        int pos = 0;
+        for (ExprPlan plan : plans) {
+            Object leaf = plan.getLeaves().isEmpty() ? null : plan.getLeaves().get(0);
+            if (!(leaf instanceof POProject)) {
+                // Fail with a clear message instead of a ClassCastException or
+                // IndexOutOfBoundsException from the blind cast.
+                throw new IllegalArgumentException(
+                        "Sort key must be a simple column projection, found: " + leaf);
+            }
+            cols[pos++] = ((POProject) leaf).getColumn();
+        }
+        return cols;
+    }
+
+    /**
+     * Builds the final job of an order-by. On the map side it loads
+     * {@code lFile} and keys every record on the sort columns. On the reduce
+     * side it packages the records and flattens field 1 of each packaged tuple
+     * (presumably the bag of grouped records) back into individual tuples.
+     * The job is flagged as a global sort and is told the quantiles file name.
+     *
+     * @param quantJob  the job this one follows (passed to startNew)
+     * @param lFile     file holding the data to be sorted
+     * @param quantFile file holding the quantiles produced by {@code quantJob}
+     * @param rp        requested parallelism for this job
+     * @param fields    column positions of the sort keys
+     * @return the new sort job
+     * @throws PlanException if {@code fields} is null or the plan cannot be built
+     */
+    public MapReduceOper getSortJob(MapReduceOper quantJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException{
+        // Validate before startNew() so an invalid call fails without first
+        // creating a new job.
+        if(fields==null)
+            throw new PlanException("No Expression Plan found in POSort");
+
+        MapReduceOper mro = startNew(lFile, quantJob);
+        mro.setQuantFile(quantFile.getFileName());
+        mro.setGlobalSort(true);
+        mro.requestedParallelism = rp;
+
+        // A single sort column is used as the key directly; several columns
+        // are combined into a tuple key.
+        byte keyType = (fields.length > 1) ? DataType.TUPLE : DataType.BYTEARRAY;
+
+        // Map side: one projection plan per sort column forms the key.
+        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        for (int i : fields) {
+            ExprPlan ep = new ExprPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setColumn(i);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.BYTEARRAY);
+            ep.add(prj);
+            eps1.add(ep);
+        }
+
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        lr.setIndex(0);
+        lr.setKeyType(keyType);
+        lr.setPlans(eps1);
+        lr.setResultType(DataType.TUPLE);
+        mro.mapPlan.addAsLeaf(lr);
+
+        mro.setMapDone(true);
+
+        // Reduce side: package by key, then flatten field 1 of the package output.
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType(keyType);
+        pkg.setNumInps(1);
+        boolean[] inner = {false};
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+
+        ExprPlan ep = new ExprPlan();
+        POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prj.setColumn(1);
+        prj.setOverloaded(false);
+        prj.setResultType(DataType.BYTEARRAY);
+        ep.add(prj);
+        List<ExprPlan> eps2 = new ArrayList<ExprPlan>();
+        eps2.add(ep);
+        List<Boolean> flattened = new ArrayList<Boolean>();
+        flattened.add(true);
+        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps2,flattened);
+        fe1Gen.setResultType(DataType.TUPLE);
+        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
+        fe1Plan.add(fe1Gen);
+        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe1.setPlan(fe1Plan);
+        fe1.setResultType(DataType.TUPLE);
+        mro.reducePlan.add(fe1);
+        mro.reducePlan.connect(pkg, fe1);
+        return mro;
+    }
+
+    /**
+     * Builds the quantile job of an order-by.
+     *
+     * <p>Map side: loads {@code lFile} through RandomSampleLoader, projects
+     * (and flattens) the sort columns, and keys every record on the constant
+     * "all" so that all records fall into a single reduce group.
+     *
+     * <p>Reduce side: sorts that single bag with {@code sort}, emits
+     * (parallelism, sorted bag), applies FindQuantiles to that tuple, and
+     * stores the result into {@code quantFile}.
+     *
+     * <p>Side effect: the {@code sort} operator itself is placed in the reduce
+     * plan, and its sort plans are replaced by a single project-star plan.
+     * Anything needed from the original sort plans must be read before calling.
+     *
+     * @param sort      the order-by being compiled (modified, see above)
+     * @param prevJob   the job this one follows (passed to startNew)
+     * @param lFile     file holding the data to be sorted
+     * @param quantFile file the quantiles are stored into
+     * @param rp        requested parallelism; a value &lt;= 0 is emitted as 1
+     * @param fields    column positions of the sort keys
+     * @return the new quantile job
+     * @throws PlanException    if {@code fields} is null or the plan cannot be built
+     * @throws VisitorException as declared by the plan-building calls
+     */
+    public MapReduceOper getQuantileJob(POSort sort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+        // Validate before startNew() and UDF registration so an invalid call
+        // fails without first creating a new job.
+        if(fields==null)
+            throw new PlanException("No Expression Plan found in POSort");
+
+        FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),RandomSampleLoader.class.getName());
+        MapReduceOper mro = startNew(quantLdFilName, prevJob);
+        mro.UDFs.add(FindQuantiles.class.getName());
+        if(sort.isUDFComparatorUsed)
+            mro.UDFs.add(sort.getMSortFunc().getFuncSpec());
+
+        // ---- Map side: project the sort columns ----
+        List<ExprPlan> eps1 = new ArrayList<ExprPlan>();
+        List<Boolean> flat1 = new ArrayList<Boolean>();
+        for (int i : fields) {
+            ExprPlan ep = new ExprPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setColumn(i);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.BYTEARRAY);
+            ep.add(prj);
+            eps1.add(ep);
+            flat1.add(true);
+        }
+        POGenerate fe1Gen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)),eps1,flat1);
+        fe1Gen.setResultType(DataType.TUPLE);
+        PhysicalPlan<PhysicalOperator> fe1Plan = new PhysicalPlan<PhysicalOperator>();
+        fe1Plan.add(fe1Gen);
+        POForEach fe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe1.setPlan(fe1Plan);
+        fe1.setResultType(DataType.TUPLE);
+        mro.mapPlan.addAsLeaf(fe1);
+
+        // Key everything on the constant "all" so all records meet in one group.
+        ExprPlan ep1 = new ExprPlan();
+        ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        ce.setValue("all");
+        ep1.add(ce);
+
+        List<ExprPlan> eps = new ArrayList<ExprPlan>();
+        eps.add(ep1);
+
+        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        lr.setIndex(0);
+        lr.setKeyType(DataType.CHARARRAY);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        mro.mapPlan.add(lr);
+        mro.mapPlan.connect(fe1, lr);
+
+        mro.setMapDone(true);
+
+        // ---- Reduce side: package, sort the bag, compute quantiles, store ----
+        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        pkg.setKeyType(DataType.CHARARRAY);
+        pkg.setNumInps(1);
+        boolean[] inner = {false};
+        pkg.setInner(inner);
+        mro.reducePlan.add(pkg);
+
+        PhysicalPlan<PhysicalOperator> fe2Plan = new PhysicalPlan<PhysicalOperator>();
+
+        POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        topPrj.setColumn(1);
+        topPrj.setOverloaded(true);
+        topPrj.setResultType(DataType.TUPLE);
+        fe2Plan.add(topPrj);
+
+        ExprPlan nesSortPlan = new ExprPlan();
+        POProject prjStar2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar2.setResultType(DataType.TUPLE);
+        prjStar2.setStar(true);
+        nesSortPlan.add(prjStar2);
+
+        List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+        nesSortPlanLst.add(nesSortPlan);
+
+        // Reuse the caller's sort operator as the nested sort; this overwrites
+        // its original sort plans.
+        sort.setSortPlans(nesSortPlanLst);
+        fe2Plan.add(sort);
+        fe2Plan.connect(topPrj, sort);
+
+        ExprPlan ep3 = new ExprPlan();
+        POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar3.setResultType(DataType.TUPLE);
+        prjStar3.setStar(true);
+        ep3.add(prjStar3);
+
+        // Number of partitions handed to FindQuantiles; non-positive requests become 1.
+        ExprPlan rpep = new ExprPlan();
+        ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        rpce.setRequestedParallelism(rp);
+        rpce.setValue(rp<=0?1:rp);
+        rpep.add(rpce);
+
+        List<ExprPlan> genEps = new ArrayList<ExprPlan>();
+        genEps.add(rpep);
+        genEps.add(ep3);
+
+        List<Boolean> flattened2 = new ArrayList<Boolean>();
+        flattened2.add(false);
+        flattened2.add(false);
+        POGenerate nesGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), genEps, flattened2);
+        fe2Plan.add(nesGen);
+        fe2Plan.connect(sort, nesGen);
+
+        POForEach fe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe2.setPlan(fe2Plan);
+        fe2.setResultType(DataType.TUPLE);
+
+        mro.reducePlan.add(fe2);
+        mro.reducePlan.connect(pkg, fe2);
+
+        ExprPlan ep4 = new ExprPlan();
+        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        prjStar4.setResultType(DataType.TUPLE);
+        prjStar4.setStar(true);
+        ep4.add(prjStar4);
+
+        // NOTE(review): raw List kept to match POUserFunc's constructor as-is;
+        // parameterize once that signature is confirmed.
+        List ufInps = new ArrayList();
+        ufInps.add(prjStar4);
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, FindQuantiles.class.getName());
+        ep4.add(uf);
+        ep4.connect(prjStar4, uf);
+
+        List<ExprPlan> ep4s = new ArrayList<ExprPlan>();
+        ep4s.add(ep4);
+        List<Boolean> flattened3 = new ArrayList<Boolean>();
+        flattened3.add(false);
+        POGenerate finGen = new POGenerate(new OperatorKey(scope,nig.getNextNodeId(scope)), ep4s, flattened3);
+
+        PhysicalPlan<PhysicalOperator> fe3Plan = new PhysicalPlan<PhysicalOperator>();
+        fe3Plan.add(finGen);
+
+        POForEach fe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        fe3.setPlan(fe3Plan);
+        fe3.setResultType(DataType.TUPLE);
+
+        mro.reducePlan.add(fe3);
+        mro.reducePlan.connect(fe2, fe3);
+
+        POStore str = getStore();
+        str.setSFile(quantFile);
+        mro.reducePlan.add(str);
+        mro.reducePlan.connect(fe3, str);
+
+        mro.setReduceDone(true);
+        return mro;
+    }
+
+    /**
+     * Ad-hoc driver: compiles a load -&gt; sort -&gt; store plan and prints the
+     * names of the resulting map-reduce jobs, leaves first.
+     * NOTE(review): debugging scaffolding; consider moving it into a test.
+     */
+    public static void main(String[] args) throws PlanException, IOException, ExecException, VisitorException {
+        PigContext pc = new PigContext();
+        pc.connect();
+        MRCompiler comp = new MRCompiler(null, pc);
+        Random r = new Random();
+        List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+        POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        pr1.setResultType(DataType.INTEGER);
+        ExprPlan expPlan = new ExprPlan();
+        expPlan.add(pr1);
+        sortPlans.add(expPlan);
+        List<Boolean> mAscCols = new LinkedList<Boolean>();
+        mAscCols.add(false);
+        MapReduceOper pj = comp.getMROp();
+        POLoad ld = comp.getLoad();
+        pj.mapPlan.add(ld);
+
+        POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, null,
+                sortPlans, mAscCols, null);
+
+        pj.mapPlan.addAsLeaf(sort);
+
+        POStore st = comp.getStore();
+        pj.mapPlan.addAsLeaf(st);
+
+        MRCompiler c1 = new MRCompiler(pj.mapPlan,pc);
+        c1.compile();
+        MROperPlan plan = c1.getMRPlan();
+        // A sort is expected to compile into three jobs (upstream store,
+        // quantile job, sort job). Stop early instead of failing on get(0)
+        // if the plan holds fewer.
+        for(int i=0; i<3 && !plan.getLeaves().isEmpty(); i++){
+            MapReduceOper job = plan.getLeaves().get(0);
+            System.out.println(job.name());
+            plan.remove(job);
+        }
+    }
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Thu May 15 18:09:30 2008
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Main class that launches pig for Map Reduce.
+ * It currently adds no behavior of its own; launching is done entirely by
+ * {@link Launcher#launchPig}.
+ */
+public class MapReduceLauncher extends Launcher{
+
+    /**
+     * Launches the given physical plan by delegating to
+     * {@link Launcher#launchPig}.
+     *
+     * @param php     the physical plan to run
+     * @param grpName name passed through to the base launcher
+     *                (NOTE(review): semantics defined in Launcher; confirm)
+     * @param pc      the Pig context to run under
+     */
+    @Override
+    public void launchPig(PhysicalPlan<PhysicalOperator> php, String grpName, PigContext pc) throws PlanException, VisitorException, IOException, ExecException, JobCreationException {
+        super.launchPig(php, grpName, pc);
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceOper.java Thu May 15 18:09:30 2008
@@ -65,11 +65,16 @@
//Indicates if this job is an order by job
boolean globalSort = false;
+ //The quantiles file name if globalSort is true
+ String quantFile;
+
public List<String> UDFs;
NodeIdGenerator nig;
private String scope;
+
+ int requestedParallelism = -1;
public MapReduceOper(OperatorKey k) {
super(k);
@@ -104,7 +109,10 @@
*/
@Override
public String name() {
- StringBuilder sb = new StringBuilder("MapReduce - " + mKey.toString()
+ String udfStr = getUDFsAsStr();
+
+ StringBuilder sb = new StringBuilder("MapReduce" + "(" + requestedParallelism +
+ (udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString()
+ ":\n");
int index = sb.length();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -126,6 +134,18 @@
return sb.toString();
}
+    /**
+     * Renders this job's UDFs for name() as a comma-separated list of their
+     * unqualified names (the text after the last '.' of each spec).
+     *
+     * @return the joined names, or "" when there are no UDFs
+     */
+    private String getUDFsAsStr() {
+        if (UDFs == null || UDFs.isEmpty()) {
+            return "";
+        }
+        StringBuilder out = new StringBuilder();
+        boolean first = true;
+        for (String spec : UDFs) {
+            if (!first) {
+                out.append(',');
+            }
+            first = false;
+            int lastDot = spec.lastIndexOf('.');
+            out.append(spec.substring(lastDot + 1));
+        }
+        return out.toString();
+    }
+
@Override
public boolean supportsMultipleInputs() {
return true;
@@ -145,7 +165,7 @@
return mapDone;
}
- public void setMapDone(boolean mapDone) throws IOException{
+ public void setMapDone(boolean mapDone){
this.mapDone = mapDone;
}
@@ -214,4 +234,12 @@
public void setGlobalSort(boolean globalSort) {
this.globalSort = globalSort;
}
+
+    /**
+     * Returns the name of the quantiles file used when this job is a global sort.
+     *
+     * @return the quantiles file name, or {@code null} if none has been set
+     */
+    public String getQuantFile() {
+        return this.quantFile;
+    }
+
+    /**
+     * Records the name of the quantiles file for a global-sort job.
+     *
+     * @param fileName the quantiles file name
+     */
+    public void setQuantFile(String fileName) {
+        this.quantFile = fileName;
+    }
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Thu May 15 18:09:30 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
+/**
+ * Partitioner for global-sort jobs. Keys are routed by binary-searching them
+ * among the quantile tuples read from the file named by the job property
+ * {@code pig.quantilesFile}, using the job's output key comparator. Keys past
+ * the last usable boundary go to the last partition.
+ */
+public class SortPartitioner implements Partitioner {
+    /** Quantile boundaries, in the order they were read from the quantiles file. */
+    Tuple[] quantiles;
+    /** The job's output key comparator, used to compare keys with quantiles. */
+    WritableComparator comparator;
+
+    /**
+     * Returns the partition for {@code key}. The key is cast to a Tuple, and its
+     * first field (itself cast to a Tuple) is searched among the quantiles.
+     *
+     * @throws RuntimeException wrapping an ExecException raised while reading the key
+     */
+    public int getPartition(WritableComparable key, Writable value,
+            int numPartitions) {
+        try{
+            Tuple keyTuple = (Tuple)key;
+            int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator);
+            // A miss returns -(insertionPoint) - 1; recover the insertion point.
+            if (index < 0)
+                index = -index-1;
+            return Math.min(index, numPartitions - 1);
+        }catch(ExecException e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Loads the quantile tuples (via BinStorage) from the file named by
+     * {@code pig.quantilesFile} and captures the job's output key comparator.
+     * The input stream is always closed, whether or not loading succeeds.
+     *
+     * @throws RuntimeException if the property is unset/empty or the file
+     *         cannot be read
+     */
+    public void configure(JobConf job) {
+        String quantilesFile = job.get("pig.quantilesFile", "");
+        if (quantilesFile.length() == 0)
+            throw new RuntimeException("Sort paritioner used but no quantiles found");
+
+        InputStream is = null;
+        try{
+            is = FileLocalizer.openDFSFile(quantilesFile,job);
+            BinStorage loader = new BinStorage();
+            loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+
+            ArrayList<Tuple> quantiles = new ArrayList<Tuple>();
+            Tuple t;
+            while ((t = loader.getNext()) != null) {
+                quantiles.add(t);
+            }
+            this.quantiles = quantiles.toArray(new Tuple[quantiles.size()]);
+        }catch (IOException e){
+            throw new RuntimeException(e);
+        }finally{
+            // Previously the DFS stream was never closed (leaked on every path).
+            closeQuietly(is);
+        }
+
+        comparator = job.getOutputKeyComparator();
+    }
+
+    /**
+     * Closes {@code in} if non-null. Close failures are ignored: by this point
+     * the data has been fully read, or the read has already failed and is
+     * being reported.
+     */
+    private static void closeQuietly(InputStream in) {
+        if (in == null)
+            return;
+        try {
+            in.close();
+        } catch (IOException ignored) {
+            // Nothing useful to report; see method comment.
+        }
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinder.java Thu May 15 18:09:30 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Physical-plan visitor that collects the function specs of the UDFs a plan
+ * uses. It looks at UDFs in filter conditions and generate expressions, and
+ * at the comparison function of a sort that has one.
+ *
+ * <p>Usage: call {@link #setPlan} and then {@code visit()}. The specs are then
+ * available from {@link #getUDFs()} until the next {@link #setPlan}, which
+ * clears them.
+ */
+public class UDFFinder extends PhyPlanVisitor {
+    /** Function specs found in the current plan (cleared by setPlan). */
+    List<String> UDFs;
+    /** Walker reused for every plan handed to setPlan. */
+    DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>> dfw;
+    /** Helper that finds UDFs inside nested expression plans. */
+    UDFFinderForExpr udfFinderForExpr;
+
+    public UDFFinder(){
+        this(null, null);
+    }
+
+    public UDFFinder(ExprPlan plan,
+            PlanWalker<ExpressionOperator, ExprPlan> walker) {
+        super(plan, walker);
+        UDFs = new ArrayList<String>();
+        dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(null);
+        udfFinderForExpr = new UDFFinderForExpr();
+    }
+
+    /**
+     * @return the live list of specs found so far; it is cleared by the next
+     *         setPlan call, so callers should copy it if they keep it
+     */
+    public List<String> getUDFs() {
+        return UDFs;
+    }
+
+    /** Points this visitor (and its walker) at {@code plan} and clears previous results. */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan){
+        mPlan = plan;
+        dfw.setPlan(plan);
+        mCurrentWalker = dfw;
+        UDFs.clear();
+    }
+
+    /** Adds the UDF specs found in {@code ep}; a null plan contributes nothing. */
+    private void addUDFsIn(ExprPlan ep) throws VisitorException{
+        if (ep == null)
+            return;
+        udfFinderForExpr.setPlan(ep);
+        udfFinderForExpr.visit();
+        UDFs.addAll(udfFinderForExpr.getUDFs());
+    }
+
+    @Override
+    public void visitFilter(POFilter op) throws VisitorException {
+        addUDFsIn(op.getPlan());
+    }
+
+    @Override
+    public void visitGenerate(POGenerate op) throws VisitorException {
+        List<ExprPlan> eps = op.getInputPlans();
+        if (eps == null)
+            return;
+        for (ExprPlan ep : eps) {
+            addUDFsIn(ep);
+        }
+    }
+
+    @Override
+    public void visitSort(POSort op) throws VisitorException {
+        // Only a user-supplied comparison function counts as a UDF here.
+        if(op.getMSortFunc()!=null)
+            UDFs.add(op.getMSortFunc().getFuncSpec());
+    }
+
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/UDFFinderForExpr.java Thu May 15 18:09:30 2008
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.mapReduceLayer.plans;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Expression-plan visitor that records the function spec of every
+ * {@link POUserFunc} it visits.
+ *
+ * <p>Usage: call {@link #setPlan} and then {@code visit()}. The specs are then
+ * available from {@link #getUDFs()} until the next {@link #setPlan}, which
+ * clears them.
+ */
+public class UDFFinderForExpr extends ExprPlanVisitor {
+    /** Function specs found in the current plan (cleared by setPlan). */
+    List<String> UDFs;
+    /** Walker reused for every plan handed to setPlan. */
+    DepthFirstWalker<ExpressionOperator, ExprPlan> dfw;
+
+    public UDFFinderForExpr(){
+        this(null, null);
+    }
+
+    public UDFFinderForExpr(ExprPlan plan,
+            PlanWalker<ExpressionOperator, ExprPlan> walker) {
+        super(plan, walker);
+        UDFs = new ArrayList<String>();
+        dfw = new DepthFirstWalker<ExpressionOperator, ExprPlan>(null);
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        UDFs.add(userFunc.getFuncSpec());
+    }
+
+    /**
+     * @return the live list of specs found so far; it is cleared by the next
+     *         setPlan call, so callers should copy it if they keep it
+     */
+    public List<String> getUDFs() {
+        return UDFs;
+    }
+
+    /** Points this visitor (and its walker) at {@code plan} and clears previous results. */
+    public void setPlan(ExprPlan plan){
+        mPlan = plan;
+        dfw.setPlan(plan);
+        mCurrentWalker = dfw;
+        UDFs.clear();
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/ExprPlanVisitor.java Thu May 15 18:09:30 2008
@@ -25,6 +25,7 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POMapLookUp;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add;
@@ -109,7 +110,7 @@
public void visitMod(Mod mod) throws VisitorException {
//do nothing
}
-
+
public void visitBinCond(POBinCond binCond) {
// do nothing
@@ -119,6 +120,10 @@
//do nothing
}
+
+    /**
+     * Hook invoked for each user-defined function in an expression plan.
+     * The base visitor ignores UDFs; subclasses override this to act on them.
+     *
+     * @param udf the UDF operator being visited
+     * @throws VisitorException declared so that overriding visitors may fail
+     */
+    public void visitUserFunc(POUserFunc udf) throws VisitorException {
+        // Intentionally empty in the base visitor.
+    }
public void visitMapLookUp(POMapLookUp mapLookUp) {
// TODO Auto-generated method stub
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Thu May 15 18:09:30 2008
@@ -32,10 +32,10 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanWalker;
@@ -77,11 +77,11 @@
}
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
- pushWalker(mCurrentWalker.spawnChildWalker((P)lr.getPlan()));
- // this causes the current walker (the new one we created)
- // to walk the nested plan
- visit();
- popWalker();
+ List<ExprPlan> inpPlans = lr.getPlans();
+ for (ExprPlan plan : inpPlans) {
+ ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
+ epv.visit();
+ }
}
public void visitForEach(POForEach fe) throws VisitorException{
@@ -116,24 +116,19 @@
//do nothing
}
- public void visitDistinct(PODistinct distinct) throws VisitorException {
- //do nothing
- }
-
- public void visitRead(PORead read) throws VisitorException {
- //do nothing
- }
+ public void visitDistinct(PODistinct distinct) throws VisitorException {
+ //do nothing
+ }
+
+ public void visitRead(PORead read) throws VisitorException {
+ //do nothing
+ }
- public void visitSort(POSort sort) throws VisitorException {
+ public void visitSort(POSort sort) throws VisitorException {
List<ExprPlan> inpPlans = sort.getSortPlans();
for (ExprPlan plan : inpPlans) {
ExprPlanVisitor epv = new ExprPlanVisitor(plan,new DependencyOrderWalker<ExpressionOperator, ExprPlan>(plan));
epv.visit();
}
- }
-
- public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
- //do nothing
- }
-
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PlanPrinter.java Thu May 15 18:09:30 2008
@@ -150,7 +150,7 @@
}
else if(node instanceof POLocalRearrange){
- sb.append(planString(((POLocalRearrange)node).getPlan()));
+ sb.append(planString(((POLocalRearrange)node).getPlans()));
}
else if(node instanceof POSort){
sb.append(planString(((POSort)node).getSortPlans()));
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java Thu May 15 18:09:30 2008
@@ -146,10 +146,10 @@
res = gen.getNext(t);
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+ /*if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
return inp;
if(inp.returnStatus == POStatus.STATUS_NULL)
- continue;
+ continue;*/
processingPlan = true;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POGenerate.java Thu May 15 18:09:30 2008
@@ -89,7 +89,22 @@
@Override
public String name() {
- return "POGenerate - " + mKey.toString();
+ String fString = getFlatStr();
+ return "POGenerate" + "(" + fString + ")" + " - " + mKey.toString();
+ }
+
+    /**
+     * Renders the flatten flags for name() as a comma-separated list
+     * (for example "true,false").
+     *
+     * @return the joined flags, or "" when there are none
+     */
+    private String getFlatStr() {
+        StringBuilder flags = new StringBuilder();
+        if (isToBeFlattened != null) {
+            boolean first = true;
+            for (Boolean flag : isToBeFlattened) {
+                if (!first) {
+                    flags.append(',');
+                }
+                flags.append(flag);
+                first = false;
+            }
+        }
+        return flags.toString();
     }
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java Thu May 15 18:09:30 2008
@@ -17,19 +17,24 @@
*/
package org.apache.pig.impl.physicalLayer.topLevelOperators;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -47,17 +52,12 @@
private Log log = LogFactory.getLog(getClass());
- PhysicalPlan<PhysicalOperator> plan;
+ List<ExprPlan> plans;
+
+ List<ExpressionOperator> leafOps;
// The position of this LR in the package operator
int index;
-
- POGenerate gen;
-
- //Since the plan has a generate, this needs to be maintained
- //as the generate can potentially return multiple tuples for
- //same call.
- private boolean processingPlan = false;
byte keyType;
@@ -76,6 +76,7 @@
public POLocalRearrange(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
index = -1;
+ leafOps = new ArrayList<ExpressionOperator>();
}
@Override
@@ -113,43 +114,18 @@
@Override
public void attachInput(Tuple t) {
super.attachInput(t);
- processingPlan = false;
}
/**
- * Calls getNext on the generate operator inside the nested
- * physical plan. Converts the generated tuple into the proper
- * format, i.e, (key,{(value)})
+ * Attaches each input tuple to every key expression plan and evaluates
+ * the leaf operator of each plan to build the key. Converts the
+ * result into the proper format, i.e, (key,indexedTuple(value))
+ * @throws ExecException if no key plans are set or a key plan has a
+ * result type this operator cannot evaluate
*/
@Override
public Result getNext(Tuple t) throws ExecException {
- Result res = null;
+
Result inp = null;
- //The nested plan is under processing
- //So return tuples that the generate oper
- //returns after converting them to the required
- //format
- if(processingPlan){
- while(true) {
- res = gen.getNext(t);
- if(res.returnStatus==POStatus.STATUS_OK){
- res.result = constructLROutput((Tuple)res.result);
- return res;
- }
- if(res.returnStatus==POStatus.STATUS_ERR)
- return res;
- if(res.returnStatus==POStatus.STATUS_NULL)
- continue;
- if(res.returnStatus==POStatus.STATUS_EOP){
- processingPlan = false;
- break;
- }
- }
- }
- //The nested plan processing is done or is
- //yet to begin. So process the input and start
- //nested plan processing on the input tuple
- //read
+ Result res = null;
while (true) {
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
@@ -157,48 +133,80 @@
if (inp.returnStatus == POStatus.STATUS_NULL)
continue;
- plan.attachInput((Tuple) inp.result);
-
- res = gen.getNext(t);
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
- break;
- if(inp.returnStatus == POStatus.STATUS_NULL)
- continue;
-
- processingPlan = true;
-
- res.result = constructLROutput((Tuple)res.result);
+ //Fail with a clear error instead of an NPE further down when
+ //no key plans have been configured
+ if (plans == null || leafOps.isEmpty())
+ throw new ExecException("POLocalRearrange: no key plans set");
+ for (ExprPlan ep : plans) {
+ ep.attachInput((Tuple)inp.result);
+ }
+ List<Result> resLst = new ArrayList<Result>();
+ for (ExpressionOperator op : leafOps){
+
+ switch(op.resultType){
+ case DataType.BAG:
+ res = op.getNext(dummyBag);
+ break;
+ case DataType.BOOLEAN:
+ res = op.getNext(dummyBool);
+ break;
+ case DataType.BYTEARRAY:
+ res = op.getNext(dummyDBA);
+ break;
+ case DataType.CHARARRAY:
+ res = op.getNext(dummyString);
+ break;
+ case DataType.DOUBLE:
+ res = op.getNext(dummyDouble);
+ break;
+ case DataType.FLOAT:
+ res = op.getNext(dummyFloat);
+ break;
+ case DataType.INTEGER:
+ res = op.getNext(dummyInt);
+ break;
+ case DataType.LONG:
+ res = op.getNext(dummyLong);
+ break;
+ case DataType.MAP:
+ res = op.getNext(dummyMap);
+ break;
+ case DataType.TUPLE:
+ res = op.getNext(dummyTuple);
+ break;
+ default:
+ //Without this, res would keep the previous key column's
+ //Result (or stay null) and be silently reused for this one
+ throw new ExecException("POLocalRearrange: unsupported key type " + op.resultType);
+ }
+ if(res.returnStatus!=POStatus.STATUS_OK)
+ return new Result();
+ resLst.add(res);
+ }
+ res.result = constructLROutput(resLst,(Tuple)inp.result);
return res;
}
return inp;
}
- private Tuple constructLROutput(Tuple genOut){
- //Strip the input tuple off its key which
- //will be the first field in the tuple
- Object key = genOut.getAll().remove(0);
+ /**
+ * Builds the (key, IndexedTuple(value)) output tuple.
+ * @param resLst results of the key plans' leaf operators, one per key
+ * column; must be non-empty, since a one-element list is read with get(0)
+ * @param value the whole input tuple, wrapped unchanged together with
+ * this operator's index
+ * @return a two-field tuple: the key, then the indexed value
+ */
+ private Tuple constructLROutput(List<Result> resLst, Tuple value) throws ExecException{
+ //Construct key: several key columns are packed into a tuple in
+ //plan order; a single key column is used unwrapped
+ Object key;
+ if(resLst.size()>1){
+ Tuple t = TupleFactory.getInstance().newTuple(resLst.size());
+ int i=-1;
+ for(Result res : resLst)
+ t.set(++i, res.result);
+ key = t;
+ }
+ else{
+ key = resLst.get(0).result;
+ }
//Create the indexed tuple out of the value
//that is remaining in the input tuple
+ //(value is the full input tuple; the key fields are not removed)
- IndexedTuple it = new IndexedTuple(genOut, index);
+ IndexedTuple it = new IndexedTuple(value, index);
//Put the key and the indexed tuple
//in a tuple and return
- Tuple outPut = new DefaultTuple();
- outPut.append(key);
- outPut.append(it);
+ Tuple outPut = TupleFactory.getInstance().newTuple(2);
+ outPut.set(0,key);
+ outPut.set(1,it);
return outPut;
}
- public PhysicalPlan<PhysicalOperator> getPlan() {
- return plan;
- }
-
- public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
- this.plan = plan;
- gen = (POGenerate) plan.getLeaves().get(0);
- }
-
+ /** @return the key type last recorded by setKeyType() */
public byte getKeyType() {
return keyType;
}
@@ -206,4 +214,16 @@
+ /** @param keyType data type of the key; stored only, not validated here */
public void setKeyType(byte keyType) {
this.keyType = keyType;
}
+
+ /**
+ * @return the key expression plans last passed to setPlans()
+ */
+ public List<ExprPlan> getPlans() {
+ return plans;
+ }
+
+ /**
+ * Installs the key expression plans and caches the first leaf
+ * operator of each one, in order, for evaluation in getNext().
+ * @param plans one expression plan per key column
+ */
+ public void setPlans(List<ExprPlan> plans) {
+ this.plans = plans;
+ leafOps.clear();
+ for (int idx = 0; idx < plans.size(); idx++) {
+ leafOps.add(plans.get(idx).getLeaves().get(0));
+ }
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java Thu May 15 18:09:30 2008
@@ -35,6 +35,8 @@
import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
import org.apache.pig.impl.plan.VisitorException;
/**
@@ -51,94 +53,94 @@
*/
public class POSort extends PhysicalOperator<PhyPlanVisitor> {
- //private List<Integer> mSortCols;
- private List<ExprPlan> sortPlans;
- private List<Byte> ExprOutputTypes;
- private List<Boolean> mAscCols;
- private POUserComparisonFunc mSortFunc;
- private final Log log = LogFactory.getLog(getClass());
-
- private boolean inputsAccumulated = false;
- public boolean isUDFComparatorUsed = false;
- private DataBag sortedBag;
- transient Iterator<Tuple> it;
-
- public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
- List<Boolean> mAscCols, POUserFunc mSortFunc) {
- super(k, rp, inp);
- //this.mSortCols = mSortCols;
- this.sortPlans = sortPlans;
- this.mAscCols = mAscCols;
- this.mSortFunc = (POUserComparisonFunc) mSortFunc;
- if (mSortFunc == null) {
- sortedBag = BagFactory.getInstance().newSortedBag(
- new SortComparator());
- ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
-
- for(ExprPlan plan : sortPlans) {
- ExprOutputTypes.add(plan.getLeaves().get(0).resultType);
- }
- } else {
- sortedBag = BagFactory.getInstance().newSortedBag(
- new UDFSortComparator());
- isUDFComparatorUsed = true;
- }
- }
-
- public POSort(OperatorKey k, int rp, List inp) {
- super(k, rp, inp);
-
- }
-
- public POSort(OperatorKey k, int rp) {
- super(k, rp);
-
- }
-
- public POSort(OperatorKey k, List inp) {
- super(k, inp);
-
- }
-
- public POSort(OperatorKey k) {
- super(k);
-
- }
-
- public class SortComparator implements Comparator<Tuple> {
- public int compare(Tuple o1, Tuple o2) {
- int count = 0;
- int ret = 0;
- if(sortPlans == null || sortPlans.size() == 0)
- return 0;
- for(ExprPlan plan : sortPlans) {
- try {
- plan.attachInput(o1);
- Result res1 = getResult(plan, ExprOutputTypes.get(count));
- plan.attachInput(o2);
- Result res2 = getResult(plan, ExprOutputTypes.get(count));
- if(res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
- log.error("Error processing the input in the expression plan : " + plan.toString());
- } else {
- if(mAscCols.get(count ++))
- ret = DataType.compare(res1.result, res2.result);
- else
- ret = DataType.compare(res2.result, res1.result);
- }
-
- } catch (ExecException e) {
- log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
- }
-
- }
- return ret;
- }
-
- private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
- ExpressionOperator Op = plan.getLeaves().get(0);
- Result res = null;
-
- switch (resultType) {
+ //private List<Integer> mSortCols;
+ //One expression plan per sort column
+ private List<ExprPlan> sortPlans;
+ //Result type of each sort plan's leaf operator, in plan order; only
+ //filled in when no user comparison function is supplied
+ private List<Byte> ExprOutputTypes;
+ //Per sort column: true = ascending, false = descending
+ private List<Boolean> mAscCols;
+ //User comparison function; when set it replaces plan-based ordering
+ private POUserComparisonFunc mSortFunc;
+ private final Log log = LogFactory.getLog(getClass());
+
+ //Set once all input has been read into sortedBag
+ private boolean inputsAccumulated = false;
+ public boolean isUDFComparatorUsed = false;
+ //Sorted bag holding the input, ordered by the chosen comparator
+ private DataBag sortedBag;
+ //Iterator over sortedBag, created on the first output request
+ transient Iterator<Tuple> it;
+
+ /**
+ * Builds a sort operator. When mSortFunc is null the tuples are
+ * ordered by evaluating sortPlans, one direction flag per column in
+ * mAscCols; otherwise the user comparison function decides the order.
+ */
+ public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
+ List<Boolean> mAscCols, POUserFunc mSortFunc) {
+ super(k, rp, inp);
+ this.sortPlans = sortPlans;
+ this.mAscCols = mAscCols;
+ this.mSortFunc = (POUserComparisonFunc) mSortFunc;
+ if (mSortFunc != null) {
+ //UDF-driven ordering: no per-plan result types are needed
+ sortedBag = BagFactory.getInstance().newSortedBag(
+ new UDFSortComparator());
+ isUDFComparatorUsed = true;
+ return;
+ }
+ //Plan-driven ordering: remember each plan's leaf result type so the
+ //comparator knows which getNext overload to call
+ sortedBag = BagFactory.getInstance().newSortedBag(
+ new SortComparator());
+ ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+ for (ExprPlan sortPlan : sortPlans)
+ ExprOutputTypes.add(sortPlan.getLeaves().get(0).resultType);
+ }
+
+ //NOTE(review): the constructors below never create sortedBag, so
+ //getNext() would hit a null bag unless it is set elsewhere - confirm
+ public POSort(OperatorKey k, int rp, List inp) {
+ super(k, rp, inp);
+ }
+
+ public POSort(OperatorKey k, int rp) {
+ super(k, rp);
+ }
+
+ public POSort(OperatorKey k, List inp) {
+ super(k, inp);
+ }
+
+ public POSort(OperatorKey k) {
+ super(k);
+ }
+
+ public class SortComparator implements Comparator<Tuple> {
+ /**
+ * Compares two tuples column by column using the sort plans. The
+ * first column whose values differ decides the order, honouring that
+ * column's ascending/descending flag; later columns only break ties.
+ * Columns that cannot be evaluated are logged and treated as equal.
+ */
+ public int compare(Tuple o1, Tuple o2) {
+ if(sortPlans == null || sortPlans.size() == 0)
+ return 0;
+ int count = 0;
+ for(ExprPlan plan : sortPlans) {
+ //Look up this column's type and direction before evaluating it so
+ //a failure on one column cannot shift them onto the next column
+ byte type = ExprOutputTypes.get(count);
+ boolean asc = mAscCols.get(count);
+ count++;
+ try {
+ plan.attachInput(o1);
+ Result res1 = getResult(plan, type);
+ plan.attachInput(o2);
+ Result res2 = getResult(plan, type);
+ //getResult yields null for a type it has no case for
+ if(res1 == null || res2 == null
+ || res1.returnStatus != POStatus.STATUS_OK || res2.returnStatus != POStatus.STATUS_OK) {
+ log.error("Error processing the input in the expression plan : " + plan.toString());
+ continue;
+ }
+ int ret = asc ? DataType.compare(res1.result, res2.result)
+ : DataType.compare(res2.result, res1.result);
+ if(ret != 0)
+ return ret;
+ } catch (ExecException e) {
+ log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
+ }
+ }
+ return 0;
+ }
+
+ /**
+ * Evaluates the leaf operator of plan, whose input must already be
+ * attached, through the getNext overload matching resultType.
+ * @return the operator's Result
+ */
+ private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
+ ExpressionOperator Op = plan.getLeaves().get(0);
+ //NOTE(review): res stays null unless a case below matches
+ //resultType, so callers must be ready for a null Result - confirm
+ //the full case list
+ Result res = null;
+
+ switch (resultType) {
case DataType.BYTEARRAY:
res = Op.getNext(dummyDBA);
break;
@@ -158,95 +160,107 @@
res = Op.getNext(dummyLong);
break;
}
- return res;
- }
- }
-
- public class UDFSortComparator implements Comparator<Tuple> {
-
- public int compare(Tuple t1, Tuple t2) {
-
- mSortFunc.attachInput(t1, t2);
- Integer i = null;
- Result res = null;
- try {
- res = mSortFunc.getNext(i);
- } catch (ExecException e) {
-
- log.error("Input not ready. Error on reading from input. "
- + e.getMessage());
- }
- if (res != null)
- return (Integer) res.result;
- else
- return 0;
- }
-
- }
-
- @Override
- public String name() {
+ return res;
+ }
+ }
+
+ /**
+ * Orders tuples with the user comparison function mSortFunc. Treats
+ * the pair as equal (returns 0) when getNext throws an ExecException
+ * or yields no Result.
+ */
+ public class UDFSortComparator implements Comparator<Tuple> {
+
+ public int compare(Tuple t1, Tuple t2) {
+ mSortFunc.attachInput(t1, t2);
+ Result cmp;
+ try {
+ cmp = mSortFunc.getNext((Integer) null);
+ } catch (ExecException e) {
+ log.error("Input not ready. Error on reading from input. "
+ + e.getMessage());
+ return 0;
+ }
+ return (cmp == null) ? 0 : (Integer) cmp.result;
+ }
+
+ }
+
+ @Override
+ public String name() {
+ String opKey = mKey.toString();
+ return "POSort - " + opKey;
+ }
+
+ /**
+ * Sorting needs all of its input before it can emit anything, so this
+ * operator is blocking.
+ */
+ @Override
+ public boolean isBlocking() {
+ return true;
+ }
+
+ /**
+ * On the first call, drains every input tuple into the sorted bag
+ * (ERR inputs are logged and skipped, NULL inputs are skipped). Each
+ * call then returns the next tuple in sorted order with STATUS_OK,
+ * or STATUS_EOP once the bag is exhausted.
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ Result res = new Result();
+ if (!inputsAccumulated) {
+ res = processInput();
+ while (res.returnStatus != POStatus.STATUS_EOP) {
+ if (res.returnStatus == POStatus.STATUS_ERR) {
+ log.error("Error in reading from the inputs");
+ } else if (res.returnStatus != POStatus.STATUS_NULL) {
+ sortedBag.add((Tuple) res.result);
+ }
+ //Always advance to the next input, including after ERR/NULL,
+ //so the loop cannot spin forever on the same result
+ res = processInput();
+ }
+
+ inputsAccumulated = true;
+
+ }
+ if (it == null) {
+ it = sortedBag.iterator();
+ }
+ //Iterator.next() throws NoSuchElementException when exhausted
+ //instead of returning null, so check hasNext() first
+ res.result = it.hasNext() ? it.next() : null;
+ if (res.result == null)
+ res.returnStatus = POStatus.STATUS_EOP;
+ else
+ res.returnStatus = POStatus.STATUS_OK;
+ return res;
+ }
+
+ //POSort reads from a single input only
+ @Override
+ public boolean supportsMultipleInputs() {
+
+ return false;
+ }
+
+ //POSort feeds a single output only
+ @Override
+ public boolean supportsMultipleOutputs() {
- return "POSort - " + mKey.toString();
- }
+ return false;
+ }
- @Override
- public boolean isBlocking() {
-
- return true;
- }
-
- @Override
- public Result getNext(Tuple t) throws ExecException {
- Result res = new Result();
- if (!inputsAccumulated) {
- res = processInput();
- while (res.returnStatus != POStatus.STATUS_EOP) {
- if (res.returnStatus == POStatus.STATUS_ERR) {
- log.error("Error in reading from the inputs");
- continue;
- } else if (res.returnStatus == POStatus.STATUS_NULL) {
- continue;
- }
- sortedBag.add((Tuple) res.result);
- res = processInput();
-
- }
+ //Double dispatch: lets the visitor handle this node via visitSort()
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
- inputsAccumulated = true;
+ v.visitSort(this);
+ }
- }
- if (it == null) {
- it = sortedBag.iterator();
- }
- res.result = it.next();
- if (res.result == null)
- res.returnStatus = POStatus.STATUS_EOP;
- else
- res.returnStatus = POStatus.STATUS_OK;
- return res;
- }
-
- @Override
- public boolean supportsMultipleInputs() {
-
- return false;
+ /** @return the sort column expression plans */
+ public List<ExprPlan> getSortPlans() {
+ return sortPlans;
}
- @Override
- public boolean supportsMultipleOutputs() {
-
- return false;
+ //NOTE(review): this does not rebuild ExprOutputTypes, which the
+ //constructor derived from the original plans, so the comparator keeps
+ //using the old result types - confirm callers only set compatible plans
+ public void setSortPlans(List<ExprPlan> sortPlans) {
+ this.sortPlans = sortPlans;
}
- @Override
- public void visit(PhyPlanVisitor v) throws VisitorException {
-
- v.visitSort(this);
+ /** @return the user comparison function, or null if plans are used */
+ public POUserComparisonFunc getMSortFunc() {
+ return mSortFunc;
}
- public List<ExprPlan> getSortPlans() {
- return sortPlans;
+ //NOTE(review): sortedBag and isUDFComparatorUsed are not updated here,
+ //so the bag keeps the comparator chosen in the constructor - confirm
+ public void setMSortFunc(POUserComparisonFunc sortFunc) {
+ mSortFunc = sortFunc;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POProject.java Thu May 15 18:09:30 2008
@@ -64,6 +64,8 @@
//of the translator to set this.
boolean overloaded = false;
+ //When true this operator projects the entire input instead of a
+ //single column: getNext returns the processed input as-is and
+ //name() shows "*" in place of the column number
+ boolean star = false;
+
public POProject(OperatorKey k) {
this(k,-1,0);
}
@@ -79,7 +81,8 @@
@Override
public String name() {
- return "Project(" + column + ") - " + mKey.toString();
+
+ return "Project(" + ((star) ? "*" : column) + ") - " + mKey.toString();
}
@Override
@@ -184,6 +187,9 @@
res = processInput();
if(res.returnStatus!=POStatus.STATUS_OK)
return res;
+ if(star)
+ return res;
+
inpValue = (Tuple)res.result;
res.result = null;
@@ -229,4 +235,12 @@
this.overloaded = overloaded;
}
+ /**
+ * @return true if this projection passes the whole input through
+ */
+ public boolean isStar() {
+ return this.star;
+ }
+
+ /**
+ * @param star true to project the entire input instead of one column
+ */
+ public void setStar(boolean star) {
+ this.star = star;
+ }
+
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserComparisonFunc.java Thu May 15 18:09:30 2008
@@ -0,0 +1,124 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+
+/**
+ * Expression operator wrapping a user-supplied ComparisonFunc. Two
+ * tuples are attached with attachInput(t1, t2) and getNext(Integer)
+ * returns the function's comparison of them; the other getNext
+ * overloads are unsupported.
+ */
+public class POUserComparisonFunc extends POUserFunc {
+
+ transient ComparisonFunc func;
+ private Log log = LogFactory.getLog(getClass());
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
+ super(k, rp, inp);
+ this.funcSpec = funcSpec;
+ this.func = func;
+ }
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+ this(k, rp, inp, funcSpec, null);
+
+ instantiateFunc();
+ }
+
+ //(Re)creates the comparison function from funcSpec, e.g. when the
+ //transient func field is null
+ private void instantiateFunc() {
+ this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+ }
+
+ public ComparisonFunc getComparator() {
+ if (func == null)
+ instantiateFunc();
+ return func;
+ }
+
+ /**
+ * Compares the two attached tuples with the user function.
+ * @return a Result holding the comparison value with STATUS_OK, or a
+ * null result with STATUS_ERR when either tuple is missing
+ */
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ Result result = new Result();
+
+ if (func == null)
+ instantiateFunc();
+
+ //Check the inputs before calling compare() so the user function is
+ //never handed a null tuple
+ if (t1 == null || t2 == null) {
+ result.result = null;
+ result.returnStatus = POStatus.STATUS_ERR;
+ } else {
+ result.result = func.compare(t1, t2);
+ result.returnStatus = POStatus.STATUS_OK;
+ }
+ // the two attached tuples are used up now. So we set the
+ // inputAttached flag to false
+ inputAttached = false;
+ return result;
+
+ }
+
+ //Shared by every non-Integer getNext overload: logs an error and
+ //returns null, so callers of those overloads must null-check
+ private Result getNext() {
+ Result res = null;
+ log.error("getNext being called with non-integer");
+ return res;
+ }
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Tuple in) throws ExecException {
+ return getNext();
+ }
+
+ /**
+ * Attaches the pair of tuples the next getNext(Integer) call compares.
+ */
+ public void attachInput(Tuple t1, Tuple t2) {
+ if (func == null)
+ instantiateFunc();
+
+ this.t1 = t1;
+ this.t2 = t2;
+ inputAttached = true;
+
+ }
+
+}