You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/01/20 01:53:06 UTC
svn commit: r901019 - in /hadoop/pig/branches/branch-0.6: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
test/org/apache/pig/test/TestSecondarySort.java
Author: daijy
Date: Wed Jan 20 00:53:05 2010
New Revision: 901019
URL: http://svn.apache.org/viewvc?rev=901019&view=rev
Log:
PIG-1193: Secondary sort issue on nested desc sort
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Wed Jan 20 00:53:05 2010
@@ -270,6 +270,8 @@
PIG-1186: Pig do not take values in "pig-cluster-hadoop-site.xml" (daijy)
+PIG-1193: Secondary sort issue on nested desc sort (daijy)
+
Release 0.5.0
INCOMPATIBLE CHANGES
Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Wed Jan 20 00:53:05 2010
@@ -29,7 +29,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -43,7 +42,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -245,17 +244,22 @@
for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
// visit inner plans to figure out the sort order for distinct /
// sort
- SecondaryKeyDiscoverVisitor innerPlanVisitor = new SecondaryKeyDiscoverVisitor(
+ SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
innerPlan, sortKeyInfos, secondarySortKeyInfo);
- innerPlanVisitor.visit();
- secondarySortKeyInfo = innerPlanVisitor.getSecondarySortKeyInfo();
- if (innerPlanVisitor.getSortsToRemove() != null) {
- for (POSort sort : innerPlanVisitor.getSortsToRemove()) {
+ try {
+ innerPlanDiscover.process();
+ } catch (FrontendException e) {
+ int errorCode = 2213;
+ throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+ }
+ secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+ if (innerPlanDiscover.getSortsToRemove() != null) {
+ for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
}
}
- if (innerPlanVisitor.getDistinctsToChange() != null) {
- for (PODistinct distinct : innerPlanVisitor
+ if (innerPlanDiscover.getDistinctsToChange() != null) {
+ for (PODistinct distinct : innerPlanDiscover
.getDistinctsToChange()) {
distinctsToChange.add(new POToChange(distinct, innerPlan,
foreach));
@@ -289,10 +293,14 @@
String scope = oldSort.getOperatorKey().scope;
List<PhysicalOperator> preds = sortToRemove.plan
.getPredecessors(sortToRemove.oper);
+ List<PhysicalOperator> succs = sortToRemove.plan
+ .getSuccessors(sortToRemove.oper);
POProject project = null;
- if (preds == null
+ if ((preds == null
|| preds.get(0).getResultType() != DataType.BAG
- && oldSort.getResultType() == DataType.BAG) {
+ && oldSort.getResultType() == DataType.BAG) // removing the sort would change the result type
+ && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
+ {
project = new PORelationToExprProject(new OperatorKey(
scope, NodeIdGenerator.getGenerator()
.getNextNodeId(scope)), oldSort
@@ -351,7 +359,7 @@
}
if (!found)
{
- int errorCode = 2209;
+ int errorCode = 2214;
new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
}
}
@@ -429,7 +437,9 @@
// we cannot do any secondary key optimization because we only have 1
// secondary
// sort key.
- private static class SecondaryKeyDiscoverVisitor extends PhyPlanVisitor {
+ private static class SecondaryKeyDiscover {
+ PhysicalPlan mPlan;
+
List<POSort> sortsToRemove = new ArrayList<POSort>();
List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
@@ -438,25 +448,60 @@
SortKeyInfo secondarySortKeyInfo;
- ColumnChainInfo columnChainInfo = new ColumnChainInfo();
-
- boolean sawInvalidPhysicalOper = false;
+ ColumnChainInfo columnChainInfo = null;
// PhysicalPlan here is foreach inner plan
- SecondaryKeyDiscoverVisitor(PhysicalPlan plan,
+ SecondaryKeyDiscover(PhysicalPlan plan,
List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
- super(plan,
- new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
- plan));
+ this.mPlan = plan;
this.sortKeyInfos = sortKeyInfos;
this.secondarySortKeyInfo = secondarySortKeyInfo;
}
+
+ public void process() throws FrontendException
+ {
+ List<PhysicalOperator> roots = mPlan.getRoots();
+ for (PhysicalOperator root : roots) {
+ columnChainInfo = new ColumnChainInfo();
+ processRoot(root);
+ }
+ }
+
+ public void processRoot(PhysicalOperator root) throws FrontendException {
+ PhysicalOperator currentNode = root;
+ while (currentNode!=null) {
+ boolean sawInvalidPhysicalOper = false;
+ if (currentNode instanceof PODistinct)
+ sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+ else if (currentNode instanceof POSort)
+ sawInvalidPhysicalOper = processSort((POSort)currentNode);
+ else if (currentNode instanceof POProject)
+ sawInvalidPhysicalOper = processProject((POProject)currentNode);
+ else if (currentNode instanceof POForEach)
+ sawInvalidPhysicalOper = processForEach((POForEach)currentNode);
+ else if (currentNode instanceof POUserFunc ||
+ currentNode instanceof POUnion)
+ break;
+
+ if (sawInvalidPhysicalOper)
+ break;
+
+ List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+ if (succs==null)
+ currentNode = null;
+ else {
+ if (succs.size()>1) {
+ int errorCode = 2215;
+ throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
+ errorCode);
+ }
+ currentNode = succs.get(0);
+ }
+ }
+ }
// We see PODistinct, check which key it is using
- @Override
- public void visitDistinct(PODistinct distinct) throws VisitorException {
- if (sawInvalidPhysicalOper)
- return;
+ public boolean processDistinct(PODistinct distinct) throws FrontendException {
SortKeyInfo keyInfos = new SortKeyInfo();
try {
keyInfos.insertColumnChainInfo(0,
@@ -469,7 +514,7 @@
for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
if (sortKeyInfo.moreSpecificThan(keyInfos)) {
distinctsToChange.add(distinct);
- return;
+ return false;
}
}
@@ -477,7 +522,7 @@
if (secondarySortKeyInfo != null
&& secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
distinctsToChange.add(distinct);
- return;
+ return false;
}
// Now set the secondary key
@@ -485,40 +530,22 @@
distinctsToChange.add(distinct);
secondarySortKeyInfo = keyInfos;
}
- }
-
- @Override
- public void visitLimit(POLimit limit) throws VisitorException {
+ return false;
}
// Accumulate column info
- @Override
- public void visitProject(POProject project) throws VisitorException {
+ public boolean processProject(POProject project) throws FrontendException {
columnChainInfo.insertInReduce(project.isStar(), project
.getColumns(), project.getResultType());
- }
-
- @Override
- public void visitFilter(POFilter filter) throws VisitorException {
- }
-
- @Override
- public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
- sawInvalidPhysicalOper = true;
- }
-
- @Override
- public void visitUnion(POUnion union) throws VisitorException {
- sawInvalidPhysicalOper = true;
+ return false;
}
// Accumulate column info from nested project
- @Override
- public void visitPOForEach(POForEach fe) throws VisitorException {
+ public boolean processForEach(POForEach fe) throws FrontendException {
if (fe.getInputPlans().size() > 1) {
// I may be wrong, but for now let's assume foreach plan before
// sort/distinct only have one foreach plan
- throw new VisitorException(
+ throw new FrontendException(
"POForEach has more than 1 input plans");
}
boolean r = false;
@@ -527,19 +554,15 @@
columnChainInfo);
} catch (PlanException e) {
int errorCode = 2205;
- throw new VisitorException("Error visiting POForEach inner plan",
+ throw new FrontendException("Error visiting POForEach inner plan",
errorCode, e);
}
// See something other than POProject in POForEach, set the flag to stop further processing
- if (r)
- sawInvalidPhysicalOper = true;
+ return r;
}
// We see POSort, check which key it is using
- @Override
- public void visitSort(POSort sort) throws VisitorException {
- if (sawInvalidPhysicalOper)
- return;
+ public boolean processSort(POSort sort) throws FrontendException{
SortKeyInfo keyInfo = new SortKeyInfo();
for (int i = 0; i < sort.getSortPlans().size(); i++) {
PhysicalPlan sortPlan = sort.getSortPlans().get(i);
@@ -555,13 +578,12 @@
r = collectColumnChain(sortPlan, sortChainInfo);
} catch (PlanException e) {
int errorCode = 2206;
- throw new VisitorException("Error visiting POSort inner plan",
+ throw new FrontendException("Error visiting POSort inner plan",
errorCode, e);
}
- if (r) // if we saw physical operator other than project in sort plan
+ if (r==true) // if we saw physical operator other than project in sort plan
{
- sawInvalidPhysicalOper = true;
- return;
+ return true;
}
keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
.getMAscCols().get(i));
@@ -570,14 +592,14 @@
for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
if (sortKeyInfo.moreSpecificThan(keyInfo)) {
sortsToRemove.add(sort);
- return;
+ return false;
}
}
// if it is part of secondary key
if (secondarySortKeyInfo != null
&& secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
sortsToRemove.add(sort);
- return;
+ return false;
}
// Now set the secondary key
@@ -585,6 +607,7 @@
sortsToRemove.add(sort);
secondarySortKeyInfo = keyInfo;
}
+ return false;
}
public List<POSort> getSortsToRemove() {
Modified: hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java?rev=901019&r1=901018&r2=901019&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestSecondarySort.java Wed Jan 20 00:53:05 2010
@@ -366,6 +366,26 @@
assertTrue(so.getDistinctChanged()==0);
}
+ // See PIG-1193
+ public void testSortOptimization8() throws Exception{
+ // Sort desc, used in UDF twice
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+ planTester.buildPlan("A = LOAD 'input1' AS (a0);");
+ planTester.buildPlan("B = group A all;");
+ planTester.buildPlan("C = foreach B { D = order A by $0 desc; generate DIFF(D, D);};");
+
+ LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+ PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+ MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+
+ SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+ so.visit();
+
+ assertTrue(so.getNumMRUseSecondaryKey()==1);
+ assertTrue(so.getNumSortRemoved()==2);
+ assertTrue(so.getDistinctChanged()==0);
+ }
+
public void testNestedDistinctEndToEnd1() throws Exception{
File tmpFile1 = File.createTempFile("test", "txt");
PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));