You are viewing a plain-text version of this content. The canonical link for it appeared here as a hyperlink, which was lost in this plain-text copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/06 12:54:14 UTC
svn commit: r1585283 [1/3] - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/fetch/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/ap...
Author: cheolsoo
Date: Sun Apr 6 10:54:13 2014
New Revision: 1585283
URL: http://svn.apache.org/r1585283
Log:
PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
Added:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
Removed:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
pig/trunk/test/org/apache/pig/test/TestPackage.java
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC19.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Apr 6 10:54:13 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
+
PIG-3449: Move JobCreationException to org.apache.pig.backend.hadoop.executionengine (cheolsoo)
PIG-3765: Ability to disable Pig commands and operators (prkommireddi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Sun Apr 6 10:54:13 2014
@@ -32,19 +32,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -231,16 +228,6 @@ public class FetchOptimizer {
}
@Override
- public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
- public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
public void visitSplit(POSplit spl) throws VisitorException {
planFetchable = false;
}
@@ -271,11 +258,6 @@ public class FetchOptimizer {
}
@Override
- public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
public void visitCross(POCross cross) throws VisitorException {
planFetchable = false;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Sun Apr 6 10:54:13 2014
@@ -37,10 +37,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -71,13 +71,19 @@ public class AccumulatorOptimizer extend
return;
}
+ Packager pkgr = ((POPackage) po_package).getPkgr();
+ // Check that this is a standard package, not a subclass
+ if (!pkgr.getClass().equals(Packager.class)) {
+ return;
+ }
+
// if POPackage is for distinct, just return
- if (((POPackage)po_package).isDistinct()) {
+ if (pkgr.isDistinct()) {
return;
}
// if any input to POPackage is inner, just return
- boolean[] isInner = ((POPackage)po_package).getInner();
+ boolean[] isInner = pkgr.getInner();
for(boolean b: isInner) {
if (b) {
return;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sun Apr 6 10:54:13 2014
@@ -24,41 +24,38 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.PigException;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
-import org.apache.pig.data.DataType;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.impl.util.Pair;
@@ -265,8 +262,10 @@ public class CombinerOptimizer extends M
// as it needs to act differently than the regular
// package operator.
mr.combinePlan = new PhysicalPlan();
- POCombinerPackage combinePack =
- new POCombinerPackage(pack, bags);
+ CombinerPackager pkgr = new CombinerPackager(pack.getPkgr(),
+ bags);
+ POPackage combinePack = pack.clone();
+ combinePack.setPkgr(pkgr);
mr.combinePlan.add(combinePack);
mr.combinePlan.add(cfe);
mr.combinePlan.connect(combinePack, cfe);
@@ -304,20 +303,7 @@ public class CombinerOptimizer extends M
// Change the package operator in the reduce plan to
// be the POCombiner package, as it needs to act
// differently than the regular package operator.
- POCombinerPackage newReducePack =
- new POCombinerPackage(pack, bags);
- mr.reducePlan.replace(pack, newReducePack);
-
- // the replace() above only changes
- // the plan and does not change "inputs" to
- // operators
- // set up "inputs" for the operator after
- // package correctly
- List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
- packList.add(newReducePack);
- List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
- // there should be only one successor to package
- sucs.get(0).setInputs(packList);
+ pack.setPkgr(pkgr.clone());
} catch (Exception e) {
int errCode = 2018;
String msg = "Internal error. Unable to introduce the combiner for optimization.";
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sun Apr 6 10:54:13 2014
@@ -827,7 +827,8 @@ public class JobControlCompiler{
}
if (!pigContext.inIllustrator)
conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
- conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType()));
+ conf.set("pig.reduce.key.type",
+ Byte.toString(pack.getPkgr().getKeyType()));
if (mro.getUseSecondaryKey()) {
nwJob.setGroupingComparatorClass(PigSecondaryKeyGroupComparator.class);
@@ -840,9 +841,11 @@ public class JobControlCompiler{
}
else
{
- Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
+ Class<? extends WritableComparable> keyClass = HDataType
+ .getWritableComparableTypes(
+ pack.getPkgr().getKeyType()).getClass();
nwJob.setOutputKeyClass(keyClass);
- selectComparator(mro, pack.getKeyType(), nwJob);
+ selectComparator(mro, pack.getPkgr().getKeyType(), nwJob);
}
nwJob.setOutputValueClass(NullableTuple.class);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Sun Apr 6 10:54:13 2014
@@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -65,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -73,8 +74,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -83,6 +82,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -1104,9 +1105,9 @@ public class MRCompiler extends PhyPlanV
try{
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
- if (op.getPackageType() == PackageType.JOIN) {
+ if (op.getPkgr().getPackageType() == PackageType.JOIN) {
curMROp.markRegularJoin();
- } else if (op.getPackageType() == PackageType.GROUP) {
+ } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
if (op.getNumInps() == 1) {
curMROp.markGroupBy();
} else if (op.getNumInps() > 1) {
@@ -1758,11 +1759,12 @@ public class MRCompiler extends PhyPlanV
curMROp.customPartitioner = op.getCustomPartitioner();
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.TUPLE);
- pkg.setDistinct(true);
+ Packager pkgr = pkg.getPkgr();
+ pkgr.setKeyType(DataType.TUPLE);
+ pkgr.setDistinct(true);
pkg.setNumInps(1);
boolean[] inner = {false};
- pkg.setInner(inner);
+ pkgr.setInner(inner);
curMROp.reducePlan.add(pkg);
List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -1908,11 +1910,12 @@ public class MRCompiler extends PhyPlanV
// create POPakcage
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
- pkg.setKeyType(type);
+ Packager pkgr = pkg.getPkgr();
+ pkgr.setKeyType(type);
pkg.setResultType(DataType.TUPLE);
pkg.setNumInps(2);
boolean [] inner = op.getInnerFlags();
- pkg.setInner(inner);
+ pkgr.setInner(inner);
pkg.visit(this);
compiledInputs = new MapReduceOper[] {curMROp};
@@ -2150,8 +2153,11 @@ public class MRCompiler extends PhyPlanV
mro.setMapDone(true);
if (limit!=-1) {
- POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+ POPackage pkg_c = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ LitePackager pkgr = new LitePackager();
+ pkgr.setKeyType((fields.length > 1) ? DataType.TUPLE : keyType);
+ pkg_c.setPkgr(pkgr);
pkg_c.setNumInps(1);
//pkg.setResultType(DataType.TUPLE);
mro.combinePlan.add(pkg_c);
@@ -2191,11 +2197,14 @@ public class MRCompiler extends PhyPlanV
lr_c2.setResultType(DataType.TUPLE);
mro.combinePlan.addAsLeaf(lr_c2);
}
-
- POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
- keyType);
- pkg.setNumInps(1);
+
+ POPackage pkg = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ LitePackager pkgr = new LitePackager();
+ pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
+ : keyType);
+ pkg.setPkgr(pkgr);
+ pkg.setNumInps(1);
mro.reducePlan.add(pkg);
PhysicalPlan ep = new PhysicalPlan();
@@ -2447,10 +2456,12 @@ public class MRCompiler extends PhyPlanV
mro.setMapDone(true);
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.CHARARRAY);
+ Packager pkgr = new Packager();
+ pkg.setPkgr(pkgr);
+ pkgr.setKeyType(DataType.CHARARRAY);
pkg.setNumInps(1);
boolean[] inner = {false};
- pkg.setInner(inner);
+ pkgr.setInner(inner);
mro.reducePlan.add(pkg);
// Lets start building the plan which will have the sort
@@ -2738,35 +2749,30 @@ public class MRCompiler extends PhyPlanV
public static void replaceWithPOJoinPackage(PhysicalPlan plan, MapReduceOper mr,
POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
- String scope = pack.getOperatorKey().scope;
- NodeIdGenerator nig = NodeIdGenerator.getGenerator();
- POJoinPackage joinPackage;
- joinPackage = new POJoinPackage(
- new OperatorKey(scope, nig.getNextNodeId(scope)),
- -1, pack, forEach);
- joinPackage.setChunkSize(Long.parseLong(chunkSize));
+ JoinPackager pkgr = new JoinPackager(pack.getPkgr(), forEach);
+ pkgr.setChunkSize(Long.parseLong(chunkSize));
+ pack.setPkgr(pkgr);
List<PhysicalOperator> succs = plan.getSuccessors(forEach);
- if (succs!=null)
- {
- if (succs.size()!=1)
- {
+ if (succs != null) {
+ if (succs.size() != 1) {
int errCode = 2028;
- String msg = "ForEach can only have one successor. Found " + succs.size() + " successors.";
- throw new MRCompilerException(msg, errCode, PigException.BUG);
+ String msg = "ForEach can only have one successor. Found "
+ + succs.size() + " successors.";
+ throw new MRCompilerException(msg, errCode,
+ PigException.BUG);
}
}
plan.remove(pack);
-
try {
- plan.replace(forEach, joinPackage);
+ plan.replace(forEach, pack);
} catch (PlanException e) {
int errCode = 2029;
- String msg = "Error rewriting POJoinPackage.";
+ String msg = "Error rewriting join package.";
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
- mr.phyToMRMap.put(forEach, joinPackage);
- LogFactory.
- getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+ mr.phyToMRMap.put(forEach, pack);
+ LogFactory.getLog(LastInputStreamingOptimizer.class).info(
+ "Rewrite: POPackage->POForEach to POPackage(JoinPackager)");
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java Sun Apr 6 10:54:13 2014
@@ -65,10 +65,10 @@ public class MRUtil {
mro.mapPlan.addAsLeaf(lr);
POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
- pkg.setKeyType(DataType.TUPLE);
+ pkg.getPkgr().setKeyType(DataType.TUPLE);
pkg.setNumInps(1);
boolean[] inner = {false};
- pkg.setInner(inner);
+ pkg.getPkgr().setInner(inner);
mro.reducePlan.add(pkg);
mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sun Apr 6 10:54:13 2014
@@ -60,7 +60,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.XMLMRPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
@@ -637,8 +637,8 @@ public class MapReduceLauncher extends L
comp.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
String lastInputChunkSize =
- pc.getProperties().getProperty(
- "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
+ pc.getProperties().getProperty(
+ "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
if (!pc.inIllustrator && !("true".equals(prop))) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Sun Apr 6 10:54:13 2014
@@ -18,10 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,14 +31,15 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -686,28 +687,33 @@ class MultiQueryOptimizer extends MROpPl
PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {
POPackage pk = (POPackage)from.getRoots().get(0);
from.remove(pk);
+ Packager fromPkgr = pk.getPkgr();
- if(!(pk instanceof POMultiQueryPackage)){
+ if (!(fromPkgr instanceof MultiQueryPackager)) {
// XXX the index of the original keyInfo map is always 0,
// we need to shift the index so that the lookups works
// with the new indexed key
- addShiftedKeyInfoIndex(initial, pk);
+ addShiftedKeyInfoIndex(initial, fromPkgr);
}
int total = current - initial;
- POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ MultiQueryPackager toPkgr = (MultiQueryPackager) ((POPackage) to
+ .getRoots().get(0)).getPkgr();
int pkCount = 0;
- if (pk instanceof POMultiQueryPackage) {
- List<POPackage> pkgs = ((POMultiQueryPackage)pk).getPackages();
- for (POPackage p : pkgs) {
- pkg.addPackage(p);
+ if (fromPkgr instanceof MultiQueryPackager) {
+ List<Packager> pkgs = ((MultiQueryPackager) fromPkgr)
+ .getPackagers();
+ for (Packager p : pkgs) {
+ ((MultiQueryPackager) toPkgr).addPackager(p);
pkCount++;
}
- pkg.addIsKeyWrappedList(((POMultiQueryPackage)pk).getIsKeyWrappedList());
- addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
+ toPkgr.addIsKeyWrappedList(((MultiQueryPackager) fromPkgr)
+ .getIsKeyWrappedList());
+ addShiftedKeyInfoIndex(initial, current,
+ (MultiQueryPackager) fromPkgr);
} else {
- pkg.addPackage(pk, mapKeyType);
+ toPkgr.addPackager(fromPkgr, mapKeyType);
pkCount = 1;
}
@@ -740,14 +746,15 @@ class MultiQueryOptimizer extends MROpPl
throw new OptimizerException(msg, errCode, PigException.BUG);
}
- if (pkg.isSameMapKeyType()) {
- pkg.setKeyType(pk.getKeyType());
+ if (toPkgr.isSameMapKeyType()) {
+ toPkgr.setKeyType(fromPkgr.getKeyType());
} else {
- pkg.setKeyType(DataType.TUPLE);
+ toPkgr.setKeyType(DataType.TUPLE);
}
}
- private void addShiftedKeyInfoIndex(int index, POPackage pkg) throws OptimizerException {
+ private void addShiftedKeyInfoIndex(int index, Packager pkg)
+ throws OptimizerException {
/**
* we only do multi query optimization for single input MROpers
* Hence originally the keyInfo would have had only index 0. As
@@ -761,7 +768,8 @@ class MultiQueryOptimizer extends MROpPl
* addition should be the same as the "value" in the existing Entry. After
* addition, we should remove the older entry
*/
- Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg.getKeyInfo();
+ Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg
+ .getKeyInfo();
byte newIndex = (byte)(index | PigNullableWritable.mqFlag);
Set<Integer> existingIndices = keyInfo.keySet();
@@ -792,9 +800,9 @@ class MultiQueryOptimizer extends MROpPl
* @throws OptimizerException
*/
private int addShiftedKeyInfoIndex(int initialIndex, int onePastEndIndex,
- POMultiQueryPackage mpkg) throws OptimizerException {
+ MultiQueryPackager mpkgr) throws OptimizerException {
- List<POPackage> pkgs = mpkg.getPackages();
+ List<Packager> pkgs = mpkgr.getPackagers();
// if we have lesser pkgs than (onePastEndIndex - initialIndex)
// its because one or more of the pkgs is a POMultiQueryPackage which
// internally has packages.
@@ -810,7 +818,7 @@ class MultiQueryOptimizer extends MROpPl
int i = 0;
int curIndex = initialIndex;
while (i < end) {
- POPackage pkg = pkgs.get(i);
+ Packager pkg = pkgs.get(i);
addShiftedKeyInfoIndex(curIndex, pkg);
curIndex++;
i++;
@@ -823,12 +831,14 @@ class MultiQueryOptimizer extends MROpPl
PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {
POPackage cpk = (POPackage)from.getRoots().get(0);
from.remove(cpk);
+ Packager cpkgr = cpk.getPkgr();
PODemux demux = (PODemux)to.getLeaves().get(0);
- POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+ MultiQueryPackager toPkgr = (MultiQueryPackager) ((POPackage) to
+ .getRoots().get(0)).getPkgr();
- boolean isSameKeyType = pkg.isSameMapKeyType();
+ boolean isSameKeyType = toPkgr.isSameMapKeyType();
// if current > initial + 1, it means we had
// a split in the map of the MROper we are trying to
@@ -844,21 +854,21 @@ class MultiQueryOptimizer extends MROpPl
// POLocalRearranges.
int total = current - initial;
int pkCount = 0;
- if (cpk instanceof POMultiQueryPackage) {
- List<POPackage> pkgs = ((POMultiQueryPackage)cpk).getPackages();
- for (POPackage p : pkgs) {
- pkg.addPackage(p);
+ if (cpkgr instanceof MultiQueryPackager) {
+ List<Packager> pkgrs = ((MultiQueryPackager) cpkgr).getPackagers();
+ for (Packager p : pkgrs) {
+ toPkgr.addPackager(p);
if (!isSameKeyType) {
p.setKeyType(DataType.TUPLE);
}
pkCount++;
}
} else {
- pkg.addPackage(cpk);
+ toPkgr.addPackager(cpkgr);
pkCount = 1;
}
- pkg.setSameMapKeyType(isSameKeyType);
+ toPkgr.setSameMapKeyType(isSameKeyType);
if (pkCount != total) {
int errCode = 2146;
@@ -868,10 +878,10 @@ class MultiQueryOptimizer extends MROpPl
// all packages should have the same key type
if (!isSameKeyType) {
- cpk.setKeyType(DataType.TUPLE);
+ cpk.getPkgr().setKeyType(DataType.TUPLE);
}
- pkg.setKeyType(cpk.getKeyType());
+ toPkgr.setKeyType(cpk.getPkgr().getKeyType());
// See comment above for why we flatten the Packages
// in the from plan - for the same reason, we flatten
@@ -936,7 +946,7 @@ class MultiQueryOptimizer extends MROpPl
private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean isCombiner)
throws VisitorException {
PODemux demux = getDemux(isCombiner);
- POMultiQueryPackage pkg= getMultiQueryPackage(sameKeyType, isCombiner);
+ POPackage pkg = getMultiQueryPackage(sameKeyType, isCombiner);
PhysicalPlan pl = new PhysicalPlan();
pl.add(pkg);
@@ -1186,11 +1196,14 @@ class MultiQueryOptimizer extends MROpPl
return demux;
}
- private POMultiQueryPackage getMultiQueryPackage(boolean sameMapKeyType, boolean inCombiner){
- POMultiQueryPackage pkg =
- new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
- pkg.setInCombiner(inCombiner);
- pkg.setSameMapKeyType(sameMapKeyType);
+ private POPackage getMultiQueryPackage(boolean sameMapKeyType,
+ boolean inCombiner) {
+ POPackage pkg = new POPackage(new OperatorKey(scope,
+ nig.getNextNodeId(scope)));
+ MultiQueryPackager pkgr = new MultiQueryPackager();
+ pkgr.setInCombiner(inCombiner);
+ pkgr.setSameMapKeyType(sameMapKeyType);
+ pkg.setPkgr(pkgr);
return pkg;
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Sun Apr 6 10:54:13 2014
@@ -20,10 +20,56 @@ package org.apache.pig.backend.hadoop.ex
import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -84,11 +130,6 @@ public class PhyPlanSetter extends PhyPl
}
@Override
- public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
- pkg.setParentPlan(parent);
- }
-
- @Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
super.visitPOForEach(nfe);
nfe.setParentPlan(parent);
@@ -251,11 +292,6 @@ public class PhyPlanSetter extends PhyPl
}
@Override
- public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
- joinPackage.setParentPlan(parent);
- }
-
- @Override
public void visitCast(POCast cast) {
cast.setParentPlan(parent);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Sun Apr 6 10:54:13 2014
@@ -35,7 +35,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -153,7 +153,7 @@ public class PigCombiner {
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPacakage.getNext()
- if (pack instanceof POJoinPackage)
+ if (pack.getPkgr() instanceof JoinPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Sun Apr 6 10:54:13 2014
@@ -23,8 +23,6 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import org.joda.time.DateTimeZone;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -60,6 +58,7 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.SpillableMemoryManager;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.joda.time.DateTimeZone;
/**
* This class is the static Mapper & Reducer classes that
@@ -396,7 +395,7 @@ public class PigGenericMapReduce {
// tuples out of the getnext() call of POJoinPackage
// In this case, we process till we see EOP from
// POJoinPacakage.getNext()
- if (pack instanceof POJoinPackage)
+ if (pack.getPkgr() instanceof JoinPackager)
{
pack.attachInput(key, tupIter.iterator());
while (true)
@@ -583,7 +582,7 @@ public class PigGenericMapReduce {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- keyType = pack.getKeyType();
+ keyType = pack.getPkgr().getKeyType();
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Sun Apr 6 10:54:13 2014
@@ -30,10 +30,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -221,7 +221,8 @@ public class SecondaryKeyOptimizer exten
PhysicalOperator currentNode = root;
POForEach foreach = null;
while (currentNode != null) {
- if (currentNode instanceof POPackage && !(currentNode instanceof POJoinPackage)
+ if (currentNode instanceof POPackage
+ && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
|| currentNode instanceof POFilter
|| currentNode instanceof POLimit) {
List<PhysicalOperator> succs = mr.reducePlan
@@ -372,7 +373,7 @@ public class SecondaryKeyOptimizer exten
}
}
POPackage pack = (POPackage) root;
- pack.setUseSecondaryKey(true);
+ pack.getPkgr().setUseSecondaryKey(true);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Sun Apr 6 10:54:13 2014
@@ -27,11 +27,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -78,7 +77,7 @@ public class POPackageAnnotator extends
if(pkg != null) {
// if the POPackage is actually a POPostCombinerPackage, then we should
// just look for the corresponding LocalRearrange(s) in the combine plan
- if(pkg instanceof POCombinerPackage) {
+ if (pkg.getPkgr() instanceof CombinerPackager) {
if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
int errCode = 2085;
String msg = "Unexpected problem during optimization." +
@@ -148,24 +147,6 @@ public class POPackageAnnotator extends
public void visitPackage(POPackage pkg) throws VisitorException {
this.pkg = pkg;
};
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
- */
- @Override
- public void visitJoinPackage(POJoinPackage joinPackage)
- throws VisitorException {
- this.pkg = joinPackage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
- */
- @Override
- public void visitCombinerPackage(POCombinerPackage pkg)
- throws VisitorException {
- this.pkg = pkg;
- }
/**
* @return the pkg
@@ -201,7 +182,7 @@ public class POPackageAnnotator extends
loRearrangeFound++;
Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
- if (pkg instanceof POPackageLite) {
+ if (pkg.getPkgr() instanceof LitePackager) {
if(lrearrange.getIndex() != 0) {
// Throw some exception here
throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -210,7 +191,7 @@ public class POPackageAnnotator extends
// annotate the package with information from the LORearrange
// update the keyInfo information if already present in the POPackage
- keyInfo = pkg.getKeyInfo();
+ keyInfo = pkg.getPkgr().getKeyInfo();
if(keyInfo == null)
keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
@@ -227,9 +208,9 @@ public class POPackageAnnotator extends
keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
- pkg.setKeyInfo(keyInfo);
- pkg.setKeyTuple(lrearrange.isKeyTuple());
- pkg.setKeyCompound(lrearrange.isKeyCompound());
+ pkg.getPkgr().setKeyInfo(keyInfo);
+ pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+ pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
}
/**
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Sun Apr 6 10:54:13 2014
@@ -20,8 +20,57 @@ package org.apache.pig.backend.hadoop.ex
import java.util.List;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -89,14 +138,6 @@ public class PhyPlanVisitor extends Plan
//do nothing
}
- public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
- //do nothing
- }
-
- public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
- //do nothing
- }
-
public void visitPOForEach(POForEach nfe) throws VisitorException {
List<PhysicalPlan> inpPlans = nfe.getInputPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -242,10 +283,6 @@ public class PhyPlanVisitor extends Plan
// TODO Auto-generated method stub
}
-
- public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
- //do nothing
- }
public void visitCast(POCast cast) {
// TODO Auto-generated method stub
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Sun Apr 6 10:54:13 2014
@@ -30,7 +30,25 @@ import java.util.Set;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
@@ -175,11 +193,14 @@ public class PlanPrinter<O extends Opera
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
}
- else if (node instanceof POMultiQueryPackage) {
- List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
- for (POPackage pkg : pkgs) {
- sb.append(LSep + pkg.name() + "\n");
+ else if(node instanceof POPackage){
+ Packager pkgr = ((POPackage) node).getPkgr();
+ if(pkgr instanceof MultiQueryPackager){
+ List<Packager> pkgrs = ((MultiQueryPackager) pkgr).getPackagers();
+ for (Packager child : pkgrs){
+ sb.append(LSep + child.name() + "\n");
}
+ }
}
else if(node instanceof POFRJoin){
POFRJoin frj = (POFRJoin)node;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Sun Apr 6 10:54:13 2014
@@ -33,6 +33,7 @@ import javax.xml.transform.stream.Stream
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -40,26 +41,27 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import org.w3c.dom.Node;
public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
- PhyPlanVisitor {
-
+PhyPlanVisitor {
+
private Document doc = null;
private Element parent = null;
-
+
public XMLPhysicalPlanPrinter(PhysicalPlan plan, Document doc, Element parent) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
this.doc = doc;
@@ -84,7 +86,7 @@ public class XMLPhysicalPlanPrinter<P ex
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-
+
StringWriter sw = new StringWriter();
StreamResult result = new StreamResult(sw);
DOMSource source = new DOMSource(doc);
@@ -94,7 +96,7 @@ public class XMLPhysicalPlanPrinter<P ex
e.printStackTrace();
}
}
-
+
private Element createAlias(PhysicalOperator po) {
Element aliasNode = null;
String alias = po.getAlias();
@@ -112,8 +114,8 @@ public class XMLPhysicalPlanPrinter<P ex
depthFirst(leaf, parentNode);
}
}
-
-
+
+
private void visitPlan(PhysicalPlan pp, Element parentNode) throws VisitorException {
if(pp!=null) {
XMLPhysicalPlanPrinter<PhysicalPlan> ppp =
@@ -121,15 +123,15 @@ public class XMLPhysicalPlanPrinter<P ex
ppp.visit();
}
}
-
-
+
+
/**
     * Visits every plan in {@code lep} in order, rendering each under
     * {@code parentNode}. A null list is ignored.
     */
    private void visitPlan(List<PhysicalPlan> lep, Element parentNode) throws VisitorException {
        if (lep == null) {
            return;
        }
        for (PhysicalPlan plan : lep) {
            visitPlan(plan, parentNode);
        }
    }
-
+
private Element createPONode(PhysicalOperator node) {
Element PONode = doc.createElement(node.getClass().getSimpleName());
PONode.setAttribute("scope", "" + node.getOperatorKey().id);
@@ -150,18 +152,18 @@ public class XMLPhysicalPlanPrinter<P ex
Element loadFile = doc.createElement("loadFile");
loadFile.setTextContent(((POLoad)node).getLFile().getFileName());
PONode.appendChild(loadFile);
-
+
Element isTmpLoad = doc.createElement("isTmpLoad");
isTmpLoad.setTextContent(Boolean.valueOf(((POLoad)node).isTmpLoad()).toString());
PONode.appendChild(isTmpLoad);
}
return PONode;
}
-
+
private void depthFirst(PhysicalOperator node, Element parentNode) throws VisitorException {
Element childNode = null;
-
+
List<PhysicalPlan> subPlans = new ArrayList<PhysicalPlan>();
if(node instanceof POFilter){
subPlans.add(((POFilter) node).getPlan());
@@ -177,12 +179,11 @@ public class XMLPhysicalPlanPrinter<P ex
subPlans = ((POSplit)node).getPlans();
} else if (node instanceof PODemux) {
subPlans = ((PODemux)node).getPlans();
- } else if (node instanceof POMultiQueryPackage) {
- childNode = createPONode(node);
- List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
- for (POPackage pkg : pkgs) {
- childNode.appendChild(createPONode(pkg));
- }
+ } else if(node instanceof POPackage){
+ childNode = createPONode(node);
+ Packager pkgr = ((POPackage) node).getPkgr();
+ Node pkgrNode = createPackagerNode(pkgr);
+ childNode.appendChild(pkgrNode);
} else if(node instanceof POFRJoin){
childNode = createPONode(node);
POFRJoin frj = (POFRJoin)node;
@@ -198,11 +199,11 @@ public class XMLPhysicalPlanPrinter<P ex
MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
if(joinPlans!=null) {
List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
- inner_plans.addAll(joinPlans.values());
- visitPlan(inner_plans, childNode);
+ inner_plans.addAll(joinPlans.values());
+ visitPlan(inner_plans, childNode);
}
}
-
+
if (childNode == null) {
childNode = createPONode(node);
if (subPlans.size() > 0) {
@@ -210,17 +211,29 @@ public class XMLPhysicalPlanPrinter<P ex
}
}
parentNode.appendChild(childNode);
-
+
List<PhysicalOperator> originalPredecessors = mPlan.getPredecessors(node);
if (originalPredecessors == null) {
return;
}
-
+
List<PhysicalOperator> predecessors = new ArrayList<PhysicalOperator>(originalPredecessors);
-
+
Collections.sort(predecessors);
for (PhysicalOperator pred : predecessors) {
depthFirst(pred, childNode);
}
}
+
+    /**
+     * Builds an XML element named after the packager's simple class name.
+     * When the packager is a MultiQueryPackager, each wrapped packager is
+     * rendered recursively as a child element, in list order.
+     */
+    private Node createPackagerNode(Packager pkgr) {
+        Element element = doc.createElement(pkgr.getClass().getSimpleName());
+        if (!(pkgr instanceof MultiQueryPackager)) {
+            return element;
+        }
+        MultiQueryPackager multi = (MultiQueryPackager) pkgr;
+        List<Packager> children = multi.getPackagers();
+        for (Packager child : children) {
+            element.appendChild(createPackagerNode(child));
+        }
+        return element;
+    }
}
\ No newline at end of file
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Sun Apr 6 10:54:13 2014
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+/**
+ * The package operator that packages the globally rearranged tuples into
+ * output format after the combiner stage. It differs from POPackage in that
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in. Instead, the inputs are put in a bag corresponding to their
+ * offset in the tuple.
+ */
+public class CombinerPackager extends Packager {
+ private boolean[] mBags; // For each field, indicates whether or not it
+ // needs to be put in a bag.
+
+ private Map<Integer, Integer> keyLookup;
+
+ private int numBags;
+
+ /**
+ * A new POPostCombinePackage will be constructed as a near clone of the
+ * provided POPackage.
+ * @param pkg POPackage to clone.
+ * @param bags for each field, indicates whether it should be a bag (true)
+ * or a simple field (false).
+ */
+ public CombinerPackager(Packager pkg, boolean[] bags) {
+ super();
+ keyType = pkg.keyType;
+ numInputs = 1;
+ inner = new boolean[1];
+ for (int i = 0; i < pkg.inner.length; i++) {
+ inner[i] = true;
+ }
+ if (bags != null) {
+ mBags = Arrays.copyOf(bags, bags.length);
+ }
+ numBags = 0;
+ for (int i = 0; i < mBags.length; i++) {
+ if (mBags[i]) numBags++;
+ }
+ }
+
+ /**
+ * @param keyInfo the keyInfo to set
+ */
+ @Override
+ public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+ this.keyInfo = keyInfo;
+ // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+ // group case and not in cogroups. So there should only
+ // be one LocalRearrange from which we get the keyInfo for
+ // which field in the value is in the key. This LocalRearrange
+ // has an index of 0. When we do support combiner in Cogroups
+ // THIS WILL NEED TO BE REVISITED.
+ Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+ keyInfo.get(0); // assumption: only group are "combinable", hence index 0
+ keyLookup = lrKeyInfo.second;
+ }
+
+ private DataBag createDataBag(int numBags) {
+ String bagType = null;
+ if (PigMapReduce.sJobConfInternal.get() != null) {
+ bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+ }
+
+ if (bagType != null && bagType.equalsIgnoreCase("default")) {
+ return new NonSpillableDataBag();
+ }
+ return new InternalCachedBag(numBags);
+ }
+
+ @Override
+ public Result getNext() throws ExecException {
+ if (bags == null) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ //Create numInputs bags
+ Object[] fields = new Object[mBags.length];
+ for (int i = 0; i < mBags.length; i++) {
+ if (mBags[i]) fields[i] = createDataBag(numBags);
+ }
+
+ // For each indexed tup in the inp, split them up and place their
+ // fields into the proper bags. If the given field isn't a bag, just
+ // set the value as is.
+ for (Tuple tup : bags[0]) {
+ int tupIndex = 0; // an index for accessing elements from
+ // the value (tup) that we have currently
+ for(int i = 0; i < mBags.length; i++) {
+ Integer keyIndex = keyLookup.get(i);
+ if(keyIndex == null && mBags[i]) {
+ // the field for this index is not the
+ // key - so just take it from the "value"
+ // we were handed - Currently THIS HAS TO BE A BAG
+ // In future if this changes, THIS WILL NEED TO BE
+ // REVISITED.
+ ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+ tupIndex++;
+ } else {
+ // the field for this index is in the key
+ fields[i] = key;
+ }
+ }
+ }
+
+ detachInput();
+
+ // The successor of the POPackage(Combiner) as of
+ // now SHOULD be a POForeach which has been adjusted
+ // to look for its inputs by projecting from the corresponding
+ // positions in the POPackage(Combiner) output.
+ // So we will NOT be adding the key in the result here but merely
+ // putting all bags into a result tuple and returning it.
+ Tuple res;
+ res = mTupleFactory.newTuple(mBags.length);
+ for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
+ Result r = new Result();
+ r.result = res;
+ r.returnStatus = POStatus.STATUS_OK;
+ return r;
+
+ }
+
+ @Override
+ public Tuple getValueTuple(PigNullableWritable keyWritable,
+ NullableTuple ntup, int index) throws ExecException {
+ return (Tuple) ntup.getValueAsPigType();
+ }
+
+}