You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/12/24 01:44:51 UTC
svn commit: r729182 [1/2] - in /hadoop/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/ph...
Author: olga
Date: Tue Dec 23 16:44:50 2008
New Revision: 729182
URL: http://svn.apache.org/viewvc?rev=729182&view=rev
Log:
PIG-563: support for multiple combiner invocations
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/SUM.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/StringMax.java
hadoop/pig/branches/types/src/org/apache/pig/builtin/StringMin.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Dec 23 16:44:50 2008
@@ -343,3 +343,5 @@
PIG-6: Add load support from hbase (hustlmsp via gates).
PIG-522: make negation work (pradeepk via olgan)
+
+ PIG-563: support for multiple combiner invocations
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue Dec 23 16:44:50 2008
@@ -28,17 +28,20 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -167,12 +170,29 @@
if (ap != null) {
log.info("Choosing to move algebraic foreach to combiner");
- // Need to insert a foreach in the combine plan. It will
- // have one inner plan for each inner plan in the foreach
- // we're duplicating. For projections, the plan will be
- // the same. For algebraic udfs, the plan will have the
- // initial version of the function. The reduce plan will
- // be changed to have the final version.
+ // Need to insert two new foreachs - one in the combine
+ // and one in the map plan which will be based on the reduce foreach.
+ // The map foreach will have one inner plan for each
+ // inner plan in the foreach we're duplicating. For
+ // projections, the plan will be the same. For algebraic
+ // udfs, the plan will have the initial version of the function.
+
+ // The combine foreach will have one inner plan for each
+ // inner plan in the foreach we're duplicating. For
+ // projections, the project operators will be changed to
+ // project the same column as its position in the
+ // foreach. For algebraic udfs, the plan will have the
+ // intermediate version of the function. The input to the
+ // udf will be a POProject which will project the column
+ // corresponding to the position of the udf in the foreach
+
+ // In the inner plans of the reduce foreach for
+ // projections, the project operators will be changed to
+ // project the same column as its position in the
+ // foreach. For algebraic udfs, the plan will have the
+ // final version of the function. The input to the
+ // udf will be a POProject which will project the column
+ // corresponding to the position of the udf in the foreach
if (mr.combinePlan.getRoots().size() != 0) {
log.warn("Wasn't expecting to find anything already "
+ "in the combiner!");
@@ -185,27 +205,14 @@
if (mKeyType == 0) {
mKeyType = rearrange.getKeyType();
}
- POPackage cp = pack.clone();
- mr.combinePlan.add(cp);
+
+ POForEach mfe = foreach.clone();
POForEach cfe = foreach.clone();
- fixUpForeachs(cfe, foreach, ap);
- mr.combinePlan.add(cfe);
- mr.combinePlan.connect(cp, cfe);
- // No need to connect projections in cfe to cp, because
- // PigCombiner directly attaches output from package to
- // root of remaining plan.
- POLocalRearrange clr = rearrange.clone();
- fixUpRearrange(clr);
- mr.combinePlan.add(clr);
- mr.combinePlan.connect(cfe, clr);
+ fixUpForeachs(mfe, cfe, foreach, ap);
- // stream input to the algebraics in the
- // combine plan
- LastInputStreamingOptimizer.replaceWithPOJoinPackage(
- mr.combinePlan, cp, cfe, chunkSize);
// Use the ExprType list returned from algebraic to tell
- // POPostCombinerPackage which fields need projected and
+ // POCombinerPackage which fields need to be projected and
// which placed in bags.
int numFields = (mKeyField >= ap.size()) ? mKeyField + 1 :
ap.size();
@@ -215,12 +222,47 @@
else bags[i] = true;
}
bags[mKeyField] = false;
+ // Use the POCombiner package in the combine plan
+ // as it needs to act differently than the regular
+ // package operator.
+ POCombinerPackage combinePack =
+ new POCombinerPackage(pack, bags);
+ mr.combinePlan.add(combinePack);
+ mr.combinePlan.add(cfe);
+ mr.combinePlan.connect(combinePack, cfe);
+ // No need to connect projections in cfe to combinePack, because
+ // PigCombiner directly attaches output from package to
+ // root of remaining plan.
+
+ POLocalRearrange mlr = rearrange.clone();
+ fixUpRearrange(mlr);
+
+ // A specialized local rearrange operator will replace
+ // the normal local rearrange in the map plan. This behaves
+ // like the regular local rearrange in the getNext()
+ // as far as getting its input and constructing the
+ // "key" out of the input. It then returns a tuple with
+ // two fields - the key in the first position and the
+ // "value" inside a bag in the second position. This output
+ // format resembles the format out of a Package. This output
+ // will feed to the map foreach which expects this format.
+ // If the key field isn't in the project of the combiner or map foreach,
+ // it is added to the end (This is required so that we can
+ // set up the inner plan of the new Local Rearrange leaf in the map
+ // and combine plan to contain just the project of the key).
+ patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+ POLocalRearrange clr = rearrange.clone();
+ fixUpRearrange(clr);
+
+ mr.combinePlan.add(clr);
+ mr.combinePlan.connect(cfe, clr);
+
// Change the package operator in the reduce plan to
- // be the post combiner package, as it needs to act
+ // be the POCombiner package, as it needs to act
// differently than the regular package operator.
- POPostCombinerPackage newPack =
- new POPostCombinerPackage(pack, bags);
- mr.reducePlan.replace(pack, newPack);
+ POCombinerPackage newReducePack =
+ new POCombinerPackage(pack, bags);
+ mr.reducePlan.replace(pack, newReducePack);
// the replace() above only changes
// the plan and does not change "inputs" to
@@ -228,8 +270,8 @@
// set up "inputs" for the operator after
// package correctly
List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
- packList.add(newPack);
- List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newPack);
+ packList.add(newReducePack);
+ List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
// there should be only one successor to package
sucs.get(0).setInputs(packList);
} catch (Exception e) {
@@ -239,6 +281,40 @@
}
}
+ /**
+ * @param mapPlan
+ * @param preCombinerLR
+ * @param mfe
+ * @param mlr
+ * @throws PlanException
+ */
+ private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
+ POForEach mfe, POLocalRearrange mlr) throws PlanException {
+
+ POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
+ mapPlan.replace(oldLR, preCombinerLR);
+
+ mapPlan.add(mfe);
+ mapPlan.connect(preCombinerLR, mfe);
+
+ mapPlan.add(mlr);
+ mapPlan.connect(mfe, mlr);
+ }
+
+ /**
+ * @param rearrange
+ * @return
+ */
+ private POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
+
+ String scope = rearrange.getOperatorKey().scope;
+ POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
+ new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)),
+ rearrange.getRequestedParallelism(), rearrange.getInputs());
+ pclr.setPlans(rearrange.getPlans());
+ return pclr;
+ }
+
/*
private boolean onKeysOnly(PhysicalPlan pp) {
// TODO
@@ -282,7 +358,7 @@
// Don't know what this is, but it isn't algebraic
return ExprType.NOT_ALGEBRAIC;
}
-
+
// Check that it doesn't have anything in the nested plan that I
// can't make algebraic. At this point this is just filters and
// foreach. Filters are left out because they are not necessarily
@@ -326,43 +402,54 @@
// Returns number of fields that this will project, including the added
// key field if that is necessary
private void fixUpForeachs(
+ POForEach mfe, // map foreach
POForEach cfe, // combiner foreach
POForEach rfe, // reducer foreach
List<ExprType> exprs) throws PlanException {
+ List<PhysicalPlan> mPlans = mfe.getInputPlans();
List<PhysicalPlan> cPlans = cfe.getInputPlans();
List<PhysicalPlan> rPlans = rfe.getInputPlans();
for (int i = 0; i < exprs.size(); i++) {
if (exprs.get(i) == ExprType.ALGEBRAIC) {
- changeFunc(cfe, cPlans.get(i), POUserFunc.INITIAL);
+ changeFunc(mfe, mPlans.get(i), POUserFunc.INITIAL);
+ changeFunc(cfe, cPlans.get(i), POUserFunc.INTERMEDIATE);
changeFunc(rfe, rPlans.get(i), POUserFunc.FINAL);
}
}
- // Set flattens for combiner ForEach to false
- List<Boolean> cfeFlattens = new ArrayList<Boolean>(cPlans.size());
+ // Set flattens for map and combiner ForEach to false
+ List<Boolean> feFlattens = new ArrayList<Boolean>(cPlans.size());
for (int i = 0; i < cPlans.size(); i++) {
- cfeFlattens.add(false);
+ feFlattens.add(false);
}
- cfe.setToBeFlattened(cfeFlattens);
+ mfe.setToBeFlattened(feFlattens);
+ cfe.setToBeFlattened(feFlattens);
- // If the key field isn't in the project of the combiner foreach, add
- // it to the end.
+ // If the key field isn't in the project of the combiner or map foreach, add
+ // it to the end (This is required so that we can set up the inner plan
+ // of the new Local Rearrange in the map and combine plan to contain just the
+ // project of the key).
if (mKeyField == -1) {
- PhysicalPlan newPlan = new PhysicalPlan();
- String scope = cfe.getOperatorKey().scope;
- POProject proj = new POProject(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
- proj.setResultType(mKeyType);
- newPlan.add(proj);
- cfe.addInputPlan(newPlan, false);
+ addKeyProject(mfe);
+ addKeyProject(cfe);
mKeyField = cPlans.size() - 1;
}
- // Change the plans on the reduce foreach to project from the column
- // they are in. UDFs will be left the same but their
+ // Change the plans on the reduce/combine foreach to project from the column
+ // they are in (we just want to take output from the combine and
+ // use that as input in the reduce/combine plan). UDFs will be left the same but their
// inputs altered. Any straight projections will also be altered.
- for (int i = 0; i < rPlans.size(); i++) {
- List<PhysicalOperator> leaves = rPlans.get(i).getLeaves();
+ fixProjectAndInputs(cPlans);
+ fixProjectAndInputs(rPlans);
+ }
+
+ /**
+ * @param plans
+ * @throws PlanException
+ */
+ private void fixProjectAndInputs(List<PhysicalPlan> plans) throws PlanException {
+ for (int i = 0; i < plans.size(); i++) {
+ List<PhysicalOperator> leaves = plans.get(i).getLeaves();
if (leaves == null || leaves.size() != 1) {
throw new RuntimeException("Expected to find plan with single leaf!");
}
@@ -378,15 +465,29 @@
leaf.getRequestedParallelism(), i);
proj.setResultType(DataType.BAG);
// Remove old connections and elements from the plan
- rPlans.get(i).trimAbove(leaf);
- rPlans.get(i).add(proj);
- rPlans.get(i).connect(proj, leaf);
+ plans.get(i).trimAbove(leaf);
+ plans.get(i).add(proj);
+ plans.get(i).connect(proj, leaf);
List<PhysicalOperator> inputs =
new ArrayList<PhysicalOperator>(1);
inputs.add(proj);
leaf.setInputs(inputs);
}
}
+
+ }
+
+ /**
+ * @param fe
+ */
+ private void addKeyProject(POForEach fe) {
+ PhysicalPlan newForEachInnerPlan = new PhysicalPlan();
+ String scope = fe.getOperatorKey().scope;
+ POProject proj = new POProject(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)), -1, 0);
+ proj.setResultType(mKeyType);
+ newForEachInnerPlan.add(proj);
+ fe.addInputPlan(newForEachInnerPlan, false);
}
private void changeFunc(POForEach fe, PhysicalPlan plan, byte type) {
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Dec 23 16:44:50 2008
@@ -335,7 +335,6 @@
jobConf.setCombinerClass(PigCombiner.Combine.class);
jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
- jobConf.setCombineOnceOnly(true);
} else if (mro.needsDistinctCombiner()) {
jobConf.setCombinerClass(DistinctCombiner.Combine.class);
log.info("Setting identity combiner class.");
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Tue Dec 23 16:44:50 2008
@@ -189,8 +189,12 @@
continue;
if(redRes.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Received Error while " +
- "processing the reduce plan.");
+ String msg = "Received Error while " +
+ "processing the combine plan.";
+ if(redRes.result != null) {
+ msg += redRes.result;
+ }
+ IOException ioe = new IOException(msg);
throw ioe;
}
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Tue Dec 23 16:44:50 2008
@@ -29,7 +29,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
@@ -75,7 +75,7 @@
if(pkg != null) {
// if the POPackage is actually a POPostCombinerPackage, then we should
// just look for the corresponding LocalRearrange(s) in the combine plan
- if(pkg instanceof POPostCombinerPackage) {
+ if(pkg instanceof POCombinerPackage) {
if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
throw new VisitorException("Unexpected problem while trying " +
"to optimize (could not find LORearrange in combine plan)");
@@ -152,7 +152,7 @@
* @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
*/
@Override
- public void visitCombinerPackage(POPostCombinerPackage pkg)
+ public void visitCombinerPackage(POCombinerPackage pkg)
throws VisitorException {
this.pkg = pkg;
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Tue Dec 23 16:44:50 2008
@@ -76,7 +76,7 @@
//do nothing
}
- public void visitCombinerPackage(POPostCombinerPackage pkg) throws VisitorException{
+ public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
//do nothing
}
@@ -261,6 +261,15 @@
}
+ /**
+ * @param preCombinerLocalRearrange
+ */
+ public void visitPreCombinerLocalRearrange(
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ // TODO Auto-generated method stub
+
+ }
+
public void visitCross(POCross cross) {
// TODO Auto-generated method stub
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPostCombinerPackage.java Tue Dec 23 16:44:50 2008
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.NullableTuple;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.Pair;
-/**
- * The package operator that packages the globally rearranged tuples into
- * output format after the combiner stage. It differs from POPackage in that
- * instead it does not use the index in the NullableTuple to find the bag to put a
- * tuple in. Intead, the inputs are
- * put in a bag corresponding to their offset in the tuple.
- */
-public class POPostCombinerPackage extends POPackage {
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- private final Log log = LogFactory.getLog(getClass());
-
- private static BagFactory mBagFactory = BagFactory.getInstance();
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
- private boolean[] mBags; // For each field, indicates whether or not it
- // needs to be put in a bag.
-
- /**
- * A new POPostCombinePackage will be constructed as a near clone of the
- * provided POPackage.
- * @param pkg POPackage to clone.
- * @param bags for each field, indicates whether it should be a bag (true)
- * or a simple field (false).
- */
- public POPostCombinerPackage(POPackage pkg, boolean[] bags) {
- super(new OperatorKey(pkg.getOperatorKey().scope,
- NodeIdGenerator.getGenerator().getNextNodeId(pkg.getOperatorKey().scope)),
- pkg.getRequestedParallelism(), pkg.getInputs());
- resultType = pkg.getResultType();
- keyType = pkg.keyType;
- numInputs = 1;
- inner = new boolean[1];
- for (int i = 0; i < pkg.inner.length; i++) {
- inner[i] = true;
- }
- mBags = bags;
- }
-
- @Override
- public String name() {
- return "PostCombinerPackage" + "[" + DataType.findTypeName(resultType) + "]" + "{" + DataType.findTypeName(keyType) + "}" +" - " + mKey.toString();
- }
-
- @Override
- public Result getNext(Tuple t) throws ExecException {
- int keyField = -1;
- //Create numInputs bags
- Object[] fields = new Object[mBags.length];
- for (int i = 0; i < mBags.length; i++) {
- if (mBags[i]) fields[i] = mBagFactory.newDefaultBag();
- }
-
- // For each indexed tup in the inp, split them up and place their
- // fields into the proper bags. If the given field isn't a bag, just
- // set the value as is.
- while (tupIter.hasNext()) {
- NullableTuple ntup = tupIter.next();
- Tuple tup = (Tuple)ntup.getValueAsPigType();
- // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
- // group case and not in cogroups. So there should only
- // be one LocalRearrange from which we get the keyInfo for
- // which field in the value is in the key. This LocalRearrange
- // has an index of -1. When we do support combiner in Cogroups
- // THIS WILL NEED TO BE REVISITED.
- Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
- keyInfo.get(0); // assumption: only group are "combinable", hence index 0
- Map<Integer, Integer> keyLookup = lrKeyInfo.second;
- int tupIndex = 0; // an index for accessing elements from
- // the value (tup) that we have currently
- for(int i = 0; i < mBags.length; i++) {
- Integer keyIndex = keyLookup.get(i);
- if(keyIndex == null) {
- // the field for this index is not the
- // key - so just take it from the "value"
- // we were handed - Currently THIS HAS TO BE A BAG
- // In future if this changes, THIS WILL NEED TO BE
- // REVISITED.
- ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
- tupIndex++;
- } else {
- // the field for this index is in the key
- fields[i] = key;
- }
- }
- }
-
- // The successor of the POPostCombinerPackage as of
- // now SHOULD be a POForeach which has been adjusted
- // to look for the key in the right place - so we will
- // NOT be adding the key in the result here but mere
- // putting all bags into a result tuple and returning it.
- Tuple res;
- res = mTupleFactory.newTuple(mBags.length);
- for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
- Result r = new Result();
- r.result = res;
- r.returnStatus = POStatus.STATUS_OK;
- return r;
-
- }
-
-}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/AVG.java Tue Dec 23 16:44:50 2008
@@ -26,11 +26,13 @@
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -70,7 +72,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -80,13 +82,26 @@
static public class Initial extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
+ Tuple t = mTupleFactory.newTuple(2);
try {
- Tuple t = mTupleFactory.newTuple(2);
- t.set(0, sum(input));
- t.set(1, new Long(count(input)));
+ // input is a bag with one tuple containing
+ // the column we are trying to avg
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ DataByteArray dba = (DataByteArray)tp.get(0);
+ t.set(0, dba != null ? Double.valueOf(dba.toString()) : null);
+ t.set(1, 1L);
+ return t;
+ } catch(NumberFormatException nfe) {
+ // invalid input,
+ // treat this input as null
+ try {
+ t.set(0, null);
+ t.set(1, 1L);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap(e);
+ }
return t;
- } catch(RuntimeException t) {
- throw new RuntimeException(t.getMessage() + ": " + input);
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -96,7 +111,7 @@
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
@@ -151,8 +166,15 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Double d = (Double)t.get(0);
- if(d == null) continue;
- sawNonNull = true;
+ // We count nulls in avg as contributing 0 to the sum.
+ // This is a departure from SQL, made for the performance of
+ // COUNT(), which is implemented by just inspecting the
+ // size of the bag.
+ if(d == null) {
+ d = 0.0;
+ } else {
+ sawNonNull = true;
+ }
sum += d;
count += (Long)t.get(1);
}
@@ -183,13 +205,11 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
try{
- Double d = DataType.toDouble(t.get(0));
+ DataByteArray dba = (DataByteArray)t.get(0);
+ Double d = dba != null ? Double.valueOf(dba.toString()) : null;
if (d == null) continue;
sawNonNull = true;
sum += d;
- }catch(NumberFormatException nfe){
- // do nothing - essentially treat this
- // particular input as null
}catch(RuntimeException exp) {
ExecException newE = new ExecException("Error processing: " +
t.toString() + exp.getMessage(), exp);
@@ -203,7 +223,7 @@
return null;
}
}
-
+
@Override
public Schema outputSchema(Schema input) {
return new Schema(new Schema.FieldSchema(null, DataType.DOUBLE));
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Dec 23 16:44:50 2008
@@ -41,7 +41,8 @@
@Override
public Long exec(Tuple input) throws IOException {
try {
- return count(input);
+ DataBag bag = (DataBag)input.get(0);
+ return bag.size();
} catch (ExecException ee) {
throw WrappedIOException.wrap("Caught exception in COUNT", ee);
}
@@ -52,7 +53,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,21 +64,20 @@
@Override
public Tuple exec(Tuple input) throws IOException {
- try {
- return mTupleFactory.newTuple(count(input));
- } catch (ExecException ee) {
- throw WrappedIOException.wrap(
- "Caught exception in COUNT.Initial", ee);
- }
+ // Since Initial is guaranteed to be called
+ // only in the map, it will be called with an
+ // input of a bag with a single tuple - the
+ // count should always be 1.
+ return mTupleFactory.newTuple(new Long(1));
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
- return mTupleFactory.newTuple(count(input));
+ return mTupleFactory.newTuple(sum(input));
} catch (ExecException ee) {
throw WrappedIOException.wrap(
"Caught exception in COUNT.Intermed", ee);
@@ -97,17 +97,6 @@
}
}
- static protected Long count(Tuple input) throws ExecException {
- Object values = input.get(0);
- if (values instanceof DataBag)
- return ((DataBag)values).size();
- else if (values instanceof Map)
- return new Long(((Map)values).size());
- else
- throw new ExecException("Cannot count a " +
- DataType.findTypeName(values));
- }
-
static protected Long sum(Tuple input) throws ExecException, NumberFormatException {
DataBag values = (DataBag)input.get(0);
long sum = 0;
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleAvg.java Tue Dec 23 16:44:50 2008
@@ -69,7 +69,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -81,8 +81,12 @@
public Tuple exec(Tuple input) throws IOException {
try {
Tuple t = mTupleFactory.newTuple(2);
- t.set(0, sum(input));
- t.set(1, new Long(count(input)));
+ // input is a bag with one tuple containing
+ // the column we are trying to avg on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ t.set(0, (Double)(tp.get(0)));
+ t.set(1, 1L);
return t;
} catch(RuntimeException t) {
throw new RuntimeException(t.getMessage() + ": " + input);
@@ -95,7 +99,7 @@
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
@@ -150,8 +154,15 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Double d = (Double)t.get(0);
- if(d == null) continue;
- sawNonNull = true;
+ // we count nulls in avg as contributing 0
+ // a departure from SQL for performance of
+ // COUNT() which is implemented by just inspecting
+ // size of the bag
+ if(d == null) {
+ d = 0.0;
+ } else {
+ sawNonNull = true;
+ }
sum += d;
count += (Long)t.get(1);
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMax.java Tue Dec 23 16:44:50 2008
@@ -51,7 +51,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -64,6 +64,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to max on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Double)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(max(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to min on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Double)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(min(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/DoubleSum.java Tue Dec 23 16:44:50 2008
@@ -31,6 +31,7 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
/**
@@ -54,7 +55,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -66,10 +67,28 @@
@Override
public Tuple exec(Tuple input) throws IOException {
+ // Initial is called in the map - for SUM
+ // we just send the tuple down
+ try {
+ // input is a bag with one tuple containing
+ // the column we are trying to sum
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Double)( tp.get(0)));
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap("Caught exception in DoubleSum.Initial", e);
+ }
+ }
+ }
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
try {
return tfact.newTuple(sum(input));
} catch (ExecException ee) {
- IOException oughtToBeEE = new IOException();
+ IOException oughtToBeEE = new IOException("Caught exception in DoubleSum.Intermediate");
oughtToBeEE.initCause(ee);
throw oughtToBeEE;
}
@@ -81,7 +100,7 @@
try {
return sum(input);
} catch (ExecException ee) {
- IOException oughtToBeEE = new IOException();
+ IOException oughtToBeEE = new IOException("Caught exception in DoubleSum.Final");
oughtToBeEE.initCause(ee);
throw oughtToBeEE;
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -78,8 +78,13 @@
public Tuple exec(Tuple input) throws IOException {
try {
Tuple t = mTupleFactory.newTuple(2);
- t.set(0, sum(input));
- t.set(1, new Long(count(input)));
+ // input is a bag with one tuple containing
+ // the column we are trying to avg on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ Float f = (Float)(tp.get(0));
+ t.set(0, f != null ? new Double(f) : null);
+ t.set(1, 1L);
return t;
} catch(RuntimeException t) {
throw new RuntimeException(t.getMessage() + ": " + input);
@@ -92,7 +97,7 @@
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
@@ -147,8 +152,15 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Double d = (Double)t.get(0);
- if(d == null) continue;
- sawNonNull = true;
+ // we count nulls in avg as contributing 0
+ // a departure from SQL for performance of
+ // COUNT() which is implemented by just inspecting
+ // size of the bag
+ if(d == null) {
+ d = 0.0;
+ } else {
+ sawNonNull = true;
+ }
sum += d;
count += (Long)t.get(1);
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to max on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Float)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(max(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to min on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Float)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(min(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/FloatSum.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -62,10 +62,32 @@
@Override
public Tuple exec(Tuple input) throws IOException {
+ // Initial is called in the map - for SUM
+ // we just send the tuple down
try {
- return tfact.newTuple(sum(input));
+ // input is a bag with one tuple containing
+ // the column we are trying to sum
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ // send down a double since intermediate
+ // would be sending a double
+ Float f = (Float)tp.get(0);
+ return tfact.newTuple(f != null ?
+ new Double(f) : null);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap("Caught exception in FloatSum.Initial", e);
+ }
+ }
+ }
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ return tfact.newTuple(sumDoubles(input));
} catch (ExecException ee) {
- throw WrappedIOException.wrap("Caught exception in FloatSum.Initial", ee);
+ throw WrappedIOException.wrap("Caught exception in FloatSum.Intermediate", ee);
}
}
}
@@ -73,42 +95,48 @@
@Override
public Double exec(Tuple input) throws IOException {
try {
- // Can't just call sum, because the intermediate results are
- // now Doubles insteads of Floats.
- DataBag values = (DataBag)input.get(0);
-
- // if we were handed an empty bag, return NULL
- // this is in compliance with SQL standard
- if(values.size() == 0) {
- return null;
- }
-
- double sum = 0;
- boolean sawNonNull = false;
- for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
- try {
- Double d = (Double)(t.get(0));
- if (d == null) continue;
- sawNonNull = true;
- sum += d;
- }catch(RuntimeException exp) {
- throw WrappedIOException.wrap(
- "Caught exception in FloatSum.Final", exp);
- }
- }
-
-
- if(sawNonNull) {
- return new Double(sum);
- } else {
- return null;
- }
+ return sumDoubles(input);
} catch (ExecException ee) {
throw WrappedIOException.wrap("Caught exception in FloatSum.Final", ee);
}
}
}
+
+ static protected Double sumDoubles(Tuple input) throws ExecException {
+ // Can't just call sum, because the intermediate results are
+ // now Doubles instead of Floats.
+ DataBag values = (DataBag)input.get(0);
+
+ // if we were handed an empty bag, return NULL
+ // this is in compliance with SQL standard
+ if(values.size() == 0) {
+ return null;
+ }
+
+ double sum = 0;
+ boolean sawNonNull = false;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = (Tuple) it.next();
+ try {
+ Double d = (Double)(t.get(0));
+ if (d == null) continue;
+ sawNonNull = true;
+ sum += d;
+ }catch(RuntimeException exp) {
+ ExecException newE = new ExecException("Error processing: " +
+ t.toString() + exp.getMessage(), exp);
+ throw newE;
+ }
+ }
+
+
+ if(sawNonNull) {
+ return new Double(sum);
+ } else {
+ return null;
+ }
+
+ }
static protected Double sum(Tuple input) throws ExecException {
DataBag values = (DataBag)input.get(0);
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -76,13 +76,19 @@
static public class Initial extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
+
try {
Tuple t = mTupleFactory.newTuple(2);
- t.set(0, sum(input));
- t.set(1, new Long(count(input)));
+ // input is a bag with one tuple containing
+ // the column we are trying to avg on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ Integer i = (Integer)tp.get(0);
+ t.set(0, i != null ? new Long(i): null);
+ t.set(1, 1L);
return t;
- } catch(RuntimeException t) {
- throw new RuntimeException(t.getMessage() + ": " + input);
+ } catch(RuntimeException e) {
+ throw new RuntimeException(e.getMessage() + ": " + input);
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -92,7 +98,7 @@
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
@@ -147,8 +153,15 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Long l = (Long)t.get(0);
- if(l == null) continue;
- sawNonNull = true;
+ // we count nulls in avg as contributing 0
+ // a departure from SQL for performance of
+ // COUNT() which is implemented by just inspecting
+ // size of the bag
+ if(l == null) {
+ l = 0L;
+ } else {
+ sawNonNull = true;
+ }
sum += l;
count += (Long)t.get(1);
}
@@ -180,6 +193,10 @@
Tuple t = it.next();
try {
Integer i = (Integer)(t.get(0));
+ // we count nulls in avg as contributing 0
+ // a departure from SQL for performance of
+ // COUNT() which is implemented by just inspecting
+ // size of the bag
if (i == null) continue;
sawNonNull = true;
sum += i;
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to max on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Integer)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(max(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntMin.java Tue Dec 23 16:44:50 2008
@@ -52,7 +52,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -65,6 +65,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to min on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Integer)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(min(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/IntSum.java Tue Dec 23 16:44:50 2008
@@ -49,7 +49,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -61,10 +61,33 @@
@Override
public Tuple exec(Tuple input) throws IOException {
+ // Initial is called in the map - for SUM
+ // we just send the tuple down
try {
- return tfact.newTuple(sum(input));
+ // input is a bag with one tuple containing
+ // the column we are trying to sum
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ Integer i = (Integer)tp.get(0);
+ return tfact.newTuple(i != null ?
+ new Long(i) : null);
+ }catch(NumberFormatException nfe){
+ // treat this particular input as null
+ return tfact.newTuple(null);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap("Caught exception in IntSum.Initial", e);
+ }
+ }
+ }
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ return tfact.newTuple(sumLongs(input));
} catch (ExecException ee) {
- throw WrappedIOException.wrap("Caught exception in IntSum.Initial", ee);
+ throw WrappedIOException.wrap("Caught exception in IntSum.Intermediate", ee);
}
}
}
@@ -72,43 +95,48 @@
@Override
public Long exec(Tuple input) throws IOException {
try {
- // Can't just call sum, because the intermediate results are
- // now Longs insteads of Integers.
- DataBag values = (DataBag)input.get(0);
-
- // if we were handed an empty bag, return NULL
- // this is in compliance with SQL standard
- if(values.size() == 0) {
- return null;
- }
-
- long sum = 0;
- boolean sawNonNull = false;
- for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
- Tuple t = (Tuple) it.next();
- try {
- Long l = (Long)(t.get(0));
- if (l == null) continue;
- sawNonNull = true;
- sum += l;
- }catch(RuntimeException exp) {
- throw WrappedIOException.wrap(
- "Caught exception in IntSum.Final", exp);
- }
- }
-
-
- if(sawNonNull) {
- return new Long(sum);
- } else {
- return null;
- }
- } catch (ExecException ee) {
- throw WrappedIOException.wrap("Caught exception in IntSum.Final", ee);
+ return sumLongs(input);
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap("Caught exception in IntSum.Intermediate", e);
}
}
}
+ static protected Long sumLongs(Tuple input) throws ExecException {
+ // Can't just call sum, because the intermediate results are
+ // now Longs instead of Integers.
+ DataBag values = (DataBag)input.get(0);
+
+ // if we were handed an empty bag, return NULL
+ // this is in compliance with SQL standard
+ if(values.size() == 0) {
+ return null;
+ }
+
+ long sum = 0;
+ boolean sawNonNull = false;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = (Tuple) it.next();
+ try {
+ Long l = (Long)(t.get(0));
+ if (l == null) continue;
+ sawNonNull = true;
+ sum += l;
+ }catch(RuntimeException exp) {
+ throw new ExecException(
+ "Error processing: " +
+ t.toString() + exp.getMessage(), exp);
+ }
+ }
+
+
+ if(sawNonNull) {
+ return new Long(sum);
+ } else {
+ return null;
+ }
+ }
+
static protected Long sum(Tuple input) throws ExecException {
DataBag values = (DataBag)input.get(0);
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongAvg.java Tue Dec 23 16:44:50 2008
@@ -66,7 +66,7 @@
}
public String getIntermed() {
- return Intermed.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -78,8 +78,12 @@
public Tuple exec(Tuple input) throws IOException {
try {
Tuple t = mTupleFactory.newTuple(2);
- t.set(0, sum(input));
- t.set(1, new Long(count(input)));
+ // input is a bag with one tuple containing
+ // the column we are trying to avg on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ t.set(0, (Long)(tp.get(0)));
+ t.set(1, 1L);
return t;
} catch(RuntimeException t) {
throw new RuntimeException(t.getMessage() + ": " + input);
@@ -92,7 +96,7 @@
}
}
- static public class Intermed extends EvalFunc<Tuple> {
+ static public class Intermediate extends EvalFunc<Tuple> {
@Override
public Tuple exec(Tuple input) throws IOException {
try {
@@ -147,8 +151,15 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
Long l = (Long)t.get(0);
- if(l == null) continue;
- sawNonNull = true;
+ // we count nulls in avg as contributing 0
+ // a departure from SQL for performance of
+ // COUNT() which is implemented by just inspecting
+ // size of the bag
+ if(l == null) {
+ l = 0L;
+ } else {
+ sawNonNull = true;
+ }
sum += l;
count += (Long)t.get(1);
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMax.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to max on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Long)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(max(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongMin.java Tue Dec 23 16:44:50 2008
@@ -50,7 +50,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,6 +63,25 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
+ // input is a bag with one tuple containing
+ // the column we are trying to min on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple((Long)(tp.get(0)));
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
return tfact.newTuple(min(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/LongSum.java Tue Dec 23 16:44:50 2008
@@ -28,6 +28,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
/**
@@ -51,7 +52,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -63,10 +64,28 @@
@Override
public Tuple exec(Tuple input) throws IOException {
+ // Initial is called in the map - for SUM
+ // we just send the tuple down
+ try {
+ // input is a bag with one tuple containing
+ // the column we are trying to sum
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ return tfact.newTuple( (Long)tp.get(0));
+ } catch (ExecException e) {
+ throw WrappedIOException.wrap("Caught exception in LongSum.Initial", e);
+ }
+ }
+ }
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
try {
return tfact.newTuple(sum(input));
} catch (ExecException ee) {
- IOException oughtToBeEE = new IOException();
+ IOException oughtToBeEE = new IOException("Caught exception in LongSum.Intermediate");
oughtToBeEE.initCause(ee);
throw oughtToBeEE;
}
@@ -78,7 +97,7 @@
try {
return sum(input);
} catch (ExecException ee) {
- IOException oughtToBeEE = new IOException();
+ IOException oughtToBeEE = new IOException("Caught exception in LongSum.Final");
oughtToBeEE.initCause(ee);
throw oughtToBeEE;
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Dec 23 16:44:50 2008
@@ -27,6 +27,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -56,7 +57,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -69,7 +70,30 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
- return tfact.newTuple(max(input));
+ // input is a bag with one tuple containing
+ // the column we are trying to max on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ DataByteArray dba = (DataByteArray)tp.get(0);
+ return tfact.newTuple(dba != null ?
+ Double.valueOf(dba.toString()): null);
+ } catch (NumberFormatException e) {
+ return tfact.newTuple(null);
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ return tfact.newTuple(maxDoubles(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -81,7 +105,7 @@
@Override
public Double exec(Tuple input) throws IOException {
try {
- return max(input);
+ return maxDoubles(input);
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -104,13 +128,49 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
try {
- Double d = DataType.toDouble(t.get(0));
+ DataByteArray dba = (DataByteArray)t.get(0);
+ Double d = dba != null ? Double.valueOf(dba.toString()) : null;
+ if (d == null) continue;
+ sawNonNull = true;
+ curMax = java.lang.Math.max(curMax, d);
+ } catch (RuntimeException exp) {
+ ExecException newE = new ExecException("Error processing: " +
+ t.toString() + exp.getMessage());
+ newE.initCause(exp);
+ throw newE;
+ }
+ }
+
+ if(sawNonNull) {
+ return new Double(curMax);
+ } else {
+ return null;
+ }
+ }
+
+ // same as above function except all its inputs are
+ // always Double - this should be used for better performance
+ // since we don't have to check the type of the object to
+ // decide it is a double. This should be used when the initial,
+ // intermediate and final versions are used.
+ static protected Double maxDoubles(Tuple input) throws ExecException {
+ DataBag values = (DataBag)input.get(0);
+
+ // if we were handed an empty bag, return NULL
+ // this is in compliance with SQL standard
+ if(values.size() == 0) {
+ return null;
+ }
+
+ double curMax = Double.NEGATIVE_INFINITY;
+ boolean sawNonNull = false;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ try {
+ Double d = (Double)t.get(0);
if (d == null) continue;
sawNonNull = true;
curMax = java.lang.Math.max(curMax, d);
- }catch(NumberFormatException nfe){
- // do nothing - essentially treat this
- // particular input as null
} catch (RuntimeException exp) {
ExecException newE = new ExecException("Error processing: " +
t.toString() + exp.getMessage());
Modified: hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=729182&r1=729181&r2=729182&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Dec 23 16:44:50 2008
@@ -27,6 +27,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -56,7 +57,7 @@
}
public String getIntermed() {
- return Initial.class.getName();
+ return Intermediate.class.getName();
}
public String getFinal() {
@@ -69,7 +70,31 @@
@Override
public Tuple exec(Tuple input) throws IOException {
try {
- return tfact.newTuple(min(input));
+ // input is a bag with one tuple containing
+ // the column we are trying to min on
+ DataBag bg = (DataBag) input.get(0);
+ Tuple tp = bg.iterator().next();
+ DataByteArray dba = (DataByteArray)tp.get(0);
+ return tfact.newTuple(dba != null?
+ Double.valueOf(dba.toString()) : null);
+ } catch (NumberFormatException e) {
+ // invalid input, send null
+ return tfact.newTuple(null);
+ } catch (ExecException ee) {
+ IOException oughtToBeEE = new IOException();
+ oughtToBeEE.initCause(ee);
+ throw oughtToBeEE;
+ }
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ private static TupleFactory tfact = TupleFactory.getInstance();
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ return tfact.newTuple(minDoubles(input));
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -81,7 +106,7 @@
@Override
public Double exec(Tuple input) throws IOException {
try {
- return min(input);
+ return minDoubles(input);
} catch (ExecException ee) {
IOException oughtToBeEE = new IOException();
oughtToBeEE.initCause(ee);
@@ -104,13 +129,49 @@
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
try {
- Double d = DataType.toDouble(t.get(0));
+ DataByteArray dba = (DataByteArray)t.get(0);
+ Double d = dba != null ? Double.valueOf(dba.toString()): null;
+ if (d == null) continue;
+ sawNonNull = true;
+ curMin = java.lang.Math.min(curMin, d);
+ } catch (RuntimeException exp) {
+ ExecException newE = new ExecException("Error processing: " +
+ t.toString() + exp.getMessage());
+ newE.initCause(exp);
+ throw newE;
+ }
+ }
+
+ if(sawNonNull) {
+ return new Double(curMin);
+ } else {
+ return null;
+ }
+ }
+
+ // same as above function except all its inputs are
+ // always Double - this should be used for better performance
+ // since we don't have to check the type of the object to
+ // decide it is a double. This should be used when the initial,
+ // intermediate and final versions are used.
+ static protected Double minDoubles(Tuple input) throws ExecException {
+ DataBag values = (DataBag)input.get(0);
+
+ // if we were handed an empty bag, return NULL
+ // this is in compliance with SQL standard
+ if(values.size() == 0) {
+ return null;
+ }
+
+ double curMin = Double.POSITIVE_INFINITY;
+ boolean sawNonNull = false;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ try {
+ Double d = (Double)t.get(0);
if (d == null) continue;
sawNonNull = true;
curMin = java.lang.Math.min(curMin, d);
- }catch(NumberFormatException nfe){
- // do nothing - essentially treat this
- // particular input as null
} catch (RuntimeException exp) {
ExecException newE = new ExecException("Error processing: " +
t.toString() + exp.getMessage());