You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/09/15 19:51:30 UTC
svn commit: r1171195 [1/2] - in /pig/trunk: ./ conf/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executi...
Author: thejas
Date: Thu Sep 15 17:51:27 2011
New Revision: 1171195
URL: http://svn.apache.org/viewvc?rev=1171195&view=rev
Log:
PIG-2228: support partial aggregation in map task (thejas)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
pig/trunk/src/org/apache/pig/data/SizeUtil.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/conf/pig.properties
pig/trunk/src/org/apache/pig/Algebraic.java
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/trunk/src/org/apache/pig/data/DefaultTuple.java
pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/TestDataBag.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 15 17:51:27 2011
@@ -130,6 +130,8 @@ OPTIMIZATIONS
PIG-2011: Speed up TestTypedMap.java (dvryaboy)
+PIG-2228: support partial aggregation in map task (thejas)
+
BUG FIXES
PIG-2273: Pig.compileFromFile in embedded python fails when pig script starts with a comment (ddaniels888 via gates)
Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Thu Sep 15 17:51:27 2011
@@ -1,5 +1,7 @@
# Pig configuration file. All values can be overwritten by command line arguments.
+# Use the "-h properties" command to see a description of the properties
+
# log4jconf log4j configuration file
# log4jconf=./conf/log4j.properties
@@ -25,4 +27,31 @@
#the following two parameters are to help estimate the reducer number
#pig.exec.reducers.bytes.per.reducer=1000000000
-#pig.exec.reducers.max=999
\ No newline at end of file
+#pig.exec.reducers.max=999
+
+#Logging properties
+#verbose=false
+#brief=false
+#debug=INFO
+#aggregate.warning=true
+
+#Performance tuning properties
+#pig.cachedbag.memusage=0.2
+#pig.skewedjoin.reduce.memusage=0.3
+#pig.exec.nocombiner=false
+#opt.multiquery=true
+#pig.tmpfilecompression=false
+
+#value can be lzo or gzip
+#pig.tmpfilecompression.codec=gzip
+#pig.noSplitCombination=true
+#pig.exec.mapPartAgg=false
+#pig.exec.mapPartAgg.minReduction=10
+
+
+#exectype=mapreduce
+#pig.additional.jars=<comma separated list of jars>
+#udf.import.list=<comma separated list of imports>
+#stop.on.failure=false
+
+
Modified: pig/trunk/src/org/apache/pig/Algebraic.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Algebraic.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Algebraic.java (original)
+++ pig/trunk/src/org/apache/pig/Algebraic.java Thu Sep 15 17:51:27 2011
@@ -39,6 +39,7 @@ import org.apache.pig.classification.Int
* whether the intermediate function is called 0, 1, or more times. Hadoop makes no guarantees
* about how many times the combiner will be called in a job.
*
+ *
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
@@ -47,12 +48,14 @@ public interface Algebraic{
/**
* Get the initial function.
* @return A function name of f_init. f_init should be an eval func.
+ * The return type of f_init.exec() has to be Tuple
*/
public String getInitial();
/**
* Get the intermediate function.
* @return A function name of f_intermed. f_intermed should be an eval func.
+ * The return type of f_intermed.exec() has to be Tuple
*/
public String getIntermed();
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Thu Sep 15 17:51:27 2011
@@ -800,6 +800,12 @@ public static void printProperties(){
System.out.println(" Used in conjunction with pig.tmpfilecompression. Defines compression type.");
System.out.println(" pig.noSplitCombination=true|false. Split combination is on by default.");
System.out.println(" Determines if multiple small files are combined into a single map.");
+ System.out.println(" pig.exec.mapPartAgg=true|false. Default is false.");
+ System.out.println(" Determines if partial aggregation is done within map phase, ");
+ System.out.println(" before records are sent to combiner.");
+ System.out.println(" pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.");
+ System.out.println(" If the in-map partial aggregation does not reduce the output num records");
+ System.out.println(" by this factor, it gets disabled.");
System.out.println(" Miscellaneous:");
System.out.println(" exectype=mapreduce|local; default is mapreduce. This property is the same as -x switch");
System.out.println(" pig.additional.jars=<comma seperated list of jars>. Used in place of register command.");
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Sep 15 17:51:27 2011
@@ -24,12 +24,14 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigException;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataType;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -45,6 +47,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -91,15 +94,18 @@ public class CombinerOptimizer extends M
private CompilationMessageCollector messageCollector = null;
- public CombinerOptimizer(MROperPlan plan, String chunkSize) {
- super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
- messageCollector = new CompilationMessageCollector() ;
+ private boolean doMapAgg;
+
+ public CombinerOptimizer(MROperPlan plan, boolean doMapAgg) {
+ this(plan, doMapAgg, new CompilationMessageCollector());
}
- public CombinerOptimizer(MROperPlan plan, String chunkSize,
+ public CombinerOptimizer(MROperPlan plan, boolean doMapAgg,
CompilationMessageCollector messageCollector) {
+
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
- this.messageCollector = messageCollector ;
+ this.messageCollector = messageCollector;
+ this.doMapAgg = doMapAgg;
}
public CompilationMessageCollector getMessageCollector() {
@@ -264,12 +270,17 @@ public class CombinerOptimizer extends M
mr.combinePlan.add(combinePack);
mr.combinePlan.add(cfe);
mr.combinePlan.connect(combinePack, cfe);
+
// No need to connect projections in cfe to cp, because
// PigCombiner directly attaches output from package to
// root of remaining plan.
POLocalRearrange mlr = getNewRearrange(rearrange);
+ POPartialAgg mapAgg = null;
+ if(doMapAgg){
+ mapAgg = createPartialAgg(cfe);
+ }
// A specialized local rearrange operator will replace
// the normal local rearrange in the map plan. This behaves
@@ -284,7 +295,7 @@ public class CombinerOptimizer extends M
// it is added to the end (This is required so that we can
// set up the inner plan of the new Local Rearrange leaf in the map
// and combine plan to contain just the project of the key).
- patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mlr);
+ patchUpMap(mr.mapPlan, getPreCombinerLR(rearrange), mfe, mapAgg, mlr);
POLocalRearrange clr = getNewRearrange(rearrange);
mr.combinePlan.add(clr);
@@ -317,6 +328,31 @@ public class CombinerOptimizer extends M
/**
+     * Builds a POPartialAgg that mirrors the given combine-plan foreach, so the
+     * same partial aggregation can also be performed inside the map plan.
+     * The first inner plan of the foreach computes the group key; every
+     * following inner plan computes one aggregated value. All plans are cloned
+     * so the map-side operator does not share plan objects with the combiner.
+     *
+     * @param combineFE foreach operator of the combine plan
+     * @return partial aggregate operator
+     * @throws CloneNotSupportedException if an inner plan cannot be cloned
+     */
+    private POPartialAgg createPartialAgg(POForEach combineFE)
+            throws CloneNotSupportedException {
+        String scope = combineFE.getOperatorKey().scope;
+        OperatorKey aggOpKey = new OperatorKey(scope,
+                NodeIdGenerator.getGenerator().getNextNodeId(scope));
+        POPartialAgg poAgg = new POPartialAgg(aggOpKey);
+        poAgg.setAlias(combineFE.getAlias());
+        poAgg.setResultType(combineFE.getResultType());
+
+        List<PhysicalPlan> inputPlans = combineFE.getInputPlans();
+
+        // plan 0 of the combine foreach is the group key
+        poAgg.setKeyPlan(inputPlans.get(0).clone());
+
+        // plans 1..n are the value (aggregate) plans
+        List<PhysicalPlan> clonedValuePlans = new ArrayList<PhysicalPlan>();
+        for (PhysicalPlan valuePlan : inputPlans.subList(1, inputPlans.size())) {
+            clonedValuePlans.add(valuePlan.clone());
+        }
+        poAgg.setValuePlans(clonedValuePlans);
+        return poAgg;
+    }
+
+ /**
* find algebraic operators and also check if the foreach statement
* is suitable for combiner use
* @param feInners inner plans of foreach
@@ -544,11 +580,13 @@ public class CombinerOptimizer extends M
* @param mapPlan
* @param preCombinerLR
* @param mfe
+ * @param mapAgg
* @param mlr
* @throws PlanException
*/
private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
- POForEach mfe, POLocalRearrange mlr) throws PlanException {
+ POForEach mfe, POPartialAgg mapAgg, POLocalRearrange mlr)
+ throws PlanException {
POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
mapPlan.replace(oldLR, preCombinerLR);
@@ -556,8 +594,17 @@ public class CombinerOptimizer extends M
mapPlan.add(mfe);
mapPlan.connect(preCombinerLR, mfe);
+ //the operator before local rearrange
+ PhysicalOperator opBeforeLR = mfe;
+
+ if(mapAgg != null){
+ mapPlan.add(mapAgg);
+ mapPlan.connect(mfe, mapAgg);
+ opBeforeLR = mapAgg;
+ }
+
mapPlan.add(mlr);
- mapPlan.connect(mfe, mlr);
+ mapPlan.connect(opBeforeLR, mlr);
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Sep 15 17:51:27 2011
@@ -56,7 +56,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -75,6 +74,15 @@ import org.apache.pig.tools.pigstats.Scr
*
*/
public class MapReduceLauncher extends Launcher{
+
+ // Name of the job success-marker file ("_SUCCESS").
+ // NOTE(review): presumably the marker Hadoop's output committer creates; confirm at use sites.
+ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+
+ // Hadoop configuration key "mapreduce.fileoutputcommitter.marksuccessfuljobs".
+ // NOTE(review): presumably toggles creation of SUCCEEDED_FILE_NAME; confirm at use sites.
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+ // Pig property enabling in-map partial aggregation. It is read from the Pig
+ // properties with a default of "false" and passed to the CombinerOptimizer.
+ public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+
+
private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
//used to track the exception thrown by the job control which is run in a separate thread
@@ -83,11 +91,6 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
private Map<FileSpec, Exception> failureMap;
-
- public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-
- public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
- "mapreduce.fileoutputcommitter.marksuccessfuljobs";
private JobControl jc=null;
@@ -516,7 +519,9 @@ public class MapReduceLauncher extends L
//String prop = System.getProperty("pig.exec.nocombiner");
String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
if (!pc.inIllustrator && !("true".equals(prop))) {
- CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
+ boolean doMapAgg =
+ Boolean.valueOf(pc.getProperties().getProperty(PROP_EXEC_MAP_PARTAGG,"false"));
+ CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg);
co.visit();
//display the warning message(s) from the CombinerOptimizer
co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Sep 15 17:51:27 2011
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -278,6 +277,12 @@ public class PhyPlanSetter extends PhyPl
lrfi.setParentPlan(parent);
}
*/
+
+    /**
+     * Sets the partial-aggregation operator's parent plan to this setter's
+     * {@code parent}, like the other visit methods of this class do.
+     *
+     * @param partialAgg the operator being visited
+     */
+    @Override
+    public void visitPartialAgg(POPartialAgg partialAgg) {
+        partialAgg.setParentPlan(parent);
+    }
+
@Override
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
optimizedForEach.setParentPlan(parent);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Sep 15 17:51:27 2011
@@ -23,6 +23,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -94,6 +95,12 @@ public class EndOfAllInputSetter extends
throws VisitorException {
endOfAllInputFlag = true;
}
+
+    /**
+     * Marks the plan as needing the end-of-all-input signal.
+     * POPartialAgg buffers aggregates in memory and only emits the remaining
+     * ones once its parent plan reports end of all input, so any plan
+     * containing it must have that flag set.
+     *
+     * @param partialAgg the operator being visited
+     */
+    @Override
+    public void visitPartialAgg(POPartialAgg partialAgg) {
+        endOfAllInputFlag = true;
+    }
+
/**
* @return if end of all input is present
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep 15 17:51:27 2011
@@ -61,7 +61,8 @@ public class PhyPlanVisitor extends Plan
visit();
popWalker();
}
-
+
+
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
List<PhysicalPlan> inpPlans = mg.getPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -298,7 +299,9 @@ public class PhyPlanVisitor extends Plan
public void visitPreCombinerLocalRearrange(
POPreCombinerLocalRearrange preCombinerLocalRearrange) {
// TODO Auto-generated method stub
-
+ }
+
+ public void visitPartialAgg(POPartialAgg poPartialAgg) {
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Sep 15 17:51:27 2011
@@ -156,6 +156,10 @@ public class PlanPrinter<O extends Opera
else if(node instanceof POLocalRearrange){
sb.append(planString(((POLocalRearrange)node).getPlans()));
}
+ else if(node instanceof POPartialAgg){
+ sb.append(planString(((POPartialAgg)node).getKeyPlan()));
+ sb.append(planString(((POPartialAgg)node).getValuePlans()));
+ }
else if(node instanceof POCollectedGroup){
sb.append(planString(((POCollectedGroup)node).getPlans()));
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Sep 15 17:51:27 2011
@@ -354,6 +354,7 @@ public class POLocalRearrange extends Ph
return inp;
}
+
private void detachPlans(List<PhysicalPlan> plans) {
for (PhysicalPlan ep : plans) {
ep.detachInput();
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,528 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.SelfSpillBag.MemoryLimits;
+import org.apache.pig.data.SizeUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Do partial aggregation in map plan. It uses a hash-map to aggregate. If
+ * consecutive records have same key, it will aggregate those without adding
+ * them to the hash-map. As future optimization, the use of hash-map could be
+ * disabled when input data is sorted on group-by keys
+ */
+public class POPartialAgg extends PhysicalOperator {
+
+ // Property holding the minimum reduction factor (input records / map
+ // entries). In-map aggregation disables itself when the factor is not met;
+ // DEFAULT_MIN_REDUCTION below holds the default value.
+ public static final String PROP_PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
+
+ private static final Log log = LogFactory.getLog(POPartialAgg.class);
+ private static final long serialVersionUID = 1L;
+
+ // plan that computes the group key from an input tuple
+ private PhysicalPlan keyPlan;
+ // operator evaluated (via getResult) after keyPlan.attachInput to obtain the key
+ private ExpressionOperator keyLeaf;
+
+ // one plan per aggregated value; plan i reads bag i+1 of valueTuple
+ private List<PhysicalPlan> valuePlans;
+ // operators evaluated to obtain the result of each value plan
+ private List<ExpressionOperator> valueLeaves;
+ // sentinel results; ERR_RESULT is compared by identity (==) against getResult's return value
+ private static final Result ERR_RESULT = new Result();
+ private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
+ null);
+
+ // run time variables
+ // key of the run of consecutive records currently being aggregated
+ private transient Object currentKey = null;
+ // group key -> partial aggregate tuple for keys that are not current
+ private transient Map<Object, Tuple> aggMap;
+ // tuple of the format - (null(key),bag-val1,bag-val2,...)
+ // attach this to the plans with algebraic udf before evaluating the plans
+ private transient Tuple valueTuple = null;
+
+ // set once the parent plan reports end of all input; remaining aggregates are then emitted
+ // NOTE(review): unlike the other run-time fields this is not transient - confirm intent
+ private boolean isFinished = false;
+
+ // non-null while entries of aggMap are being emitted (flushed)
+ private transient Iterator<Tuple> mapDumpIterator;
+ // number of entries still to emit in a partial (memory-freeing) flush
+ private transient int numToDump;
+
+ // maximum bag size of currentValues cached before aggregation is done
+ private static final int MAX_SIZE_CURVAL_CACHE = 1024;
+
+ // number of records to sample to determine average size used by each
+ // entry in hash map
+ private static final int NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE = 100;
+
+ // params for auto disabling map aggregation
+ // number of input records after which the achieved reduction is checked
+ private static final int NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION = 1000;
+
+ // default minimum reduction factor (see PROP_PARTAGG_MINREDUCTION)
+ private static final int DEFAULT_MIN_REDUCTION = 10;
+
+ // true once in-map aggregation has been turned off; records then pass through
+ // NOTE(review): disableMapAgg, num_inp_recs and sizeReductionChecked are also
+ // not transient, unlike the other run-time fields - confirm intent
+ private boolean disableMapAgg = false;
+ // input records counted so far by checkSizeReduction
+ private int num_inp_recs;
+ // true once the reduction check has been performed
+ private boolean sizeReductionChecked = false;
+
+ // current estimate of how many entries aggMap may hold (from memLimits)
+ private transient int maxHashMapSize;
+
+ private transient TupleFactory tupleFact;
+ // tracks sampled entry sizes to derive maxHashMapSize
+ private transient MemoryLimits memLimits;
+
+    /**
+     * Creates the operator with the given operator key. The key plan and the
+     * value plans are supplied afterwards (setKeyPlan / setValuePlans).
+     *
+     * @param operatorKey key identifying this operator
+     */
+    public POPartialAgg(OperatorKey operatorKey) {
+        super(operatorKey);
+    }
+
+    /**
+     * Not supported. The combiner optimizer, which is what inserts this
+     * operator, is not invoked when a plan runs under illustrate, so
+     * POPartialAgg should never be reached in that mode.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        throw new UnsupportedOperationException(
+                "POPartialAgg does not support illustrate: the combiner "
+                        + "optimizer that inserts it is not run under illustrate");
+    }
+
+    /**
+     * Double-dispatches to {@link PhyPlanVisitor#visitPartialAgg}.
+     *
+     * @param visitor the visitor walking the plan
+     * @throws VisitorException declared for consistency with other operators
+     */
+    @Override
+    public void visit(PhyPlanVisitor visitor) throws VisitorException {
+        visitor.visitPartialAgg(this);
+    }
+
+    /**
+     * Returns the next output record of the in-map partial aggregation.
+     * <p>
+     * The key of each input record is computed with {@code keyPlan}. Values of
+     * consecutive records with the same key are buffered in {@code valueTuple}
+     * and periodically pre-aggregated. On a key change, the aggregate for the
+     * previous key is stored in {@code aggMap}. If the map is already at
+     * {@code maxHashMapSize}, the aggregate is instead returned directly and a
+     * batch of map entries is flushed. After the end of all input, the current
+     * aggregate is returned, followed by every remaining map entry.
+     * <p>
+     * If in-map aggregation has been disabled, a flush that is in progress is
+     * continued; otherwise input records are passed through unchanged.
+     *
+     * @param t unused
+     * @return the next result; STATUS_EOP when the current input is exhausted
+     *         or when all output has been emitted
+     * @throws ExecException if evaluating the key or value plans fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+
+        if (disableMapAgg) {
+            // map aggregation has been automatically disabled
+            if (mapDumpIterator != null) {
+                // there are some accumulated entries in map to be dumped
+                return getNextResFromMap();
+            }
+            // the in-map partial aggregation is an optional step, just like
+            // the combiner: act as if this operator was never there, by just
+            // returning the input
+            return processInput();
+        }
+
+        if (mapDumpIterator != null) {
+            // if this iterator is not null, we are in the process of dumping
+            // records from the map
+            if (isFinished) {
+                return getNextResFromMap();
+            } else if (numToDump > 0) {
+                // there are some tuples yet to be dumped, to free memory
+                --numToDump;
+                return getNextResFromMap();
+            } else {
+                mapDumpIterator = null;
+            }
+        }
+
+        if (isFinished) {
+            // done with dumping all records
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        while (true) {
+            // process each input until EOP
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_ERR) {
+                return inp;
+            }
+            if (inp.returnStatus == POStatus.STATUS_EOP) {
+                if (!parentPlan.endOfAllInput) {
+                    // only the input available so far is exhausted
+                    return inp;
+                }
+                // it is actually end of all input: start dumping results
+                isFinished = true;
+                logCapacityOfAggMap();
+                // check if there was ANY input
+                if (valueTuple == null) {
+                    return EOP_RESULT;
+                }
+
+                // first return agg for currentKey
+                Result output = getOutput();
+                // defensive: the current key's stored partial result is
+                // taken out of aggMap when the key becomes current, so it
+                // must never be emitted again from the map
+                aggMap.remove(currentKey);
+
+                mapDumpIterator = aggMap.values().iterator();
+
+                // free the variables not needed anymore
+                currentKey = null;
+                valueTuple = null;
+
+                return output;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            // check if this operator is doing a good job of reducing the number
+            // of records going to output to justify its own cost;
+            // if not, disable map partial agg
+            if (!sizeReductionChecked) {
+                checkSizeReduction();
+                if (disableMapAgg) {
+                    // in-map partial aggregation just got disabled:
+                    // return the new input record, it has not been aggregated
+                    return inp;
+                }
+            }
+
+            // we have some real input data: evaluate the key
+            Tuple inpTuple = (Tuple) inp.result;
+            keyPlan.attachInput(inpTuple);
+            Result keyRes = getResult(keyLeaf);
+            // detach before any early return so the plan is not left attached
+            keyPlan.detachInput();
+            if (keyRes == ERR_RESULT) {
+                return ERR_RESULT;
+            }
+            Object key = keyRes.result;
+
+            if (valueTuple == null) {
+                // this is the first record the operator is seeing:
+                // do initializations
+                init(key, inpTuple);
+                continue;
+            }
+
+            boolean keyChanged = (currentKey != null && key == null)
+                    || (key != null && !key.equals(currentKey));
+
+            if (!keyChanged) {
+                addToCurrentValues(inpTuple);
+                // if there are enough values, aggregate the values
+                // accumulated in valueTuple and store the result back
+                if (((DefaultDataBag) valueTuple.get(1)).size() >= MAX_SIZE_CURVAL_CACHE) {
+                    aggregateCurrentValues();
+                }
+                continue;
+            }
+
+            // new key: compute aggregate for currentKey
+            Result output = getOutput();
+            if (output.returnStatus != POStatus.STATUS_OK) {
+                return ERR_RESULT;
+            }
+
+            // a new map entry is about to be created for the previous key, so
+            // update the estimate of how many entries fit into the map.
+            // This is done before currentKey is replaced, so that the size is
+            // estimated with the key that output actually belongs to.
+            if (memLimits.getNumObjectsSizeAdded() < NUM_RESRECS_TO_SAMPLE_SZ_ESTIMATE) {
+                updateMaxMapSize(output.result);
+            }
+
+            // set new current key, value
+            currentKey = key;
+            resetCurrentValues();
+            addToCurrentValues(inpTuple);
+
+            // take over any partial result already stored for the new key.
+            // It is removed (not just read) so the map never holds a stale
+            // copy of the current key's aggregate; otherwise that copy would
+            // be emitted in addition to the updated aggregate (e.g. when the
+            // aggregate is returned directly below) and be counted twice.
+            Tuple existingResult = aggMap.remove(key);
+            if (existingResult != null) {
+                addToCurrentValues(existingResult);
+            }
+
+            // check if it is time to dump some aggs from the hashmap
+            if (aggMap.size() >= maxHashMapSize) {
+                // dump 10% of max hash size because dumping just one
+                // record at a time might result in most group key being
+                // dumped (depending on hashmap implementation)
+                // TODO: dump the least recently/frequently used entries
+                numToDump = maxHashMapSize / 10;
+                mapDumpIterator = aggMap.values().iterator();
+                return output;
+            }
+
+            // there is space available in the hashmap, store the output there
+            addOutputToAggMap(output);
+        }
+    }
+
+    /**
+     * Records the estimated size of one more hash-map entry, made of
+     * {@code currentKey} and {@code result}, then refreshes
+     * {@code maxHashMapSize} from the updated memory limits.
+     *
+     * @param result the aggregate tuple that would be stored for currentKey
+     */
+    private void updateMaxMapSize(Object result) {
+        memLimits.addNewObjSize(SizeUtil.getMapEntrySize(currentKey, result));
+        maxHashMapSize = memLimits.getCacheLimit();
+    }
+
+    /**
+     * Aggregates the values accumulated in {@code valueTuple} for the current
+     * key, so that the per-key value bags stay small.
+     * <p>
+     * Each value plan i is evaluated against {@code valueTuple}. Bag i+1 is
+     * then cleared and replaced by the single partial-aggregate tuple that
+     * the plan produced.
+     *
+     * @throws ExecException if a value plan fails or does not return a Tuple
+     */
+    private void aggregateCurrentValues() throws ExecException {
+        for (int i = 0; i < valuePlans.size(); i++) {
+            PhysicalPlan valuePlan = valuePlans.get(i);
+            valuePlan.attachInput(valueTuple);
+            try {
+                Result valRes = getResult(valueLeaves.get(i));
+                if (valRes == ERR_RESULT) {
+                    throw new ExecException(
+                            "Error computing aggregate during in-map partial aggregation");
+                }
+
+                Tuple aggVal = getAggResultTuple(valRes.result);
+
+                // i'th plan should read only from i'th bag
+                // so we are done with i'th bag, clear it and
+                // add the new agg result to it
+                DataBag valBag = (DataBag) valueTuple.get(i + 1);
+                valBag.clear();
+                valBag.add(aggVal);
+            } finally {
+                // detach on every path so a failure does not leave the plan
+                // attached to valueTuple
+                valuePlan.detachInput();
+            }
+        }
+    }
+
+    /**
+     * Sets up the aggregation state and seeds it with the given key/value.
+     * This creates valueTuple with one empty value bag per value plan, makes
+     * key the current key, and adds the values of inpTuple to those bags. It
+     * also creates an empty aggMap and a fresh MemoryLimits, and removes the
+     * bound on maxHashMapSize until size samples arrive.
+     *
+     * @param key group key of inpTuple
+     * @param inpTuple input tuple whose fields 1..n hold the values to add
+     * @throws ExecException if reading or writing tuple fields fails
+     */
+    private void init(Object key, Tuple inpTuple) throws ExecException {
+        tupleFact = TupleFactory.getInstance();
+
+        // field 0 is not set here; field b (1..n) holds the pending values
+        // consumed by value plan b - 1
+        int numPlans = valuePlans.size();
+        valueTuple = tupleFact.newTuple(numPlans + 1);
+        for (int bagIdx = 1; bagIdx <= numPlans; bagIdx++) {
+            valueTuple.set(bagIdx, new DefaultDataBag(
+                    new ArrayList<Tuple>(MAX_SIZE_CURVAL_CACHE)));
+        }
+
+        currentKey = key;
+        addToCurrentValues(inpTuple);
+        aggMap = new HashMap<Object, Tuple>();
+
+        // TODO: keep track of actual number of objects that share the
+        // memory limit. For now using a default of 3, which is what is
+        // used by InternalCachedBag
+        memLimits = new MemoryLimits(3, -1);
+        maxHashMapSize = Integer.MAX_VALUE;
+    }
+
+    /**
+     * Returns the output of an intermediate algebraic function as a Tuple.
+     *
+     * @param result value produced by a value plan; may be null
+     * @return result cast to Tuple (null when result is null)
+     * @throws ExecException if result is non-null and not a Tuple
+     */
+    private Tuple getAggResultTuple(Object result) throws ExecException {
+        // explicit type check instead of catching ClassCastException; null
+        // passes through unchanged, exactly as a plain cast would allow
+        if (result != null && !(result instanceof Tuple)) {
+            throw new ExecException("Intermediate Algebraic "
+                    + "functions must implement EvalFunc<Tuple>");
+        }
+        return (Tuple) result;
+    }
+
+    /**
+     * Counts one more input record and decides whether in-map partial
+     * aggregation reduces the data enough to be worthwhile. The decision is
+     * made once NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION records have been seen,
+     * or when aggMap is one entry short of maxHashMapSize. If the reduction
+     * is below the configured minimum, aggregation is disabled. The current
+     * key's aggregate is then stored in aggMap, and mapDumpIterator is
+     * pointed at aggMap so its contents can be flushed.
+     *
+     * @throws ExecException if computing the current key's aggregate fails
+     */
+    private void checkSizeReduction() throws ExecException {
+
+        num_inp_recs++;
+        if (num_inp_recs == NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION
+                || (aggMap != null && aggMap.size() == maxHashMapSize - 1)) {
+            // the above check for the hashmap current size is
+            // done to avoid having to keep track of any dumps that
+            // could happen before NUM_INPRECS_TO_SAMPLE_SZ_REDUCTION is
+            // reached
+
+            sizeReductionChecked = true;
+
+            // input records per aggMap entry seen so far (integer division)
+            int outputReduction = aggMap.size() == 0 ? Integer.MAX_VALUE
+                    : num_inp_recs / aggMap.size();
+            int minOutputReduction = getMinOutputReductionFromProp();
+            if (outputReduction < minOutputReduction) {
+                disableMapAgg = true;
+                log.info("Disabling in-map partial aggregation because the "
+                        + "reduction in tuples (" + outputReduction
+                        + ") is lower than threshold (" + minOutputReduction
+                        + ")");
+                logCapacityOfAggMap();
+                // get current key vals output
+                Result output = getOutput();
+                // getOutput reports failure through its status; without this
+                // check ERR_RESULT would reach addOutputToAggMap, which casts
+                // its result to a Tuple
+                if (output.returnStatus != POStatus.STATUS_OK) {
+                    throw new ExecException(
+                            "Error computing aggregate during in-map partial aggregation");
+                }
+
+                // free the variables not needed anymore
+                currentKey = null;
+                valueTuple = null;
+
+                // store the output into hash map for now
+                addOutputToAggMap(output);
+
+                mapDumpIterator = aggMap.values().iterator();
+            }
+        }
+
+    }
+
+    /** Logs the entry limit (maxHashMapSize) used for the partial-aggregation hash map. */
+    private void logCapacityOfAggMap() {
+        String msg = "Maximum capacity of hashmap used for map"
+                + " partial aggregation was " + maxHashMapSize + " entries";
+        log.info(msg);
+    }
+
+    /**
+     * Stores an aggregate output tuple in aggMap, keyed by its first field.
+     *
+     * @param output result whose payload is a Tuple laid out as (key, aggs...)
+     * @throws ExecException if reading the key field fails
+     */
+    private void addOutputToAggMap(Result output) throws ExecException {
+        Tuple aggTuple = (Tuple) output.result;
+        Object groupKey = aggTuple.get(0);
+        aggMap.put(groupKey, aggTuple);
+    }
+
+    /**
+     * Reads the minimum ratio of input records to aggregated entries that is
+     * required to keep in-map partial aggregation enabled.
+     *
+     * @return the value of PROP_PARTAGG_MINREDUCTION from the job
+     *         configuration if it is positive; otherwise DEFAULT_MIN_REDUCTION.
+     *         DEFAULT_MIN_REDUCTION is also returned when no job configuration
+     *         is available.
+     */
+    private int getMinOutputReductionFromProp() {
+        int minReduction = 0;
+        // sJobConfInternal.get() can return null (SelfSpillBag.MemoryLimits
+        // checks for this too); fall back to the default instead of an NPE
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            minReduction = PigMapReduce.sJobConfInternal.get().getInt(
+                    PROP_PARTAGG_MINREDUCTION, 0);
+        }
+
+        if (minReduction <= 0) {
+            minReduction = DEFAULT_MIN_REDUCTION;
+        }
+        return minReduction;
+    }
+
+    /**
+     * Removes the next tuple from aggMap through mapDumpIterator.
+     *
+     * @return a STATUS_OK result carrying the removed tuple. Once the iterator
+     *         is exhausted, returns EOP_RESULT and sets mapDumpIterator to null.
+     */
+    private Result getNextResFromMap() {
+        if (mapDumpIterator.hasNext()) {
+            Tuple outTuple = mapDumpIterator.next();
+            mapDumpIterator.remove();
+            return new Result(POStatus.STATUS_OK, outTuple);
+        }
+        mapDumpIterator = null;
+        return EOP_RESULT;
+    }
+
+    /**
+     * Builds the aggregate output tuple for the current key. Field 0 is
+     * currentKey, and field i + 1 is the result of value plan i evaluated
+     * over valueTuple.
+     *
+     * @return a STATUS_OK result holding the output tuple, or ERR_RESULT as
+     *         soon as any value plan fails
+     * @throws ExecException if evaluation or tuple access fails
+     */
+    private Result getOutput() throws ExecException {
+        int numPlans = valuePlans.size();
+        Tuple output = tupleFact.newTuple(numPlans + 1);
+        output.set(0, currentKey);
+
+        for (int planIdx = 0; planIdx < numPlans; planIdx++) {
+            valuePlans.get(planIdx).attachInput(valueTuple);
+            Result planResult = getResult(valueLeaves.get(planIdx));
+            if (planResult == ERR_RESULT) {
+                return ERR_RESULT;
+            }
+            output.set(planIdx + 1, planResult.result);
+        }
+        return new Result(POStatus.STATUS_OK, output);
+    }
+
+    /** Empties every value bag in valueTuple (fields 1..n); field 0 is left as is. */
+    private void resetCurrentValues() throws ExecException {
+        int fieldCount = valueTuple.size();
+        for (int pos = 1; pos < fieldCount; pos++) {
+            DataBag bag = (DataBag) valueTuple.get(pos);
+            bag.clear();
+        }
+    }
+
+    /**
+     * Appends field i of inpTuple (for i = 1..n) to the bag at field i of
+     * valueTuple. Field 0 of inpTuple is not copied.
+     *
+     * @param inpTuple tuple whose fields 1..n are Tuples
+     * @throws ExecException if reading a field fails
+     */
+    private void addToCurrentValues(Tuple inpTuple) throws ExecException {
+        int fieldCount = inpTuple.size();
+        for (int pos = 1; pos < fieldCount; pos++) {
+            DataBag target = (DataBag) valueTuple.get(pos);
+            target.add((Tuple) inpTuple.get(pos));
+        }
+    }
+
+    /**
+     * Evaluates an expression leaf by calling getNext with a dummy value of
+     * the leaf's declared result type.
+     *
+     * @param op expression operator to evaluate
+     * @return the operator's result when its status is STATUS_OK or
+     *         STATUS_NULL (a null group-by key is allowed); otherwise ERR_RESULT
+     * @throws ExecException with error code 2270 if the result type is not
+     *         supported
+     */
+    private Result getResult(ExpressionOperator op) throws ExecException {
+        byte opType = op.getResultType();
+
+        switch (opType) {
+        case DataType.BAG:
+        case DataType.BOOLEAN:
+        case DataType.BYTEARRAY:
+        case DataType.CHARARRAY:
+        case DataType.DOUBLE:
+        case DataType.FLOAT:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.MAP:
+        case DataType.TUPLE:
+            break;
+        default:
+            // findTypeName reports the offending type; findType(byte) would
+            // box the code to a Byte and report the type of Byte itself
+            String msg = "Invalid result type: "
+                    + DataType.findTypeName(opType);
+            throw new ExecException(msg, 2270, PigException.BUG);
+        }
+
+        Result res = op.getNext(getDummy(opType), opType);
+
+        // allow null as group by key
+        if (res.returnStatus == POStatus.STATUS_OK
+                || res.returnStatus == POStatus.STATUS_NULL) {
+            return res;
+        }
+        return ERR_RESULT;
+    }
+
+    /** This operator reads from exactly one predecessor. */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /** This operator feeds exactly one successor. */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Display name used in plan printing, in the form
+     * {@code <alias>Partial Agg[<result type>]<operator key>}.
+     */
+    @Override
+    public String name() {
+        return getAliasString() + "Partial Agg[" + DataType.findTypeName(resultType)
+                + "]" + mKey.toString();
+    }
+
+    /** @return the plan that computes the group key */
+    public PhysicalPlan getKeyPlan() {
+        return this.keyPlan;
+    }
+
+ public void setKeyPlan(PhysicalPlan keyPlan) {
+ this.keyPlan = keyPlan;
+ keyLeaf = (ExpressionOperator) keyPlan.getLeaves().get(0);
+ }
+
+    /** @return the plans that compute the partial aggregates, one per value field */
+    public List<PhysicalPlan> getValuePlans() {
+        return this.valuePlans;
+    }
+
+ public void setValuePlans(List<PhysicalPlan> valuePlans) {
+ this.valuePlans = valuePlans;
+ valueLeaves = new ArrayList<ExpressionOperator>();
+ for (PhysicalPlan plan : valuePlans) {
+ valueLeaves.add((ExpressionOperator) plan.getLeaves().get(0));
+ }
+ }
+
+}
Modified: pig/trunk/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DefaultTuple.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/DefaultTuple.java Thu Sep 15 17:51:27 2011
@@ -206,27 +206,18 @@ public class DefaultTuple implements Tup
+ 32 /* mFields array list fixed size */;
// rest of the fixed portion of mfields size is accounted within empty_tuple_size
- long mfields_var_size = roundToEight(4 + 4 * mFields.size());
+ long mfields_var_size = SizeUtil.roundToEight(4 + 4 * mFields.size());
// in java hotspot 32bit vm, there seems to be a minimum tuple size of 96
// which is probably from the minimum size of this array list
mfields_var_size = Math.max(40, mfields_var_size);
long sum = empty_tuple_size + mfields_var_size;
while (i.hasNext()) {
- sum += getFieldMemorySize(i.next());
+ sum += SizeUtil.getPigObjMemSize(i.next());
}
return sum;
}
- /**
- * Memory size of objects are rounded to multiple of 8 bytes
- *
- * @param i
- * @return i rounded to a equal of higher multiple of 8
- */
- private long roundToEight(long i) {
- return 8 * ((i + 7) / 8); // integer division rounds the result down
- }
/**
* Write a tuple of atomic values into a string. All values in the tuple must be atomic (no bags, tuples, or maps).
@@ -562,78 +553,6 @@ public class DefaultTuple implements Tup
}
}
- @SuppressWarnings("unchecked")
- private long getFieldMemorySize(Object o) {
- // 12 is added to each to account for the object overhead and the
- // pointer in the tuple.
- switch (DataType.findType(o)) {
- case DataType.BYTEARRAY: {
- byte[] bytes = ((DataByteArray) o).get();
- // bytearray size including rounding to 8 bytes
- long byte_array_sz = roundToEight(bytes.length + 12);
-
- return byte_array_sz + 16 /* 16 is additional size of DataByteArray */;
- }
-
- case DataType.CHARARRAY: {
- String s = (String) o;
- // See PIG-1443 for a reference for this formula
- return roundToEight((s.length() * 2) + 38);
- }
-
- case DataType.TUPLE: {
- Tuple t = (Tuple) o;
- return t.getMemorySize();
- }
-
- case DataType.BAG: {
- DataBag b = (DataBag) o;
- return b.getMemorySize();
- }
-
- case DataType.INTEGER:
- return 4 + 8 + 4/* +4 to round to 8 bytes */;
-
- case DataType.LONG:
- return 8 + 8;
-
- case DataType.MAP: {
- Map<String, Object> m = (Map<String, Object>) o;
- Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
- long sum = 0;
- while (i.hasNext()) {
- Map.Entry<String, Object> entry = i.next();
- sum += getFieldMemorySize(entry.getKey());
- sum += getFieldMemorySize(entry.getValue());
- }
- // based on experiments on 32 bit Java HotSpot VM
- // size of map with 0 entries is 120 bytes
- // each additional entry have around 24 bytes overhead at
- // small number of entries. At larger number of entries, the
- // overhead is around 32 bytes, probably because of the expanded
- // data structures in anticapation of more entries being added
- return sum + m.size() * 32 + 120;
- }
-
- case DataType.FLOAT:
- return 4 + 8 + 4/* +4 to round to 8 bytes */;
-
- case DataType.DOUBLE:
- return 8 + 8;
-
- case DataType.BOOLEAN:
- // boolean takes 1 byte , +7 to round it to 8
- return 1 + 8 + 7;
-
- case DataType.NULL:
- return 0;
-
- default:
- // ??
- return 12;
- }
- }
-
/**
* @return true if this Tuple is null
*/
Modified: pig/trunk/src/org/apache/pig/data/InternalCachedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalCachedBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalCachedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalCachedBag.java Thu Sep 15 17:51:27 2011
@@ -33,16 +33,16 @@ import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigCounters;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
-public class InternalCachedBag extends DefaultAbstractBag {
+public class InternalCachedBag extends SelfSpillBag {
private static final long serialVersionUID = 1L;
private static final Log log = LogFactory.getLog(InternalCachedBag.class);
- private transient int cacheLimit;
- private transient long maxMemUsage;
- private transient long memUsage;
private transient DataOutputStream out;
private transient boolean addDone;
private transient TupleFactory factory;
@@ -51,58 +51,36 @@ public class InternalCachedBag extends D
private transient int numTuplesSpilled = 0;
public InternalCachedBag() {
- this(1);
+ this(1, -1f);
}
public InternalCachedBag(int bagCount) {
- float percent = 0.2F;
-
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
- if (usage != null) {
- percent = Float.parseFloat(usage);
- }
- }
-
- init(bagCount, percent);
+ this(bagCount, -1f);
}
public InternalCachedBag(int bagCount, float percent) {
- init(bagCount, percent);
+ super(bagCount, percent);
+ init();
}
- private void init(int bagCount, float percent) {
- factory = TupleFactory.getInstance();
- mContents = new ArrayList<Tuple>();
-
- long max = Runtime.getRuntime().maxMemory();
- maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
- cacheLimit = Integer.MAX_VALUE;
-
- // set limit to 0, if memusage is 0 or really really small.
- // then all tuples are put into disk
- if (maxMemUsage < 1) {
- cacheLimit = 0;
- }
-
+ private void init() {
+ factory = TupleFactory.getInstance();
+ mContents = new ArrayList<Tuple>();
addDone = false;
}
+
public void add(Tuple t) {
if(addDone) {
throw new IllegalStateException("InternalCachedBag is closed for adding new tuples");
}
- if(mContents.size() < cacheLimit) {
+ if(mContents.size() < memLimit.getCacheLimit()) {
mContents.add(t);
if(mContents.size() < 100)
{
- memUsage += t.getMemorySize();
- long avgUsage = memUsage / (long)mContents.size();
- if (avgUsage > 0) {
- cacheLimit = (int)(maxMemUsage / avgUsage);
- }
+ memLimit.addNewObjSize(t.getMemorySize());
}
} else {
// above cacheLimit, spill to disk
Modified: pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalDistinctBag.java Thu Sep 15 17:51:27 2011
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -37,9 +36,9 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.PigCounters;
-import org.apache.pig.PigWarning;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
@@ -54,6 +53,8 @@ import org.apache.pig.backend.hadoop.exe
* This bag spills pro-actively when the number of tuples in memory
* reaches a limit
*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
public class InternalDistinctBag extends SortedSpillBag {
/**
@@ -65,21 +66,18 @@ public class InternalDistinctBag extends
private static TupleFactory gTupleFactory = TupleFactory.getInstance();
- private transient boolean mReadStarted = false;
-
- private transient int cacheLimit;
- private transient long maxMemUsage;
- private transient long memUsage;
+ private transient boolean mReadStarted = false;
public InternalDistinctBag() {
- this(1, -1.0);
+ this(1, -1.0f);
}
public InternalDistinctBag(int bagCount) {
- this(bagCount, -1.0);
+ this(bagCount, -1.0f);
}
- public InternalDistinctBag(int bagCount, double percent) {
+ public InternalDistinctBag(int bagCount, float percent) {
+ super(bagCount, percent);
if (percent < 0) {
percent = 0.2F;
if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -95,16 +93,6 @@ public class InternalDistinctBag extends
private void init(int bagCount, double percent) {
mContents = new HashSet<Tuple>();
-
- long max = Runtime.getRuntime().maxMemory();
- maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
- cacheLimit = Integer.MAX_VALUE;
-
- // set limit to 0, if memusage is 0 or really really small.
- // then all tuples are put into disk
- if (maxMemUsage < 1) {
- cacheLimit = 0;
- }
}
public boolean isSorted() {
@@ -144,7 +132,7 @@ public class InternalDistinctBag extends
throw new IllegalStateException("InternalDistinctBag is closed for adding new tuples");
}
- if (mContents.size() > cacheLimit) {
+ if (mContents.size() > memLimit.getCacheLimit()) {
proactive_spill(null);
}
@@ -154,12 +142,7 @@ public class InternalDistinctBag extends
// check how many tuples memory can hold by getting average
// size of first 100 tuples
if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())) {
- memUsage += t.getMemorySize();
- long avgUsage = memUsage / (long)mContents.size();
- if (avgUsage >0) {
- cacheLimit = (int)(maxMemUsage / avgUsage);
- log.debug("Memory can hold "+ cacheLimit + " records.");
- }
+ memLimit.addNewObjSize(t.getMemorySize());
}
}
}
Modified: pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Thu Sep 15 17:51:27 2011
@@ -64,10 +64,6 @@ public class InternalSortedBag extends S
private transient Comparator<Tuple> mComp;
private transient boolean mReadStarted = false;
-
- private transient int cacheLimit;
- private transient long maxMemUsage;
- private transient long memUsage;
static private class DefaultComparator implements Comparator<Tuple> {
@SuppressWarnings("unchecked")
@@ -93,20 +89,11 @@ public class InternalSortedBag extends S
}
public InternalSortedBag(int bagCount, Comparator<Tuple> comp) {
- this(bagCount, -1.0, comp);
+ this(bagCount, -1.0f, comp);
}
- public InternalSortedBag(int bagCount, double percent, Comparator<Tuple> comp) {
- if (percent < 0) {
- percent = 0.2F;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.memusage");
- if (usage != null) {
- percent = Float.parseFloat(usage);
- }
- }
- }
-
+ public InternalSortedBag(int bagCount, float percent, Comparator<Tuple> comp) {
+ super(bagCount, percent);
init(bagCount, percent, comp);
}
@@ -116,19 +103,8 @@ public class InternalSortedBag extends S
*/
private void init(int bagCount, double percent, Comparator<Tuple> comp) {
mComp = (comp == null) ? new DefaultComparator() : comp;
-
-
mContents = new ArrayList<Tuple>();
-
- long max = Runtime.getRuntime().maxMemory();
- maxMemUsage = (long)(((float)max * percent) / (float)bagCount);
- cacheLimit = Integer.MAX_VALUE;
-
- // set limit to 0, if memusage is 0 or really really small.
- // then all tuples are put into disk
- if (maxMemUsage < 1) {
- cacheLimit = 0;
- }
+
}
public void add(Tuple t) {
@@ -136,7 +112,7 @@ public class InternalSortedBag extends S
throw new IllegalStateException("InternalSortedBag is closed for adding new tuples");
}
- if (mContents.size() > cacheLimit) {
+ if (mContents.size() > memLimit.getCacheLimit()) {
proactive_spill(mComp);
}
@@ -146,11 +122,7 @@ public class InternalSortedBag extends S
// size of first 100 tuples
if(mSize < 100 && (mSpillFiles == null || mSpillFiles.isEmpty())&&t!=null)
{
- memUsage += t.getMemorySize();
- long avgUsage = memUsage / (long)mContents.size();
- if (avgUsage >0) {
- cacheLimit = (int)(maxMemUsage / avgUsage);
- }
+ memLimit.addNewObjSize(t.getMemorySize());
}
mSize++;
Added: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (added)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * Class to hold code common to self spilling bags such as InternalCachedBag.
+ * Each bag owns a {@link MemoryLimits} that tells it how many tuples it may
+ * keep in memory.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class SelfSpillBag extends DefaultAbstractBag {
+    private static final long serialVersionUID = 1L;
+
+    /** Tracks how many tuples fit into this bag's share of the memory budget. */
+    protected MemoryLimits memLimit;
+
+    /**
+     * Creates a bag whose memory fraction is taken from
+     * {@link MemoryLimits#PROP_CACHEDBAG_MEMUSAGE}, or the default.
+     *
+     * @param bagCount number of bags sharing the memory budget
+     */
+    public SelfSpillBag(int bagCount) {
+        memLimit = new MemoryLimits(bagCount, -1);
+    }
+
+    /**
+     * @param bagCount number of bags sharing the memory budget
+     * @param percent fraction of the maximum heap to use. A negative value
+     *        means use {@link MemoryLimits#PROP_CACHEDBAG_MEMUSAGE}, or the
+     *        default.
+     */
+    public SelfSpillBag(int bagCount, float percent) {
+        memLimit = new MemoryLimits(bagCount, percent);
+    }
+
+    /**
+     * This class helps to compute the number of entries that should be held in
+     * memory so that memory consumption is limited. The memory limit is
+     * computed using the percentage of max memory that the user of this class
+     * is allowed to use, and number of similar objects that share this limit.
+     * The number of objects that will fit into this memory limit is computed
+     * using the average memory size of the objects whose size is given to this
+     * class.
+     */
+    @InterfaceAudience.Private
+    @InterfaceStability.Evolving
+    public static class MemoryLimits implements java.io.Serializable {
+        // Serializable because SelfSpillBag declares a serialVersionUID and
+        // its memLimit field is not transient: a non-serializable value there
+        // makes Java serialization of the bag fail (NotSerializableException)
+        private static final long serialVersionUID = 1L;
+
+        public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
+
+        /** memory budget in bytes for the objects tracked by this instance */
+        private long maxMemUsage;
+        /** last computed number of objects that fit into maxMemUsage */
+        private int cacheLimit = Integer.MAX_VALUE;
+        /** sum of all sizes reported through addNewObjSize */
+        private long memUsage = 0;
+        /** number of sizes reported through addNewObjSize */
+        private long numObjsSizeChecked = 0;
+
+        /**
+         * @param bagCount number of objects sharing the memory budget; the
+         *        budget is divided evenly among them
+         * @param percent fraction of the JVM's maximum heap to use. A negative
+         *        value means use PROP_CACHEDBAG_MEMUSAGE from the job
+         *        configuration when set, otherwise 0.2.
+         * @throws NumberFormatException if PROP_CACHEDBAG_MEMUSAGE is consulted
+         *         and is not a valid float
+         */
+        public MemoryLimits(int bagCount, float percent) {
+            init(bagCount, percent);
+        }
+
+        private void init(int bagCount, float percent) {
+
+            if (percent < 0) {
+                percent = 0.2F;
+                if (PigMapReduce.sJobConfInternal.get() != null) {
+                    String usage = PigMapReduce.sJobConfInternal.get().get(
+                            PROP_CACHEDBAG_MEMUSAGE);
+                    if (usage != null) {
+                        percent = Float.parseFloat(usage);
+                    }
+                }
+            }
+
+            long max = Runtime.getRuntime().maxMemory();
+            maxMemUsage = (long) (((float) max * percent) / (float) bagCount);
+
+            // set limit to 0, if memusage is 0 or really really small.
+            // then all tuples are put into disk
+            if (maxMemUsage < 1) {
+                cacheLimit = 0;
+            }
+        }
+
+        /**
+         * Computes the number of objects that would fit into memory based on
+         * the memory limit and average size of each object.
+         *
+         * @return number of objects limit, never negative
+         */
+        public int getCacheLimit() {
+            if (numObjsSizeChecked > 0) {
+                long avgUsage = memUsage / numObjsSizeChecked;
+                if (avgUsage > 0) {
+                    // clamp: a large budget divided by a small average can
+                    // exceed Integer.MAX_VALUE, and a plain (int) cast of the
+                    // long quotient would wrap to a negative limit
+                    cacheLimit = (int) Math.min(Integer.MAX_VALUE,
+                            maxMemUsage / avgUsage);
+                }
+            }
+            return cacheLimit;
+        }
+
+        /**
+         * Submit information about size of another object
+         *
+         * @param memorySize estimated size of the object in bytes
+         */
+        public void addNewObjSize(long memorySize) {
+            memUsage += memorySize;
+            ++numObjsSizeChecked;
+        }
+
+        /**
+         * @return the number of object sizes reported through
+         *         {@link #addNewObjSize(long)} so far
+         */
+        public long getNumObjectsSizeAdded() {
+            return numObjsSizeChecked;
+        }
+    }
+
+}
Added: pig/trunk/src/org/apache/pig/data/SizeUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SizeUtil.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SizeUtil.java (added)
+++ pig/trunk/src/org/apache/pig/data/SizeUtil.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * Utility functions for estimating size of objects of pig types
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SizeUtil {
+
+    // Based on experiments on 32 bit Java HotSpot VM:
+    // a map with 0 entries is about 120 bytes
+    private static final int MAP_MEM_EMPTY = 120;
+    // each entry then adds around 24 bytes of overhead at small sizes and
+    // around 32 bytes at larger sizes, excluding the key and value
+    // themselves. (Previously 32 + 120 was charged per entry, counting the
+    // empty-map cost once per entry and 0 for an empty map.)
+    private static final int MAP_MEM_PER_ENTRY = 32;
+
+    /**
+     * Estimates the heap memory, in bytes, used by a value of any pig type.
+     * Tuples and bags report their own size through getMemorySize(); a null
+     * value counts as 0 and an unrecognized type as 12.
+     *
+     * @param o value to measure; may be null
+     * @return estimated size in bytes
+     */
+    public static long getPigObjMemSize(Object o) {
+        // 12 is added to each to account for the object overhead and the
+        // pointer in the tuple.
+        switch (DataType.findType(o)) {
+        case DataType.BYTEARRAY: {
+            byte[] bytes = ((DataByteArray) o).get();
+            // bytearray size including rounding to 8 bytes
+            long byte_array_sz = roundToEight(bytes.length + 12);
+
+            return byte_array_sz + 16 /* 16 is additional size of DataByteArray */;
+        }
+
+        case DataType.CHARARRAY: {
+            String s = (String) o;
+            // See PIG-1443 for a reference for this formula
+            return roundToEight((s.length() * 2) + 38);
+        }
+
+        case DataType.TUPLE: {
+            Tuple t = (Tuple) o;
+            return t.getMemorySize();
+        }
+
+        case DataType.BAG: {
+            DataBag b = (DataBag) o;
+            return b.getMemorySize();
+        }
+
+        case DataType.INTEGER:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
+
+        case DataType.LONG:
+            return 8 + 8;
+
+        case DataType.MAP: {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> m = (Map<String, Object>) o;
+            Iterator<Map.Entry<String, Object>> i = m.entrySet().iterator();
+            // fixed cost of the map itself, plus every entry
+            long sum = MAP_MEM_EMPTY;
+            while (i.hasNext()) {
+                Entry<String, Object> entry = i.next();
+                sum += getMapEntrySize(entry.getKey(), entry.getValue());
+            }
+            return sum;
+        }
+
+        case DataType.FLOAT:
+            return 4 + 8 + 4/* +4 to round to 8 bytes */;
+
+        case DataType.DOUBLE:
+            return 8 + 8;
+
+        case DataType.BOOLEAN:
+            // boolean takes 1 byte , +7 to round it to 8
+            return 1 + 8 + 7;
+
+        case DataType.NULL:
+            return 0;
+
+        default:
+            // ??
+            return 12;
+        }
+    }
+
+    /**
+     * Estimates the memory one entry adds to a map: the sizes of the key and
+     * the value plus the per-entry overhead. The map's own fixed size is not
+     * included.
+     *
+     * @param key entry key
+     * @param value entry value
+     * @return estimated size in bytes
+     */
+    public static long getMapEntrySize(Object key, Object value) {
+        // based on experiments on 32 bit Java HotSpot VM
+        // each additional entry has around 24 bytes overhead at
+        // small number of entries. At larger number of entries, the
+        // overhead is around 32 bytes, probably because of the expanded
+        // data structures in anticipation of more entries being added
+        return getPigObjMemSize(key) + getPigObjMemSize(value)
+                + MAP_MEM_PER_ENTRY;
+    }
+
+    /**
+     * Memory size of objects are rounded to multiple of 8 bytes
+     *
+     * @param i size in bytes
+     * @return i rounded up to an equal or higher multiple of 8
+     */
+    public static long roundToEight(long i) {
+        return 8 * ((i + 7) / 8); // integer division rounds the result down
+    }
+
+}
Modified: pig/trunk/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SortedSpillBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SortedSpillBag.java Thu Sep 15 17:51:27 2011
@@ -24,15 +24,23 @@ import java.util.Comparator;
import org.apache.pig.PigCounters;
import org.apache.pig.PigWarning;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
/**
* Common functionality for proactively spilling bags that need to keep the data
* sorted.
*/
-public abstract class SortedSpillBag extends DefaultAbstractBag {
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class SortedSpillBag extends SelfSpillBag {
private static final long serialVersionUID = 1L;
+    /**
+     * Protected rather than package-private: this is the only constructor of
+     * a public abstract class, so package-private access would prevent any
+     * subclass outside org.apache.pig.data.
+     *
+     * @param bagCount number of bags sharing the memory budget
+     * @param percent fraction of the maximum heap to use; a negative value
+     *        selects the configured or default fraction
+     */
+    protected SortedSpillBag(int bagCount, float percent) {
+        super(bagCount, percent);
+    }
+
/**
* Sort contents of mContents and write them to disk
* @param comp Comparator to sort contents of mContents
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Sep 15 17:51:27 2011
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -150,7 +151,8 @@ public class ScriptState {
LIMIT,
UNION,
COMBINER,
- NATIVE;
+ NATIVE,
+ MAP_PARTIALAGG;
};
/**
@@ -607,7 +609,13 @@ public class ScriptState {
@Override
public void visitDemux(PODemux demux) throws VisitorException {
feature.set(PIG_FEATURE.MULTI_QUERY.ordinal());
- }
+ }
+
+        /** Marks the script as using in-map partial aggregation. */
+        @Override
+        public void visitPartialAgg(POPartialAgg partialAgg) {
+            feature.set(PIG_FEATURE.MAP_PARTIALAGG.ordinal());
+        }
+
}
static class LogicalPlanFeatureVisitor extends LogicalRelationalNodesVisitor {
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Thu Sep 15 17:51:27 2011
@@ -507,10 +507,81 @@ c = load ':INPATH:/singlefile/studenttab
d = cogroup b by group, c by name;
e = foreach d generate flatten(group), SUM(c.gpa), COUNT(c.name);
store e into ':OUTPATH:';\,
- }
+ },
],
},
{
+ 'name' => 'MapPartialAgg',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = group a by name;
+c = foreach b generate group, COUNT(a.age);
+store c into ':OUTPATH:';\,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+ },
+ {
+ #multiquery with group in one sub query
+ 'num' => 2,
+ 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
+ b = filter a by age < 22; store b into ':OUTPATH:.1';
+ c = group b by age;
+ d = foreach c generate group, SUM(b.gpa);
+ store d into ':OUTPATH:.2'; #,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+
+ },
+ {
+ #multi query with two groups on different columns
+ 'num' => 3,
+ 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
+ g1 = group a by name;
+ f1 = foreach g1 generate group as name, MAX(a.gpa);
+ store f1 into ':OUTPATH:.1';
+ g2 = group a by age;
+ f2 = foreach g2 generate group as age, AVG(a.gpa);
+ store f2 into ':OUTPATH:.2'; #,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+
+ },
+ {
+ #multi query with three groups on diff columns, group key being an expression
+ 'num' => 4,
+ 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
+ g1 = group a by name;
+ f1 = foreach g1 generate group as name, MAX(a.gpa);
+ store f1 into ':OUTPATH:.1';
+ g2 = group a by age%10;
+ f2 = foreach g2 generate group as age_mod10, AVG(a.gpa);
+ store f2 into ':OUTPATH:.2';
+ g3 = group a by age;
+ f3 = foreach g3 generate group%10, AVG(a.gpa);
+ store f3 into ':OUTPATH:.3';
+ g4 = group a by gpa;
+ f4 = foreach g4 generate group as gpa, COUNT(a);
+ store f4 into ':OUTPATH:.4';
+
+ #,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+
+ },
+ {
+ #aggregation receives more than one tuple for every tuple produced by the load func
+
+ 'num' => 5,
+ 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float);
+ b = foreach a generate name, age, gpa, flatten(TOBAG(age,age)) as x;
+ c = group b by age;
+ d = foreach c generate group, AVG(b.gpa);
+ store d into ':OUTPATH:'; #,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true']
+
+ },
+
+ ],
+ },
+ {
'name' => 'EvalFunc',
'tests' => [
{
Modified: pig/trunk/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBag.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBag.java Thu Sep 15 17:51:27 2011
@@ -900,7 +900,7 @@ public class TestDataBag extends junit.f
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
// check bag with data written to disk
- DataBag bg3 = new InternalSortedBag(1, 0.0, null);
+ DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
for (int i = 0; i < tupleContents.length; i++) {
bg3.add(Util.createTuple(tupleContents[i]));
@@ -917,7 +917,7 @@ public class TestDataBag extends junit.f
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
- DataBag bg4 = new InternalSortedBag(1, 0.0, null);
+ DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
bg4.add(iter.next());
bg4.add(iter.next());
assertTrue(iter.hasNext());
@@ -1022,7 +1022,7 @@ public class TestDataBag extends junit.f
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
// check bag with data written to disk
- DataBag bg3 = new InternalDistinctBag(1, 0.0);
+ DataBag bg3 = new InternalDistinctBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
for (int i = 0; i < tupleContents.length; i++) {
bg3.add(Util.createTuple(tupleContents[i]));
@@ -1036,7 +1036,7 @@ public class TestDataBag extends junit.f
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
- DataBag bg4 = new InternalDistinctBag(1, 0.0);
+ DataBag bg4 = new InternalDistinctBag(1, 0.0f);
bg4.add(iter.next());
bg4.add(iter.next());
assertTrue(iter.hasNext());