Posted to commits@pig.apache.org by pr...@apache.org on 2009/02/18 20:22:02 UTC

svn commit: r745623 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/data/ src/org/apache/pig/impl/builtin/ sr...

Author: pradeepkth
Date: Wed Feb 18 19:22:00 2009
New Revision: 745623

URL: http://svn.apache.org/viewvc?rev=745623&view=rev
Log:
PIG-545: PERFORMANCE: Sampler for order bys does not produce a good distribution

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
Removed:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
    hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
    hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
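
In short: instead of the old SortPartitioner, order-by jobs now use a
WeightedRangePartitioner which, for a sort key that straddles a partition
boundary in the samples, picks among the straddled partitions with a
probability proportional to the key's observed frequency in each. A
standalone illustration of that idea (a sketch, not code from this commit):

    import java.util.Random;

    // Illustrative sketch only: spread a "hot" sort key across the
    // partitions it straddles, in proportion to how often the samples
    // placed it in each partition.
    public class WeightedChoiceSketch {
        private final float[] probVec; // probVec[i] = fraction of the hot
                                       // key's samples in partition i
        private final Random rand = new Random();

        public WeightedChoiceSketch(float[] probVec) { this.probVec = probVec; }

        public int pickPartition() {
            double toss = rand.nextDouble();
            for (int i = 0; i < probVec.length; i++) {
                toss -= probVec[i];
                if (toss <= 0.0) return i;
            }
            return probVec.length - 1;
        }

        public static void main(String[] args) {
            // A key whose samples fell 30% in partition 1, 70% in partition 2:
            WeightedChoiceSketch w =
                new WeightedChoiceSketch(new float[]{ 0f, 0.3f, 0.7f });
            System.out.println(w.pickPartition()); // 1 with p=0.3, 2 with p=0.7
        }
    }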

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 18 19:22:00 2009
@@ -424,3 +424,5 @@
 
     PIG-590: error handling on the backend (sms)
 
+    PIG-545: PERFORMANCE: Sampler for order bys does not produce a good
+    distribution (pradeepkth)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 18 19:22:00 2009
@@ -48,6 +48,7 @@
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -371,7 +372,7 @@
                 // global sort, not for limit after sort.
                 if (mro.isGlobalSort()) {
                     jobConf.set("pig.quantilesFile", mro.getQuantFile());
-                    jobConf.setPartitionerClass(SortPartitioner.class);
+                    jobConf.setPartitionerClass(WeightedRangePartitioner.class);
                 }
                 if(mro.UDFs.size()==1){
                     String compFuncSpec = mro.UDFs.get(0);
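
For context, this is the standard old-API Hadoop wiring: the quantiles file
name is passed through the JobConf so the partitioner can load it in
configure(). A minimal sketch of the change above (quantFile is a
placeholder argument, not a path from this commit):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;

    public class PartitionerWiringSketch {
        // The quantiles file name travels through the JobConf so that
        // WeightedRangePartitioner.configure() can locate and load it.
        static void wireSortJob(JobConf jobConf, String quantFile) {
            jobConf.set("pig.quantilesFile", quantFile);
            jobConf.setPartitionerClass(WeightedRangePartitioner.class);
        }
    }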

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 18 19:22:00 2009
@@ -35,11 +35,12 @@
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -71,6 +72,7 @@
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * The compiler that compiles a given physical plan
@@ -944,9 +946,11 @@
             MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
             FileSpec quantFile = getTempFileSpec();
             int rp = op.getRequestedParallelism();
-            int[] fields = getSortCols(op);
-            MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
-            curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
+            Pair<Integer,Byte>[] fields = getSortCols(op);
+            Pair<MapReduceOper, Integer> quantJobParallelismPair = 
+                getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
+            curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile, 
+                    quantJobParallelismPair.second, fields);
             
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
@@ -958,14 +962,16 @@
         }
     }
     
-    private int[] getSortCols(POSort sort) throws PlanException, ExecException {
+    private Pair<Integer, Byte>[] getSortCols(POSort sort) throws PlanException, ExecException {
         List<PhysicalPlan> plans = sort.getSortPlans();
         if(plans!=null){
-            int[] ret = new int[plans.size()]; 
+            Pair[] ret = new Pair[plans.size()]; 
             int i=-1;
             for (PhysicalPlan plan : plans) {
                 if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
-                ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn();
+                int first = ((POProject)plan.getLeaves().get(0)).getColumn();
+                byte second = ((POProject)plan.getLeaves().get(0)).getResultType();
+                ret[++i] = new Pair<Integer,Byte>(first,second);
             }
             return ret;
         }
@@ -980,7 +986,7 @@
             FileSpec lFile,
             FileSpec quantFile,
             int rp,
-            int[] fields) throws PlanException{
+            Pair<Integer,Byte>[] fields) throws PlanException{
         MapReduceOper mro = startNew(lFile, quantJob);
         mro.setQuantFile(quantFile.getFileName());
         mro.setGlobalSort(true);
@@ -1139,10 +1145,9 @@
         return mro;
     }
 
-    public MapReduceOper getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+    public Pair<MapReduceOper,Integer> getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, Pair<Integer,Byte>[] fields) throws PlanException, VisitorException {
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(), new FuncSpec(RandomSampleLoader.class.getName()));
         MapReduceOper mro = startNew(quantLdFilName, prevJob);
-        mro.UDFs.add(FindQuantiles.class.getName());
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),
                 inpSort.getMAscCols(), inpSort.getMSortFunc());
@@ -1152,7 +1157,7 @@
         
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
         List<Boolean> flat1 = new ArrayList<Boolean>();
-        
+        // Set up the projections of the key columns 
         if (fields == null) {
             PhysicalPlan ep = new PhysicalPlan();
             POProject prj = new POProject(new OperatorKey(scope,
@@ -1164,20 +1169,23 @@
             eps1.add(ep);
             flat1.add(true);
         } else {
-            for (int i : fields) {
+            for (Pair<Integer,Byte> i : fields) {
                 PhysicalPlan ep = new PhysicalPlan();
                 POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                prj.setColumn(i);
+                prj.setColumn(i.first);
                 prj.setOverloaded(false);
-                prj.setResultType(DataType.BYTEARRAY);
+                prj.setResultType(i.second);
                 ep.add(prj);
                 eps1.add(ep);
                 flat1.add(true);
             }
         }
+        // This foreach will pick the sort key columns from the RandomSampleLoader output 
         POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
         mro.mapPlan.addAsLeaf(nfe1);
         
+        // Now set up a POLocalRearrange which has "all" as the key; the output of the
+        // foreach will be the "value" out of the POLocalRearrange
         PhysicalPlan ep1 = new PhysicalPlan();
         ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         ce.setValue("all");
@@ -1210,39 +1218,73 @@
         pkg.setInner(inner);
         mro.reducePlan.add(pkg);
         
+        // Let's start building the plan which will have the sort
+        // for the foreach
         PhysicalPlan fe2Plan = new PhysicalPlan();
-        
+        // Top level project which just projects the tuple coming out of
+        // the foreach after the package
         POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         topPrj.setColumn(1);
         topPrj.setResultType(DataType.TUPLE);
         topPrj.setOverloaded(true);
         fe2Plan.add(topPrj);
         
-        PhysicalPlan nesSortPlan = new PhysicalPlan();
-        POProject prjStar2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prjStar2.setResultType(DataType.TUPLE);
-        prjStar2.setStar(true);
-        nesSortPlan.add(prjStar2);
-        
+        // the projections which will form sort plans
         List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
-        nesSortPlanLst.add(nesSortPlan);
+        if (fields == null) {
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            ep.add(prj);
+            nesSortPlanLst.add(ep);
+        } else {
+            for (int i=0; i<fields.length;i++) {
+                PhysicalPlan ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                prj.setColumn(i);
+                prj.setOverloaded(false);
+                prj.setResultType(fields[i].second);
+                ep.add(prj);
+                nesSortPlanLst.add(ep);
+            }
+        }
         
         sort.setSortPlans(nesSortPlanLst);
         sort.setResultType(DataType.BAG);
         fe2Plan.add(sort);
         fe2Plan.connect(topPrj, sort);
         
-        /*PhysicalPlan ep3 = new PhysicalPlan();
-        POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prjStar3.setResultType(DataType.BAG);
-        prjStar3.setColumn(1);
-        prjStar3.setStar(false);
-        ep3.add(prjStar3);*/
-        
+        // The plan which will have a constant representing the
+        // degree of parallelism for the final order by map-reduce job;
+        // this will either come from an "order by parallel x" in the script
+        // or will be the default number of reducers for the cluster if
+        // "parallel x" is not used in the script
         PhysicalPlan rpep = new PhysicalPlan();
         ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         rpce.setRequestedParallelism(rp);
-        rpce.setValue(rp<=0?1:rp);
+        int val = rp;
+        if(val<=0){
+            ExecutionEngine eng = pigContext.getExecutionEngine();
+            if(eng instanceof HExecutionEngine){
+                try {
+                    val = Math.round(0.9f * ((HExecutionEngine)eng).getJobClient().getDefaultReduces());
+                    if(val<=0)
+                        val = 1;
+                } catch (IOException e) {
+                    int errCode = 6015;
+                    String msg = "Problem getting the default number of reduces from the Job Client.";
+                    throw new MRCompilerException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
+                }
+            } else {
+            	val = 1; // local mode, set it to 1
+            }
+        }
+        int parallelismForSort = (rp <= 0 ? val : rp);
+        rpce.setValue(parallelismForSort);
+        
         rpce.setResultType(DataType.INTEGER);
         rpep.add(rpce);
         
@@ -1258,41 +1300,14 @@
         mro.reducePlan.add(nfe2);
         mro.reducePlan.connect(pkg, nfe2);
         
-        PhysicalPlan ep4 = new PhysicalPlan();
-        POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        prjStar4.setResultType(DataType.TUPLE);
-        prjStar4.setStar(true);
-        ep4.add(prjStar4);
-        
-        List ufInps = new ArrayList();
-        ufInps.add(prjStar4);
-        // Turn the asc/desc array into an array of strings so that we can pass it
-        // to the FindQuantiles function.
-        List<Boolean> ascCols = inpSort.getMAscCols();
-        String[] ascs = new String[ascCols.size()];
-        for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
-        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, 
-            new FuncSpec(FindQuantiles.class.getName(), ascs));
-        ep4.add(uf);
-        ep4.connect(prjStar4, uf);
-        
-        List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
-        ep4s.add(ep4);
-        List<Boolean> flattened3 = new ArrayList<Boolean>();
-        flattened3.add(false);
-        POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
-        
-        mro.reducePlan.add(nfe3);
-        mro.reducePlan.connect(nfe2, nfe3);
-        
         POStore str = getStore();
         str.setSFile(quantFile);
         mro.reducePlan.add(str);
-        mro.reducePlan.connect(nfe3, str);
+        mro.reducePlan.connect(nfe2, str);
         
         mro.setReduceDone(true);
         mro.requestedParallelism = 1;
-        return mro;
+        return new Pair<MapReduceOper, Integer>(mro, parallelismForSort);
     }
 
     static class LastInputStreamingOptimizer extends MROpPlanVisitor {
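
A condensed sketch of the reducer-count heuristic added in getQuantileJob()
above; defaultReduces stands in for JobClient.getDefaultReduces(), with null
meaning local mode:

    public class SortParallelismSketch {
        // Use the requested parallelism when given; otherwise take 90% of
        // the cluster's default reduces (at least 1), or 1 in local mode.
        static int sortParallelism(int requestedParallelism, Integer defaultReduces) {
            if (requestedParallelism > 0) return requestedParallelism;
            if (defaultReduces == null) return 1; // local mode
            return Math.max(1, Math.round(0.9f * defaultReduces));
        }
    }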

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+/**
+ * A HashMap whose put() accumulates integer counts per key and tracks the total count.
+ */
+
+public class CountingMap<K> extends HashMap<K, Integer> {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    
+    private long totalCount = 0;
+    
+    @Override
+    public Integer put(K arg0, Integer arg1) {
+        if(!containsKey(arg0)){
+            totalCount += arg1;
+            return super.put(arg0, arg1);
+        }
+        else{
+            totalCount += arg1;
+            return super.put(arg0, get(arg0)+arg1);
+        }
+    }
+    
+    public void display(){
+        System.out.println();
+        System.out.println("-------------------------");
+        
+        for (Entry<K,Integer> ent : entrySet()) {
+            System.out.println(ent.getKey() + ": " + ent.getValue());
+        }
+        
+        System.out.println("-------------------------------");
+    }
+
+    public long getTotalCount() {
+        return totalCount;
+    }
+}
\ No newline at end of file
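
A usage sketch (not from this commit): put() accumulates counts per key
while getTotalCount() tracks the grand total:

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.CountingMap;

    public class CountingMapSketch {
        public static void main(String[] args) {
            CountingMap<String> counts = new CountingMap<String>();
            counts.put("a", 1); // put() accumulates rather than replaces
            counts.put("a", 1);
            counts.put("b", 3);
            // counts.get("a") == 2, counts.get("b") == 3,
            // counts.getTotalCount() == 5
            counts.display();
        }
    }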

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.pig.PigException;
+
+
+public class DiscreteProbabilitySampleGenerator {
+    Random rGen;
+    float[] probVec;
+    float epsilon = 0.00001f;
+    
+    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) throws MalFormedProbVecException{
+        rGen = new Random(seed);
+        float sum = 0.0f;
+        for (float f : probVec) {
+            sum += f;
+        }
+        if(1-epsilon<=sum && sum<=1+epsilon) 
+            this.probVec = probVec;
+        else {
+            int errorCode = 2122;
+            String message = "Sum of probabilities should be one";
+            throw new MalFormedProbVecException(message, errorCode, PigException.BUG);
+        }
+    }
+    
+    public DiscreteProbabilitySampleGenerator(float[] probVec) throws MalFormedProbVecException{
+        rGen = new Random();
+        float sum = 0.0f;
+        for (float f : probVec) {
+            sum += f;
+        }
+        if(1-epsilon<=sum && sum<=1+epsilon) 
+            this.probVec = probVec;
+        else {
+            int errorCode = 2122;
+            String message = "Sum of probabilities should be one";
+            throw new MalFormedProbVecException(message, errorCode, PigException.BUG);
+        }
+    }
+    
+    public int getNext(){
+        double toss = rGen.nextDouble();
+        // if the uniformly random number that I generated
+        // is in the probability range for a given partition,
+        // pick that partition.
+        // For some sample item which occurs only in partitions
+        // 1 and 2,
+        // say probVec[1] = 0.3
+        // and probVec[2] = 0.7:
+        // if our coin toss generates < 0.3, we pick 1; otherwise
+        // we pick 2
+        for(int i=0;i<probVec.length;i++){
+            toss -= probVec[i];
+            if(toss<=0.0)
+                return i;
+        }
+        return -1;
+    }
+    
+    public static void main(String[] args) throws MalFormedProbVecException {
+        float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
+        DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
+        CountingMap<Integer> cm = new CountingMap<Integer>();
+        for(int i=0;i<100;i++){
+            cm.put(gen.getNext(), 1);
+        }
+        cm.display();
+    }
+
+    @Override
+    public String toString() {
+        // render the probability vector, for debugging/logging
+        return Arrays.toString(probVec);
+    }
+    
+    
+}
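
To make the coin-toss walk in getNext() concrete: with the vector above, a
toss of 0.45 becomes 0.45 - 0 - 0.3 = 0.15 at index 1 and 0.15 - 0.2 <= 0 at
index 2, so 2 is returned. A sketch (not from this commit) that checks the
empirical frequencies against probVec:

    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.MalFormedProbVecException;

    public class SamplerCheckSketch {
        public static void main(String[] args) throws MalFormedProbVecException {
            float[] vec = { 0f, 0.3f, 0.2f, 0f, 0f, 0.5f };
            DiscreteProbabilitySampleGenerator gen =
                new DiscreteProbabilitySampleGenerator(vec);
            int[] hits = new int[vec.length];
            int n = 100000;
            for (int i = 0; i < n; i++) hits[gen.getNext()]++;
            // each observed frequency should be close to vec[i]
            for (int i = 0; i < vec.length; i++)
                System.out.printf("index %d: %.3f (expected %.1f)%n",
                        i, hits[i] / (double) n, vec[i]);
        }
    }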

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+import org.apache.pig.backend.BackendException;
+
+/**
+ * Thrown when a probability vector's entries do not sum to (approximately) one.
+ */
+
+public class MalFormedProbVecException extends BackendException {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    
+    public MalFormedProbVecException(String message, int errCode, byte errSrc) {
+        super(message, errCode, errSrc);
+    }
+
+    public MalFormedProbVecException() {
+        super();
+    }
+
+    public MalFormedProbVecException(String arg0, Throwable arg1) {
+        super(arg0, arg1);
+    }
+
+    public MalFormedProbVecException(String arg0) {
+        super(arg0);
+    }
+
+    public MalFormedProbVecException(Throwable arg0) {
+        super(arg0);
+    }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+public class WeightedRangePartitioner implements Partitioner<PigNullableWritable, Writable> {
+    PigNullableWritable[] quantiles;
+    RawComparator<PigNullableWritable> comparator;
+    Integer numQuantiles;
+    DataBag samples;
+    public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
+    JobConf job;
+
+    public int getPartition(PigNullableWritable key, Writable value,
+            int numPartitions){
+        if(!weightedParts.containsKey(key)){
+            int index = Arrays.binarySearch(quantiles, key, comparator);
+            if (index < 0)
+                index = -index-1;
+            else
+                index = index + 1;
+            return Math.min(index, numPartitions - 1);
+        }
+        DiscreteProbabilitySampleGenerator gen = weightedParts.get(key);
+        return gen.getNext();
+    }
+
+    public void configure(JobConf job) {
+        this.job = job;
+        String quantilesFile = job.get("pig.quantilesFile", "");
+        comparator = job.getOutputKeyComparator();
+        if (quantilesFile.length() == 0)
+            throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
+        
+        try{
+            InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
+            BinStorage loader = new BinStorage();
+            ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>();
+            loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+            Tuple t = loader.getNext();
+            if(t==null) throw new RuntimeException("Empty samples file");
+            // the quantiles file has a tuple as follows:
+            // (numQuantiles, bag of samples) 
+            // numQuantiles here is the reduce parallelism
+            numQuantiles = (Integer) t.get(0);
+            samples = (DataBag) t.get(1);
+            long numSamples = samples.size();
+            long toSkip = numSamples / numQuantiles;
+            if(toSkip == 0) {
+                // numSamples is < numQuantiles;
+                // set numQuantiles to numSamples
+                numQuantiles = (int)numSamples;
+                toSkip = 1;
+            }
+            
+            long ind=0, j=-1, nextQuantile = toSkip-1;
+            for (Tuple it : samples) {
+                if (ind==nextQuantile){
+                    ++j;
+                    quantilesList.add(getPigNullableWritable(it));
+                    nextQuantile+=toSkip;
+                    if(j==numQuantiles-1)
+                        break;
+                }
+                ind++;
+                //TODO how do we report progress?
+                //if (i % 1000 == 0) progress();
+                // Currently there is no way to report progress since 
+                // in configure() we cannot get a handle to the reporter
+                // (even PhysicalOperator.getReporter() does not work! It is
+                // set to null.) Hopefully the work done here will be < 10 minutes
+                // since we are dealing with 100* num_mapper samples. When
+                // RandomSampleLoader becomes an operator or UDF instead of a
+                // loader hopefully we can intelligently decide on the number
+                // of samples (instead of the static 100) and then not being
+                // able to report progress may not be a big issue.
+            }
+            convertToArray(quantilesList);
+            long i=-1;
+            Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>();
+            for (Tuple it : samples){
+                ++i;
+                PigNullableWritable sample = getPigNullableWritable(it);
+                int partInd = new Long(i/toSkip).intValue(); // which partition
+                if(partInd==numQuantiles) break;
+                // the quantiles array has the element from the sample which is the
+                // last element for a given partition. For example: if numQuantiles 
+                // is 5 and the number of samples is 100, then toSkip = 20 
+                // quantiles[0] = sample[19] // the 20th element
+                // quantiles[1] = sample[39] // the 40th element
+                // and so on. For any element in the sample between 0 and 19, partInd
+                // will be 0. We want to check whether a sample element which is
+                // present between 0 and 19 is also equal to the 19th element
+                // (quantiles[0]). That would mean the element might spread over the 
+                // 0th and 1st partitions. We are looking for contributions to a
+                // partition from such elements. 
+                
+                // First, we only check for sample elements in partitions other than the
+                // last one, i.e. partInd < numQuantiles - 1 (partInd is 0 indexed). 
+                if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){
+                    if(!contribs.containsKey(sample)){
+                        CountingMap<Integer> cm = new CountingMap<Integer>();
+                        cm.put(partInd, 1);
+                        contribs.put(sample, cm);
+                    }
+                    else
+                        contribs.get(sample).put(partInd, 1);
+                }
+                else{ 
+                    // we are either in the last partition (last quantile)
+                    // OR the sample element we are currently processing is not
+                    // the same as the element in the quantile array for this partition.
+                    // If we haven't seen this sample item earlier, this is not an
+                    // element which crosses partitions - so ignore
+                    if(!contribs.containsKey(sample))
+                        continue;
+                    else
+                        // we have seen this sample before (in a previous partInd), 
+                        // add to the contribution associated with this sample - if we had 
+                        // not seen this sample in a previous partInd, then we have not
+                        // had this in the contribs map! (because of the if above). This 
+                        // sample can either go to the previous partInd or this partInd 
+                        // in the final sort reduce stage. That is where the amount of 
+                        // contribution to each partInd will matter and influence the choice.
+                        contribs.get(sample).put(partInd, 1);
+                }
+            }
+            for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){
+                PigNullableWritable key = ent.getKey(); // sample item which repeats
+                
+                // this map will have the contributions of the sample item to the different partitions
+                CountingMap<Integer> value = ent.getValue(); 
+                
+                long total = value.getTotalCount();
+                float[] probVec = new float[numQuantiles];
+                // for each partition that this sample item is present in,
+                // compute the fraction of the total occurrences for that
+                // partition - this will be the probability with which we
+                // will pick this partition in the final sort reduce job
+                // for this sample item
+                for (Entry<Integer,Integer> valEnt : value.entrySet()) {
+                    probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
+                }
+//                weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec));
+                weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec));
+            }
+        }catch (Exception e){
+            throw new RuntimeException(e);
+        }
+    }
+
+    private PigNullableWritable getPigNullableWritable(Tuple t) {
+        try {
+            // user comparators work with tuples - so if a user comparator
+            // is being used OR if there is more than one sort col, use
+            // NullableTuple
+            if ("true".equals(job.get("pig.usercomparator")) || t.size() > 1) {
+                return new NullableTuple(t);
+            } else {
+                Object o = t.get(0);
+                String kts = job.get("pig.reduce.key.type");
+                if (kts == null) {
+                    throw new RuntimeException("Didn't get reduce key type "
+                        + "from config file.");
+                }
+                return HDataType.getWritableComparableTypes(o,
+                    Byte.valueOf(kts));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private boolean areEqual(PigNullableWritable sample, PigNullableWritable writable) {
+        return comparator.compare(sample, writable)==0;
+    }
+
+    private void convertToArray(
+            ArrayList<PigNullableWritable> q) {
+        if ("true".equals(job.get("pig.usercomparator")) ||
+                q.get(0).getClass().equals(NullableTuple.class)) {
+            quantiles = q.toArray(new NullableTuple[0]);
+        } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
+            quantiles = q.toArray(new NullableBytesWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
+            quantiles = q.toArray(new NullableDoubleWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
+            quantiles = q.toArray(new NullableFloatWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
+            quantiles = q.toArray(new NullableIntWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
+            quantiles = q.toArray(new NullableLongWritable[0]);
+        } else if (q.get(0).getClass().equals(NullableText.class)) {
+            quantiles = q.toArray(new NullableText[0]);
+        } else {
+            throw new RuntimeException("Unexpected class in " + this.getClass().getSimpleName());
+        }
+    }
+}
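
A worked example of the weighting step in configure(), using hypothetical
numbers: if key "x" straddles the boundary between partitions 1 and 2, with
10 of its sampled occurrences counted against partition 1 and 20 against
partition 2, its probability vector comes out as {0, 1/3, 2/3, 0, 0}
(a sketch, not code from this commit):

    import java.util.Map;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.CountingMap;

    public class ContribSketch {
        public static void main(String[] args) {
            // Hypothetical contributions for key "x": 10 sampled
            // occurrences in partition 1, 20 in partition 2.
            CountingMap<Integer> contrib = new CountingMap<Integer>();
            for (int i = 0; i < 10; i++) contrib.put(1, 1);
            for (int i = 0; i < 20; i++) contrib.put(2, 1);

            int numQuantiles = 5; // hypothetical reduce parallelism
            float[] probVec = new float[numQuantiles];
            long total = contrib.getTotalCount();
            for (Map.Entry<Integer, Integer> e : contrib.entrySet())
                probVec[e.getKey()] = (float) e.getValue() / total;
            // probVec == {0, 0.333.., 0.666.., 0, 0}: at partition time, rows
            // with key "x" go to partition 1 with p=1/3 and to 2 with p=2/3.
            System.out.println(java.util.Arrays.toString(probVec));
        }
    }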

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Wed Feb 18 19:22:00 2009
@@ -60,6 +60,7 @@
      */
     public DefaultDataBag(List<Tuple> listOfTuples) {
         mContents = listOfTuples;
+        mSize = listOfTuples.size();
     }
 
     public boolean isSorted() {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Wed Feb 18 19:22:00 2009
@@ -129,6 +129,13 @@
         else return mValue.hashCode();
     }
 
+    
+    
+    @Override
+    public boolean equals(Object arg0) {
+        return compareTo(arg0)==0;
+    }
+
     @Override
     public String toString() {
         return new String("Null: " + mNull + " index: " + mIndex + " " +
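
The equals() override matters because WeightedRangePartitioner keeps
PigNullableWritable instances as HashMap keys (the weightedParts map), and
HashMap lookups need equals() to agree with hashCode(); delegating to
compareTo() also keeps equality consistent with the sort order. A sketch
(keyA and keyB are hypothetical instances that compare equal but are
distinct objects):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
    import org.apache.pig.impl.io.PigNullableWritable;

    public class EqualsContractSketch {
        static boolean lookup(PigNullableWritable keyA, PigNullableWritable keyB,
                              DiscreteProbabilitySampleGenerator gen) {
            Map<PigNullableWritable, DiscreteProbabilitySampleGenerator> parts =
                new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>();
            parts.put(keyA, gen);
            // Without the equals() override above, this containsKey() would fall
            // back to Object identity and miss keyB even though compareTo()
            // says the two keys are equal.
            return parts.containsKey(keyB);
        }
    }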

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Wed Feb 18 19:22:00 2009
@@ -376,10 +376,7 @@
                 pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
             }
         }
-        pigServer.store("B", tmpOutputFile);
-        
-        pigServer.registerQuery("A = load '" + tmpOutputFile + "';");
-        Iterator<Tuple> iter = pigServer.openIterator("A");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
         String last = "";
         HashSet<Integer> seen = new HashSet<Integer>();
         if(!iter.hasNext()) fail("No Results obtained");

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Wed Feb 18 19:22:00 2009
@@ -31,11 +31,11 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.COUNT;
 import org.apache.pig.builtin.SUM;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -59,6 +59,17 @@
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * Test cases to test the MRCompiler.
+ * VERY IMPORTANT NOTE: The tests here compare results with a
+ * "golden" set of outputs. In each testcase here, the operators
+ * generated have a random operator key which uses Java's Random 
+ * class. So if there is a code change which changes the number of
+ * operators created in a plan, then not only will the "golden" file
+ * for that test case need to be changed, but also the golden files for the
+ * tests that follow it, since the operator keys that will be generated through
+ * Random will be different.
+ */
 public class TestMRCompiler extends junit.framework.TestCase {
 //    MiniCluster cluster = MiniCluster.buildCluster();
     
@@ -82,6 +93,13 @@
     
     LogicalPlanTester planTester = new LogicalPlanTester() ;
 
+    // if for some reason, the golden files need
+    // to be regenerated, set this to true - THIS
+    // WILL OVERWRITE THE GOLDEN FILES - so use this
+    // with caution, and only for the testcases you need
+    // and are sure of
+    private boolean generate = true;
+
     @Before
     public void setUp() throws ExecException {
         GenPhyOp.setR(r);
@@ -701,6 +719,7 @@
         PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
         php.merge(ldFil1);
         
+        // set up order by *
         String funcName = WeirdComparator.class.getName();
         POUserComparisonFunc comparator = new POUserComparisonFunc(
                 new OperatorKey("", r.nextLong()), -1, null, new FuncSpec(funcName));
@@ -727,18 +746,21 @@
         
         php.add(sort);
         php.connect(ldFil1.getLeaves().get(0), sort);
-        
+        // have a foreach which takes the sort output
+        // and sends it to two UDFs
         List<String> udfs = new ArrayList<String>();
-        udfs.add(FindQuantiles.class.getName());
+        udfs.add(COUNT.class.getName());
         udfs.add(SUM.class.getName());
         POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
         php.add(fe3);
         php.connect(sort, fe3);
         
+        // add a group above the foreach
         PhysicalPlan grpChain1 = GenPhyOp.grpChain();
         php.merge(grpChain1);
         php.connect(fe3,grpChain1.getRoots().get(0));
         
+        
         udfs.clear();
         udfs.add(AVG.class.getName());
         POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
@@ -832,7 +854,9 @@
 
     /**
      * Test to ensure that the order by without parallel followed by a limit, i.e., top k
-     * always produces the correct number of map reduce jobs
+     * always produces the correct number of map reduce jobs. In the testcase below,
+     * since we are running the unit test locally, the reduce parallelism will be 1,
+     * so we will NOT introduce the extra MR job to do a final limit
      */
     @Test
     public void testNumReducersInLimit() throws Exception {
@@ -850,7 +874,7 @@
     		mrOper = mrPlan.getSuccessors(mrOper).get(0);
     		++count;
     	}        
-    	assertTrue(count == 4);
+    	assertTrue(count == 3);
     }
     
     /**
@@ -908,6 +932,11 @@
         ppp.print(baos);
         compiledPlan = baos.toString();
 
+        if(generate){
+            FileOutputStream fos = new FileOutputStream(expectedFile);
+            fos.write(baos.toByteArray());
+            return;
+        }
         FileInputStream fis = new FileInputStream(expectedFile);
         byte[] b = new byte[MAX_SIZE];
         int len = fis.read(b);

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Wed Feb 18 19:22:00 2009
@@ -1,4 +1,4 @@
-MapReduce(1,GFCross) - -156:
+MapReduce(1,GFCross) - -153:
 |   Store(DummyFil:DummyLdr) - --5683415113785058706
 |   |
 |   |---New For Each(false)[tuple] - --8002381389674382470
@@ -10,10 +10,10 @@
 |       |---Package[tuple]{Unknown} - --885269774183211482
 |   Local Rearrange[tuple]{Unknown}(false) - --776319888013965510
 |   |
-|   |---Load(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -155
+|   |---Load(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -152
 |
-|---MapReduce(1,AVG) - -153:
-    |   Store(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -154
+|---MapReduce(1,AVG) - -150:
+    |   Store(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -151
     |   |
     |   |---New For Each(false)[tuple] - -7965768498188214494
     |       |   |
@@ -24,14 +24,14 @@
     |       |---Package[tuple]{Unknown} - --7335024873119453444
     |   Local Rearrange[tuple]{Unknown}(false) - -4589138876054328603
     |   |
-    |   |---Load(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -152
+    |   |---Load(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -149
     |
-    |---MapReduce(20,TestMRCompiler$WeirdComparator,FindQuantiles,SUM) - -145:
-        |   Store(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -151
+    |---MapReduce(20,TestMRCompiler$WeirdComparator,COUNT,SUM) - -142:
+        |   Store(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -148
         |   |
         |   |---New For Each(false,false)[tuple] - --4248200967728536480
         |       |   |
-        |       |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - -8767305735755351861
+        |       |   POUserFunc(org.apache.pig.builtin.COUNT)[tuple] - -8767305735755351861
         |       |   |
         |       |   |---Project[tuple][*] - --5908426805312852480
         |       |   |
@@ -39,37 +39,31 @@
         |       |   |
         |       |   |---Project[tuple][*] - --1848504978980807369
         |       |
-        |       |---New For Each(true)[tuple] - -150
+        |       |---New For Each(true)[tuple] - -147
         |           |   |
-        |           |   Project[bag][1] - -149
+        |           |   Project[bag][1] - -146
         |           |
-        |           |---Package[tuple]{tuple} - -148
-        |   Local Rearrange[tuple]{tuple}(false) - -147
+        |           |---Package[tuple]{tuple} - -145
+        |   Local Rearrange[tuple]{tuple}(false) - -144
         |   |   |
-        |   |   Project[tuple][*] - -146
+        |   |   Project[tuple][*] - -143
         |   |
-        |   |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -144
+        |   |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -141
         |
-        |---MapReduce(1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130:
-            |   Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -143
+        |---MapReduce(1,TestMRCompiler$WeirdComparator) - -130:
+            |   Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -140
             |   |
-            |   |---New For Each(false)[tuple] - -142
+            |   |---New For Each(false,false)[tuple] - -139
             |       |   |
-            |       |   POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - -141
+            |       |   Constant(20) - -138
             |       |   |
-            |       |   |---Project[tuple][*] - -140
+            |       |   POSort[bag](org.apache.pig.test.TestMRCompiler$WeirdComparator) - --8479692259657755370
+            |       |   |   |
+            |       |   |   Project[tuple][*] - -137
+            |       |   |
+            |       |   |---Project[tuple][1] - -136
             |       |
-            |       |---New For Each(false,false)[tuple] - -139
-            |           |   |
-            |           |   Constant(20) - -138
-            |           |   |
-            |           |   POSort[bag](org.apache.pig.test.TestMRCompiler$WeirdComparator) - --8479692259657755370
-            |           |   |   |
-            |           |   |   Project[tuple][*] - -137
-            |           |   |
-            |           |   |---Project[tuple][1] - -136
-            |           |
-            |           |---Package[tuple]{chararray} - -135
+            |       |---Package[tuple]{chararray} - -135
             |   Local Rearrange[tuple]{chararray}(false) - -134
             |   |   |
             |   |   Constant(all) - -133

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld Wed Feb 18 19:22:00 2009
@@ -1,36 +1,36 @@
-MapReduce(-1) - -170:
+MapReduce(-1) - -167:
 |   Store(DummyFil:DummyLdr) - -7973970339130605847
 |   |
-|   |---New For Each(true)[bag] - -173
+|   |---New For Each(true)[bag] - -170
 |       |   |
-|       |   Project[tuple][0] - -172
+|       |   Project[tuple][0] - -169
 |       |
-|       |---Package[tuple]{tuple} - -171
-|   Local Rearrange[tuple]{tuple}(true) - -167
+|       |---Package[tuple]{tuple} - -168
+|   Local Rearrange[tuple]{tuple}(true) - -164
 |   |   |
-|   |   Project[tuple][*] - -166
+|   |   Project[tuple][*] - -163
 |   |
-|   |---Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -169
+|   |---Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -166
 |
-|---MapReduce(-1) - -165:
-    |   Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -168
+|---MapReduce(-1) - -162:
+    |   Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -165
     |   |
     |   |---Package[tuple]{Unknown} - -2082992246427879202
     |   Local Rearrange[tuple]{Unknown}(false) - --3148893660811981376
     |   |
-    |   |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -164
+    |   |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -161
     |
-    |---MapReduce(-1) - -157:
-        |   Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -163
+    |---MapReduce(-1) - -154:
+        |   Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -160
         |   |
-        |   |---New For Each(true)[bag] - -162
+        |   |---New For Each(true)[bag] - -159
         |       |   |
-        |       |   Project[tuple][0] - -161
+        |       |   Project[tuple][0] - -158
         |       |
-        |       |---Package[tuple]{tuple} - -160
-        |   Local Rearrange[tuple]{tuple}(true) - -159
+        |       |---Package[tuple]{tuple} - -157
+        |   Local Rearrange[tuple]{tuple}(true) - -156
         |   |   |
-        |   |   Project[tuple][*] - -158
+        |   |   Project[tuple][*] - -155
         |   |
         |   |---Filter[tuple] - --7926255547935388282
         |       |

Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld Wed Feb 18 19:22:00 2009
@@ -1,17 +1,17 @@
-MapReduce(1) - -174:
+MapReduce(1) - -171:
 |   Store(DummyFil:DummyLdr) - -7856319821130535798
 |   |
-|   |---Limit - -180
+|   |---Limit - -177
 |       |
-|       |---New For Each(true)[bag] - -179
+|       |---New For Each(true)[bag] - -176
 |           |   |
-|           |   Project[tuple][1] - -178
+|           |   Project[tuple][1] - -175
 |           |
-|           |---Package[tuple]{tuple} - -177
-|   Local Rearrange[tuple]{tuple}(false) - -176
+|           |---Package[tuple]{tuple} - -174
+|   Local Rearrange[tuple]{tuple}(false) - -173
 |   |   |
-|   |   Project[tuple][*] - -175
+|   |   Project[tuple][*] - -172
 |   |
 |   |---Limit - -7398260302074824818
 |       |
-|       |---Load(DummyFil:DummyLdr) - -4188863770717253580
+|       |---Load(DummyFil:DummyLdr) - -4188863770717253580
\ No newline at end of file

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Wed Feb 18 19:22:00 2009
@@ -30,7 +30,6 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;