You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/02/18 20:22:02 UTC
svn commit: r745623 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/data/ src/org/apache/pig/impl/builtin/ sr...
Author: pradeepkth
Date: Wed Feb 18 19:22:00 2009
New Revision: 745623
URL: http://svn.apache.org/viewvc?rev=745623&view=rev
Log:
PIG-545: PERFORMANCE: Sampler for order bys does not produce a good distribution
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
Removed:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SortPartitioner.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 18 19:22:00 2009
@@ -424,3 +424,5 @@
PIG-590: error handling on the backend (sms)
+ PIG-545: PERFORMANCE: Sampler for order bys does not produce a good
+ distribution (pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 18 19:22:00 2009
@@ -48,6 +48,7 @@
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -371,7 +372,7 @@
// global sort, not for limit after sort.
if (mro.isGlobalSort()) {
jobConf.set("pig.quantilesFile", mro.getQuantFile());
- jobConf.setPartitionerClass(SortPartitioner.class);
+ jobConf.setPartitionerClass(WeightedRangePartitioner.class);
}
if(mro.UDFs.size()==1){
String compFuncSpec = mro.UDFs.get(0);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 18 19:22:00 2009
@@ -35,11 +35,12 @@
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.RandomSampleLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -71,6 +72,7 @@
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
/**
* The compiler that compiles a given physical plan
@@ -944,9 +946,11 @@
MapReduceOper mro = endSingleInputPlanWithStr(fSpec);
FileSpec quantFile = getTempFileSpec();
int rp = op.getRequestedParallelism();
- int[] fields = getSortCols(op);
- MapReduceOper quant = getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
- curMROp = getSortJob(op, quant, fSpec, quantFile, rp, fields);
+ Pair<Integer,Byte>[] fields = getSortCols(op);
+ Pair<MapReduceOper, Integer> quantJobParallelismPair =
+ getQuantileJob(op, mro, fSpec, quantFile, rp, fields);
+ curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile,
+ quantJobParallelismPair.second, fields);
if(op.isUDFComparatorUsed){
curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
@@ -958,14 +962,16 @@
}
}
- private int[] getSortCols(POSort sort) throws PlanException, ExecException {
+ private Pair<Integer, Byte>[] getSortCols(POSort sort) throws PlanException, ExecException {
List<PhysicalPlan> plans = sort.getSortPlans();
if(plans!=null){
- int[] ret = new int[plans.size()];
+ Pair[] ret = new Pair[plans.size()];
int i=-1;
for (PhysicalPlan plan : plans) {
if (((POProject)plan.getLeaves().get(0)).isStar()) return null;
- ret[++i] = ((POProject)plan.getLeaves().get(0)).getColumn();
+ int first = ((POProject)plan.getLeaves().get(0)).getColumn();
+ byte second = ((POProject)plan.getLeaves().get(0)).getResultType();
+ ret[++i] = new Pair<Integer,Byte>(first,second);
}
return ret;
}
@@ -980,7 +986,7 @@
FileSpec lFile,
FileSpec quantFile,
int rp,
- int[] fields) throws PlanException{
+ Pair<Integer,Byte>[] fields) throws PlanException{
MapReduceOper mro = startNew(lFile, quantJob);
mro.setQuantFile(quantFile.getFileName());
mro.setGlobalSort(true);
@@ -1139,10 +1145,9 @@
return mro;
}
- public MapReduceOper getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, int[] fields) throws PlanException, VisitorException {
+ public Pair<MapReduceOper,Integer> getQuantileJob(POSort inpSort, MapReduceOper prevJob, FileSpec lFile, FileSpec quantFile, int rp, Pair<Integer,Byte>[] fields) throws PlanException, VisitorException {
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(), new FuncSpec(RandomSampleLoader.class.getName()));
MapReduceOper mro = startNew(quantLdFilName, prevJob);
- mro.UDFs.add(FindQuantiles.class.getName());
POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
.getRequestedParallelism(), null, inpSort.getSortPlans(),
inpSort.getMAscCols(), inpSort.getMSortFunc());
@@ -1152,7 +1157,7 @@
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
List<Boolean> flat1 = new ArrayList<Boolean>();
-
+ // Set up the projections of the key columns
if (fields == null) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,
@@ -1164,20 +1169,23 @@
eps1.add(ep);
flat1.add(true);
} else {
- for (int i : fields) {
+ for (Pair<Integer,Byte> i : fields) {
PhysicalPlan ep = new PhysicalPlan();
POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(i);
+ prj.setColumn(i.first);
prj.setOverloaded(false);
- prj.setResultType(DataType.BYTEARRAY);
+ prj.setResultType(i.second);
ep.add(prj);
eps1.add(ep);
flat1.add(true);
}
}
+ // This foreach will pick the sort key columns from the RandomSampleLoader output
POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
mro.mapPlan.addAsLeaf(nfe1);
+ // Now set up a POLocalRearrange which has "all" as the key and the output of the
+ // foreach will be the "value" out of POLocalRearrange
PhysicalPlan ep1 = new PhysicalPlan();
ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
ce.setValue("all");
@@ -1210,39 +1218,73 @@
pkg.setInner(inner);
mro.reducePlan.add(pkg);
+ // Let's start building the plan which will have the sort
+ // for the foreach
PhysicalPlan fe2Plan = new PhysicalPlan();
-
+ // Top level project which just projects the tuple which is coming
+ // from the foreach after the package
POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
topPrj.setColumn(1);
topPrj.setResultType(DataType.TUPLE);
topPrj.setOverloaded(true);
fe2Plan.add(topPrj);
- PhysicalPlan nesSortPlan = new PhysicalPlan();
- POProject prjStar2 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar2.setResultType(DataType.TUPLE);
- prjStar2.setStar(true);
- nesSortPlan.add(prjStar2);
-
+ // the projections which will form sort plans
List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
- nesSortPlanLst.add(nesSortPlan);
+ if (fields == null) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ prj.setStar(true);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.TUPLE);
+ ep.add(prj);
+ nesSortPlanLst.add(ep);
+ } else {
+ for (int i=0; i<fields.length;i++) {
+ PhysicalPlan ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setColumn(i);
+ prj.setOverloaded(false);
+ prj.setResultType(fields[i].second);
+ ep.add(prj);
+ nesSortPlanLst.add(ep);
+ }
+ }
sort.setSortPlans(nesSortPlanLst);
sort.setResultType(DataType.BAG);
fe2Plan.add(sort);
fe2Plan.connect(topPrj, sort);
- /*PhysicalPlan ep3 = new PhysicalPlan();
- POProject prjStar3 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar3.setResultType(DataType.BAG);
- prjStar3.setColumn(1);
- prjStar3.setStar(false);
- ep3.add(prjStar3);*/
-
+ // The plan which will have a constant representing the
+ // degree of parallelism for the final order by map-reduce job
+ // this will either come from a "order by parallel x" in the script
+ // or will be the default number of reducers for the cluster if
+ // "parallel x" is not used in the script
PhysicalPlan rpep = new PhysicalPlan();
ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
rpce.setRequestedParallelism(rp);
- rpce.setValue(rp<=0?1:rp);
+ int val = rp;
+ if(val<=0){
+ ExecutionEngine eng = pigContext.getExecutionEngine();
+ if(eng instanceof HExecutionEngine){
+ try {
+ val = Math.round(0.9f * ((HExecutionEngine)eng).getJobClient().getDefaultReduces());
+ if(val<=0)
+ val = 1;
+ } catch (IOException e) {
+ int errCode = 6015;
+ String msg = "Problem getting the default number of reduces from the Job Client.";
+ throw new MRCompilerException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
+ }
+ } else {
+ val = 1; // local mode, set it to 1
+ }
+ }
+ int parallelismForSort = (rp <= 0 ? val : rp);
+ rpce.setValue(parallelismForSort);
+
rpce.setResultType(DataType.INTEGER);
rpep.add(rpce);
@@ -1258,41 +1300,14 @@
mro.reducePlan.add(nfe2);
mro.reducePlan.connect(pkg, nfe2);
- PhysicalPlan ep4 = new PhysicalPlan();
- POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prjStar4.setResultType(DataType.TUPLE);
- prjStar4.setStar(true);
- ep4.add(prjStar4);
-
- List ufInps = new ArrayList();
- ufInps.add(prjStar4);
- // Turn the asc/desc array into an array of strings so that we can pass it
- // to the FindQuantiles function.
- List<Boolean> ascCols = inpSort.getMAscCols();
- String[] ascs = new String[ascCols.size()];
- for (int i = 0; i < ascCols.size(); i++) ascs[i] = ascCols.get(i).toString();
- POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
- new FuncSpec(FindQuantiles.class.getName(), ascs));
- ep4.add(uf);
- ep4.connect(prjStar4, uf);
-
- List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
- ep4s.add(ep4);
- List<Boolean> flattened3 = new ArrayList<Boolean>();
- flattened3.add(false);
- POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
-
- mro.reducePlan.add(nfe3);
- mro.reducePlan.connect(nfe2, nfe3);
-
POStore str = getStore();
str.setSFile(quantFile);
mro.reducePlan.add(str);
- mro.reducePlan.connect(nfe3, str);
+ mro.reducePlan.connect(nfe2, str);
mro.setReduceDone(true);
mro.requestedParallelism = 1;
- return mro;
+ return new Pair<MapReduceOper, Integer>(mro, parallelismForSort);
}
static class LastInputStreamingOptimizer extends MROpPlanVisitor {
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/CountingMap.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+/**
+ * A HashMap from a key to an Integer count in which put() adds to any count already stored for the key and a running total of all added amounts is kept.
+ */
+
+public class CountingMap<K> extends HashMap<K, Integer> {
+ /**
+ * Serialization version id.
+ */
+ private static final long serialVersionUID = 1L;
+
+ private long totalCount = 0; // sum of every amount passed to put()
+
+ @Override
+ public Integer put(K arg0, Integer arg1) {
+ if(!containsKey(arg0)){
+ totalCount += arg1;
+ return super.put(arg0, arg1); // first occurrence: store the amount as-is
+ }
+ else{
+ totalCount += arg1;
+ return super.put(arg0, get(arg0)+arg1); // key already present: accumulate instead of overwrite
+ }
+ }
+
+ public void display(){ // prints each key and its count to standard output between separator lines
+ System.out.println();
+ System.out.println("-------------------------");
+
+ for (Entry<K,Integer> ent : entrySet()) {
+ System.out.println(ent.getKey() + ": " + ent.getValue());
+ }
+
+ System.out.println("-------------------------------");
+ }
+
+ public long getTotalCount() { // total of all amounts added through put()
+ return totalCount;
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.pig.PigException;
+
+
+public class DiscreteProbabilitySampleGenerator { // draws random indices into probVec, weighted by the probability stored at each index
+ Random rGen;
+ float[] probVec; // probVec[i] is the probability of returning index i
+ float epsilon = 0.00001f; // tolerance allowed when checking that probVec sums to one
+
+ public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) throws MalFormedProbVecException{ // seeded variant: the random sequence is fixed by seed
+ rGen = new Random(seed);
+ float sum = 0.0f;
+ for (float f : probVec) {
+ sum += f;
+ }
+ if(1-epsilon<=sum && sum<=1+epsilon) // accept only vectors whose sum is within epsilon of one
+ this.probVec = probVec;
+ else {
+ int errorCode = 2122;
+ String message = "Sum of probabilities should be one";
+ throw new MalFormedProbVecException(message, errorCode, PigException.BUG);
+ }
+ }
+
+ public DiscreteProbabilitySampleGenerator(float[] probVec) throws MalFormedProbVecException{ // unseeded variant: same validation as the seeded constructor
+ rGen = new Random();
+ float sum = 0.0f;
+ for (float f : probVec) {
+ sum += f;
+ }
+ if(1-epsilon<=sum && sum<=1+epsilon) // accept only vectors whose sum is within epsilon of one
+ this.probVec = probVec;
+ else {
+ int errorCode = 2122;
+ String message = "Sum of probabilities should be one";
+ throw new MalFormedProbVecException(message, errorCode, PigException.BUG);
+ }
+ }
+
+ public int getNext(){
+ double toss = rGen.nextDouble();
+ // Subtract each partition's probability from the uniformly
+ // random number in turn and pick the first partition at
+ // which the remainder drops to zero or below.
+ // For some sample item which occurs only in partitions
+ // 1 and 2
+ // say probVec[1] = 0.3
+ // and probVec[2] = 0.7
+ // if our coin toss generates < 0.3, we pick 1; otherwise
+ // we pick 2
+ for(int i=0;i<probVec.length;i++){
+ toss -= probVec[i];
+ if(toss<=0.0)
+ return i;
+ }
+ return -1; // NOTE(review): reached when toss exceeds the accumulated probabilities, which the constructors' epsilon tolerance permits - confirm callers handle -1
+ }
+
+ public static void main(String[] args) throws MalFormedProbVecException { // manual check: draws 100 samples with a fixed seed and prints how often each index came up
+ float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
+ DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec);
+ CountingMap<Integer> cm = new CountingMap<Integer>();
+ for(int i=0;i<100;i++){
+ cm.put(gen.getNext(), 1);
+ }
+ cm.display();
+ }
+
+ @Override
+ public String toString() {
+ // The string form is the contents of the probability vector.
+ return Arrays.toString(probVec);
+ }
+
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/MalFormedProbVecException.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+import org.apache.pig.backend.BackendException;
+
+/**
+ * BackendException subtype for a malformed probability vector; every constructor simply forwards its arguments to the matching BackendException constructor.
+ */
+
+public class MalFormedProbVecException extends BackendException {
+ /**
+ * Serialization version id.
+ */
+ private static final long serialVersionUID = 1L;
+
+ public MalFormedProbVecException(String message, int errCode, byte errSrc) { // message plus error code and error source
+ super(message, errCode, errSrc);
+ }
+
+ public MalFormedProbVecException() {
+ super();
+ }
+
+ public MalFormedProbVecException(String arg0, Throwable arg1) { // message plus cause
+ super(arg0, arg1);
+ }
+
+ public MalFormedProbVecException(String arg0) { // message only
+ super(arg0);
+ }
+
+ public MalFormedProbVecException(Throwable arg0) { // cause only
+ super(arg0);
+ }
+}
\ No newline at end of file
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=745623&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Wed Feb 18 19:22:00 2009
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
+
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.NullableBytesWritable;
+import org.apache.pig.impl.io.NullableDoubleWritable;
+import org.apache.pig.impl.io.NullableFloatWritable;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableLongWritable;
+import org.apache.pig.impl.io.NullableText;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+public class WeightedRangePartitioner implements Partitioner<PigNullableWritable, Writable> { // range partitioner driven by a quantiles file; keys whose sample values straddle a quantile boundary are spread across partitions by weight
+ PigNullableWritable[] quantiles; // boundary keys picked from the samples, one every toSkip samples (see configure)
+ RawComparator<PigNullableWritable> comparator; // the job's output key comparator
+ Integer numQuantiles;
+ DataBag samples;
+ public static Map<PigNullableWritable,DiscreteProbabilitySampleGenerator> weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); // NOTE(review): static, and never cleared in this class - confirm sharing across instances is intended
+ JobConf job;
+
+ public int getPartition(PigNullableWritable key, Writable value,
+ int numPartitions){
+ if(!weightedParts.containsKey(key)){ // ordinary key: locate its range among the quantile boundaries
+ int index = Arrays.binarySearch(quantiles, key, comparator);
+ if (index < 0)
+ index = -index-1; // not found: binarySearch returned -(insertion point)-1
+ else
+ index = index + 1; // exact match on a boundary: use the next partition
+ return Math.min(index, numPartitions - 1);
+ }
+ DiscreteProbabilitySampleGenerator gen = weightedParts.get(key);
+ return gen.getNext(); // NOTE(review): not clamped to numPartitions, unlike the range branch above - confirm getNext() is always in range
+ }
+
+ public void configure(JobConf job) {
+ this.job = job;
+ String quantilesFile = job.get("pig.quantilesFile", "");
+ comparator = job.getOutputKeyComparator();
+ if (quantilesFile.length() == 0)
+ throw new RuntimeException(this.getClass().getSimpleName() + " used but no quantiles found");
+
+ try{
+ InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
+ BinStorage loader = new BinStorage();
+ ArrayList<PigNullableWritable> quantilesList = new ArrayList<PigNullableWritable>();
+ loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+ Tuple t = loader.getNext();
+ if(t==null) throw new RuntimeException("Empty samples file");
+ // the Quantiles file has a tuple as under:
+ // (numQuantiles, bag of samples)
+ // numQuantiles here is the reduce parallelism
+ numQuantiles = (Integer) t.get(0);
+ samples = (DataBag) t.get(1);
+ long numSamples = samples.size();
+ long toSkip = numSamples / numQuantiles;
+ if(toSkip == 0) {
+ // numSamples is < numQuantiles;
+ // set numQuantiles to numSamples
+ numQuantiles = (int)numSamples;
+ toSkip = 1;
+ }
+
+ long ind=0, j=-1, nextQuantile = toSkip-1;
+ for (Tuple it : samples) {
+ if (ind==nextQuantile){
+ ++j;
+ quantilesList.add(getPigNullableWritable(it));
+ nextQuantile+=toSkip;
+ if(j==numQuantiles-1)
+ break;
+ }
+ ind++;
+ //TODO how do we report progress?
+ //if (i % 1000 == 0) progress();
+ // Currently there is no way to report progress since
+ // in configure() we cannot get a handle to the reporter
+ // (even PhysicalOperator.getReporter() does not work! It is
+ // set to null.) Hopefully the work done here will be < 10 minutes
+ // since we are dealing with 100* num_mapper samples. When
+ // RandomSampleLoader becomes an operator or UDF instead of a
+ // loader hopefully we can intelligently decide on the number
+ // of samples (instead of the static 100) and then not being
+ // able to report progress may not be a big issue.
+ }
+ convertToArray(quantilesList); // NOTE(review): convertToArray calls q.get(0), which throws if no quantile was collected - confirm an empty sample bag cannot occur
+ long i=-1;
+ Map<PigNullableWritable,CountingMap<Integer>> contribs = new HashMap<PigNullableWritable, CountingMap<Integer>>();
+ for (Tuple it : samples){
+ ++i;
+ PigNullableWritable sample = getPigNullableWritable(it);
+ int partInd = new Long(i/toSkip).intValue(); // which partition
+ if(partInd==numQuantiles) break;
+ // the quantiles array has the element from the sample which is the
+ // last element for a given partition. For example: if numQuantiles
+ // is 5 and number of samples is 100, then toSkip = 20
+ // quantiles[0] = sample[19] // the 20th element
+ // quantiles[1] = sample[39] // the 40th element
+ // and so on. For any element in the sample between 0 and 19, partInd
+ // will be 0. We want to check if a sample element which is
+ // present between 0 and 19 and is also the 19th (quantiles[0] element).
+ // This would mean that element might spread over the 0th and 1st
+ // partition. We are looking for contributions to a partition
+ // from such elements.
+
+ // First We only check for sample elements in partitions other than the last one
+ // < numQuantiles -1 (partInd is 0 indexed).
+ if(partInd<numQuantiles-1 && areEqual(sample,quantiles[partInd])){
+ if(!contribs.containsKey(sample)){
+ CountingMap<Integer> cm = new CountingMap<Integer>();
+ cm.put(partInd, 1);
+ contribs.put(sample, cm);
+ }
+ else
+ contribs.get(sample).put(partInd, 1);
+ }
+ else{
+ // we are either in the last partition (last quantile)
+ // OR the sample element we are currently processing is not
+ // the same as the element in the quantile array for this partition
+ // if we haven't seen this sample item earlier, this is not an
+ // element which crosses partitions - so ignore
+ if(!contribs.containsKey(sample))
+ continue;
+ else
+ // we have seen this sample before (in a previous partInd),
+ // add to the contribution associated with this sample - if we had
+ // not seen this sample in a previous partInd, then we have not
+ // had this in the contribs map! (because of the if above).This
+ // sample can either go to the previous partInd or this partInd
+ // in the final sort reduce stage. That is where the amount of
+ // contribution to each partInd will matter and influence the choice.
+ contribs.get(sample).put(partInd, 1);
+ }
+ }
+ for(Entry<PigNullableWritable, CountingMap<Integer>> ent : contribs.entrySet()){
+ PigNullableWritable key = ent.getKey(); // sample item which repeats
+
+ // this map will have the contributions of the sample item to the different partitions
+ CountingMap<Integer> value = ent.getValue();
+
+ long total = value.getTotalCount();
+ float[] probVec = new float[numQuantiles];
+ // for each partition that this sample item is present in,
+ // compute the fraction of the total occurrences for that
+ // partition - this will be the probability with which we
+ // will pick this partition in the final sort reduce job
+ // for this sample item
+ for (Entry<Integer,Integer> valEnt : value.entrySet()) {
+ probVec[valEnt.getKey()] = (float)valEnt.getValue()/total;
+ }
+// weightedParts.put(key, new DiscreteProbabilitySampleGenerator(11317,probVec));
+ weightedParts.put(key, new DiscreteProbabilitySampleGenerator(probVec));
+ }
+ }catch (Exception e){
+ throw new RuntimeException(e); // any failure while loading or processing the quantiles file is rethrown unchecked
+ }
+ }
+
+ private PigNullableWritable getPigNullableWritable(Tuple t) { // wraps a sample tuple in the writable type used for comparison against keys
+ try {
+ // user comparators work with tuples - so if user comparator
+ // is being used OR if there are more than 1 sort cols, use
+ // NullableTuple
+ if ("true".equals(job.get("pig.usercomparator")) || t.size() > 1) {
+ return new NullableTuple(t);
+ } else {
+ Object o = t.get(0);
+ String kts = job.get("pig.reduce.key.type");
+ if (kts == null) {
+ throw new RuntimeException("Didn't get reduce key type "
+ + "from config file.");
+ }
+ return HDataType.getWritableComparableTypes(o,
+ Byte.valueOf(kts));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean areEqual(PigNullableWritable sample, PigNullableWritable writable) { // equality as defined by the job's key comparator
+ return comparator.compare(sample, writable)==0;
+ }
+
+ private void convertToArray(
+ ArrayList<PigNullableWritable> q) { // stores q into quantiles as an array whose component type is chosen from the class of q's first element
+ if ("true".equals(job.get("pig.usercomparator")) ||
+ q.get(0).getClass().equals(NullableTuple.class)) {
+ quantiles = q.toArray(new NullableTuple[0]);
+ } else if (q.get(0).getClass().equals(NullableBytesWritable.class)) {
+ quantiles = q.toArray(new NullableBytesWritable[0]);
+ } else if (q.get(0).getClass().equals(NullableDoubleWritable.class)) {
+ quantiles = q.toArray(new NullableDoubleWritable[0]);
+ } else if (q.get(0).getClass().equals(NullableFloatWritable.class)) {
+ quantiles = q.toArray(new NullableFloatWritable[0]);
+ } else if (q.get(0).getClass().equals(NullableIntWritable.class)) {
+ quantiles = q.toArray(new NullableIntWritable[0]);
+ } else if (q.get(0).getClass().equals(NullableLongWritable.class)) {
+ quantiles = q.toArray(new NullableLongWritable[0]);
+ } else if (q.get(0).getClass().equals(NullableText.class)) {
+ quantiles = q.toArray(new NullableText[0]);
+ } else {
+ throw new RuntimeException("Unexpected class in " + this.getClass().getSimpleName());
+ }
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultDataBag.java Wed Feb 18 19:22:00 2009
@@ -60,6 +60,7 @@
*/
public DefaultDataBag(List<Tuple> listOfTuples) {
mContents = listOfTuples; // keeps a reference to the caller's list; no copy is made
+ mSize = listOfTuples.size(); // initialize mSize from the supplied list so it agrees with mContents
}
public boolean isSorted() {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/PigNullableWritable.java Wed Feb 18 19:22:00 2009
@@ -129,6 +129,13 @@
else return mValue.hashCode();
}
+
+
+ /**
+ * Equality expressed through compareTo: two PigNullableWritable
+ * instances are equal exactly when compareTo reports 0 for them.
+ *
+ * @param arg0 object to compare with; may be null or of any type
+ * @return false if arg0 is null or not a PigNullableWritable,
+ * otherwise whether compareTo(arg0) is 0
+ */
+ @Override
+ public boolean equals(Object arg0) {
+ // Object.equals must answer false for null or an unrelated type
+ // rather than propagate whatever compareTo does with such an
+ // argument. instanceof is false for null, so one test covers both.
+ if (!(arg0 instanceof PigNullableWritable)) {
+ return false;
+ }
+ return compareTo(arg0)==0;
+ }
+
@Override
public String toString() {
return new String("Null: " + mNull + " index: " + mIndex + " " +
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Wed Feb 18 19:22:00 2009
@@ -376,10 +376,7 @@
pigServer.registerQuery("B = ORDER A BY $0 using " + TupComp.class.getName() + ";");
}
}
- pigServer.store("B", tmpOutputFile);
-
- pigServer.registerQuery("A = load '" + tmpOutputFile + "';");
- Iterator<Tuple> iter = pigServer.openIterator("A");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
String last = "";
HashSet<Integer> seen = new HashSet<Integer>();
if(!iter.hasNext()) fail("No Results obtained");
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMRCompiler.java Wed Feb 18 19:22:00 2009
@@ -31,11 +31,11 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.COUNT;
import org.apache.pig.builtin.SUM;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.GFCross;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
@@ -59,6 +59,17 @@
import org.junit.Before;
import org.junit.Test;
+/**
+ * Test cases to test the MRCompiler.
+ * VERY IMPORTANT NOTE: The tests here compare results with a
+ * "golden" set of outputs. In each testcase here, the operators
+ * generated have a random operator key which uses Java's Random
+ * class. So if there is a code change which changes the number of
+ * operators created in a plan, then not only will the "golden" file
+ * for that test case need to be changed, but also for the tests
+ * that follow it since the operator keys that will be generated through
+ * Random will be different.
+ */
public class TestMRCompiler extends junit.framework.TestCase {
// MiniCluster cluster = MiniCluster.buildCluster();
@@ -82,6 +93,13 @@
LogicalPlanTester planTester = new LogicalPlanTester() ;
+ // Set this to true ONLY to regenerate the golden files. While it is
+ // true, the plan-comparison helper OVERWRITES the expected file with
+ // the freshly compiled plan and returns without comparing anything,
+ // so it must be false in committed code. Flip it locally, with caution,
+ // and only for the testcases you need and are sure of.
+ private boolean generate = false;
+
@Before
public void setUp() throws ExecException {
GenPhyOp.setR(r);
@@ -701,6 +719,7 @@
PhysicalPlan ldFil1 = GenPhyOp.loadedFilter();
php.merge(ldFil1);
+ // set up order by *
String funcName = WeirdComparator.class.getName();
POUserComparisonFunc comparator = new POUserComparisonFunc(
new OperatorKey("", r.nextLong()), -1, null, new FuncSpec(funcName));
@@ -727,18 +746,21 @@
php.add(sort);
php.connect(ldFil1.getLeaves().get(0), sort);
-
+ // have a foreach which takes the sort output
+ // and send it to two udfs
List<String> udfs = new ArrayList<String>();
- udfs.add(FindQuantiles.class.getName());
+ udfs.add(COUNT.class.getName());
udfs.add(SUM.class.getName());
POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
php.add(fe3);
php.connect(sort, fe3);
+ // add a group above the foreach
PhysicalPlan grpChain1 = GenPhyOp.grpChain();
php.merge(grpChain1);
php.connect(fe3,grpChain1.getRoots().get(0));
+
udfs.clear();
udfs.add(AVG.class.getName());
POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
@@ -832,7 +854,9 @@
/**
* Test to ensure that the order by without parallel followed by a limit, i.e., top k
- * always produces the correct number of map reduce jobs
+ * always produces the correct number of map reduce jobs. In the testcase below since
+ * we are running the unit test locally, we will get reduce parallelism as 1. So we will
+ * NOT introduce the extra MR job to do a final limit
*/
@Test
public void testNumReducersInLimit() throws Exception {
@@ -850,7 +874,7 @@
mrOper = mrPlan.getSuccessors(mrOper).get(0);
++count;
}
- assertTrue(count == 4);
+ assertTrue(count == 3);
}
/**
@@ -908,6 +932,11 @@
ppp.print(baos);
compiledPlan = baos.toString();
+ if(generate ){
+ FileOutputStream fos = new FileOutputStream(expectedFile);
+ fos.write(baos.toByteArray());
+ return;
+ }
FileInputStream fis = new FileInputStream(expectedFile);
byte[] b = new byte[MAX_SIZE];
int len = fis.read(b);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld Wed Feb 18 19:22:00 2009
@@ -1,4 +1,4 @@
-MapReduce(1,GFCross) - -156:
+MapReduce(1,GFCross) - -153:
| Store(DummyFil:DummyLdr) - --5683415113785058706
| |
| |---New For Each(false)[tuple] - --8002381389674382470
@@ -10,10 +10,10 @@
| |---Package[tuple]{Unknown} - --885269774183211482
| Local Rearrange[tuple]{Unknown}(false) - --776319888013965510
| |
-| |---Load(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -155
+| |---Load(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -152
|
-|---MapReduce(1,AVG) - -153:
- | Store(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -154
+|---MapReduce(1,AVG) - -150:
+ | Store(/tmp/temp-1456742965/tmp-586682361:org.apache.pig.builtin.BinStorage) - -151
| |
| |---New For Each(false)[tuple] - -7965768498188214494
| | |
@@ -24,14 +24,14 @@
| |---Package[tuple]{Unknown} - --7335024873119453444
| Local Rearrange[tuple]{Unknown}(false) - -4589138876054328603
| |
- | |---Load(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -152
+ | |---Load(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -149
|
- |---MapReduce(20,TestMRCompiler$WeirdComparator,FindQuantiles,SUM) - -145:
- | Store(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -151
+ |---MapReduce(20,TestMRCompiler$WeirdComparator,COUNT,SUM) - -142:
+ | Store(/tmp/temp-1456742965/tmp-26634357:org.apache.pig.builtin.BinStorage) - -148
| |
| |---New For Each(false,false)[tuple] - --4248200967728536480
| | |
- | | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - -8767305735755351861
+ | | POUserFunc(org.apache.pig.builtin.COUNT)[tuple] - -8767305735755351861
| | |
| | |---Project[tuple][*] - --5908426805312852480
| | |
@@ -39,37 +39,31 @@
| | |
| | |---Project[tuple][*] - --1848504978980807369
| |
- | |---New For Each(true)[tuple] - -150
+ | |---New For Each(true)[tuple] - -147
| | |
- | | Project[bag][1] - -149
+ | | Project[bag][1] - -146
| |
- | |---Package[tuple]{tuple} - -148
- | Local Rearrange[tuple]{tuple}(false) - -147
+ | |---Package[tuple]{tuple} - -145
+ | Local Rearrange[tuple]{tuple}(false) - -144
| | |
- | | Project[tuple][*] - -146
+ | | Project[tuple][*] - -143
| |
- | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -144
+ | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -141
|
- |---MapReduce(1,FindQuantiles,TestMRCompiler$WeirdComparator) - -130:
- | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -143
+ |---MapReduce(1,TestMRCompiler$WeirdComparator) - -130:
+ | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -140
| |
- | |---New For Each(false)[tuple] - -142
+ | |---New For Each(false,false)[tuple] - -139
| | |
- | | POUserFunc(org.apache.pig.impl.builtin.FindQuantiles)[tuple] - -141
+ | | Constant(20) - -138
| | |
- | | |---Project[tuple][*] - -140
+ | | POSort[bag](org.apache.pig.test.TestMRCompiler$WeirdComparator) - --8479692259657755370
+ | | | |
+ | | | Project[tuple][*] - -137
+ | | |
+ | | |---Project[tuple][1] - -136
| |
- | |---New For Each(false,false)[tuple] - -139
- | | |
- | | Constant(20) - -138
- | | |
- | | POSort[bag](org.apache.pig.test.TestMRCompiler$WeirdComparator) - --8479692259657755370
- | | | |
- | | | Project[tuple][*] - -137
- | | |
- | | |---Project[tuple][1] - -136
- | |
- | |---Package[tuple]{chararray} - -135
+ | |---Package[tuple]{chararray} - -135
| Local Rearrange[tuple]{chararray}(false) - -134
| | |
| | Constant(all) - -133
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld Wed Feb 18 19:22:00 2009
@@ -1,36 +1,36 @@
-MapReduce(-1) - -170:
+MapReduce(-1) - -167:
| Store(DummyFil:DummyLdr) - -7973970339130605847
| |
-| |---New For Each(true)[bag] - -173
+| |---New For Each(true)[bag] - -170
| | |
-| | Project[tuple][0] - -172
+| | Project[tuple][0] - -169
| |
-| |---Package[tuple]{tuple} - -171
-| Local Rearrange[tuple]{tuple}(true) - -167
+| |---Package[tuple]{tuple} - -168
+| Local Rearrange[tuple]{tuple}(true) - -164
| | |
-| | Project[tuple][*] - -166
+| | Project[tuple][*] - -163
| |
-| |---Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -169
+| |---Load(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -166
|
-|---MapReduce(-1) - -165:
- | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -168
+|---MapReduce(-1) - -162:
+ | Store(/tmp/temp-1456742965/tmp2077335416:org.apache.pig.builtin.BinStorage) - -165
| |
| |---Package[tuple]{Unknown} - -2082992246427879202
| Local Rearrange[tuple]{Unknown}(false) - --3148893660811981376
| |
- | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -164
+ | |---Load(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -161
|
- |---MapReduce(-1) - -157:
- | Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -163
+ |---MapReduce(-1) - -154:
+ | Store(/tmp/temp-1456742965/tmp-1456742965:org.apache.pig.builtin.BinStorage) - -160
| |
- | |---New For Each(true)[bag] - -162
+ | |---New For Each(true)[bag] - -159
| | |
- | | Project[tuple][0] - -161
+ | | Project[tuple][0] - -158
| |
- | |---Package[tuple]{tuple} - -160
- | Local Rearrange[tuple]{tuple}(true) - -159
+ | |---Package[tuple]{tuple} - -157
+ | Local Rearrange[tuple]{tuple}(true) - -156
| | |
- | | Project[tuple][*] - -158
+ | | Project[tuple][*] - -155
| |
| |---Filter[tuple] - --7926255547935388282
| |
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld Wed Feb 18 19:22:00 2009
@@ -1,17 +1,17 @@
-MapReduce(1) - -174:
+MapReduce(1) - -171:
| Store(DummyFil:DummyLdr) - -7856319821130535798
| |
-| |---Limit - -180
+| |---Limit - -177
| |
-| |---New For Each(true)[bag] - -179
+| |---New For Each(true)[bag] - -176
| | |
-| | Project[tuple][1] - -178
+| | Project[tuple][1] - -175
| |
-| |---Package[tuple]{tuple} - -177
-| Local Rearrange[tuple]{tuple}(false) - -176
+| |---Package[tuple]{tuple} - -174
+| Local Rearrange[tuple]{tuple}(false) - -173
| | |
-| | Project[tuple][*] - -175
+| | Project[tuple][*] - -172
| |
| |---Limit - -7398260302074824818
| |
-| |---Load(DummyFil:DummyLdr) - -4188863770717253580
+| |---Load(DummyFil:DummyLdr) - -4188863770717253580
\ No newline at end of file
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=745623&r1=745622&r2=745623&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Wed Feb 18 19:22:00 2009
@@ -30,7 +30,6 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;