You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/09/06 01:32:58 UTC
svn commit: r692579 - in /incubator/pig/branches/types: src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/optimizer/
src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/plan/optimizer/
test/org/apache/pig/test/
Author: gates
Date: Fri Sep 5 16:32:57 2008
New Revision: 692579
URL: http://svn.apache.org/viewvc?rev=692579&view=rev
Log:
PIG-401 Fixed issues with implicit splits being inserted.
Modified:
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 5 16:32:57 2008
@@ -38,17 +38,13 @@
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -61,10 +57,7 @@
import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.SplitIntroducer;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.util.PropertiesUtil;
@@ -569,7 +562,7 @@
PlanSetter ps = new PlanSetter(lp);
ps.visit();
- (new SplitIntroducer(lp)).introduceImplSplits();
+ //(new SplitIntroducer(lp)).introduceImplSplits();
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Sep 5 16:32:57 2008
@@ -36,23 +36,40 @@
// List of rules for the logical optimizer
- // Add type casting to plans where the schema has been declared (by
- // user, data, or data catalog).
+ // This one has to be first, as the type cast inserter expects the
+ // load to only have one output.
+ // Find any places in the plan that have an implicit split and make
+ // it explicit. Since the RuleMatcher doesn't handle trees properly,
+ // we cheat and say that we match any node. Then we'll do the actual
+ // test in the transformer's check method.
List<String> nodes = new ArrayList<String>(1);
- nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
List<Boolean> required = new ArrayList<Boolean>(1);
+ nodes.add("any");
required.add(true);
- mRules.add(new Rule(nodes, edges, required,
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+ required, new ImplicitSplitInserter(plan)));
+
+ // Add type casting to plans where the schema has been declared (by
+ // user, data, or data catalog).
+ nodes = new ArrayList<String>(1);
+ nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
+ edges = new HashMap<Integer, Integer>();
+ required = new ArrayList<Boolean>(1);
+ required.add(true);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required,
new TypeCastInserter(plan)));
+ // Push up limit wherever possible.
nodes = new ArrayList<String>(1);
edges = new HashMap<Integer, Integer>();
required = new ArrayList<Boolean>(1);
nodes.add("org.apache.pig.impl.logicalLayer.LOLimit");
required.add(true);
- mRules.add(new Rule(nodes, edges, required,
- new OpLimitOptimizer(plan)));
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required,
+ new OpLimitOptimizer(plan)));
+
+
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Fri Sep 5 16:32:57 2008
@@ -147,7 +147,28 @@
// Insert it into the plan.
mPlan.add(newNode);
mPlan.insertBetween(after, newNode, before);
-
+ fixUpContainedPlans(after, newNode, before, projectionMapping);
+ }
+
+ /**
+ * Once a node has been inserted, inner plans associated with other nodes
+ * may have references to the node that has been replaced or moved. This
+ * function walks those inner plans and patches up references.
+ * @param after Node that has had a new node inserted after it.
+ * @param newNode node that has been inserted
+ * @param before Node that has had a new node inserted before it.
+ * @param projectionMapping A map that defines how projections in after
+ * relate to projections in newNode. Keys are the projection offsets in
+ * after, values are the new offsets in newNode. If this field is null,
+ * then it will be assumed that the mapping is 1-1.
+ * @throws VisitorException, FrontendException
+ */
+ protected void fixUpContainedPlans(
+ LogicalOperator after,
+ LogicalOperator newNode,
+ LogicalOperator before,
+ Map<Integer, Integer> projectionMapping)
+ throws VisitorException, FrontendException {
// Fix up COGroup internal wiring
if (before instanceof LOCogroup) {
LOCogroup cg = (LOCogroup) before ;
@@ -175,9 +196,6 @@
new ProjectFixerUpper(lp, newNode, projectionMapping);
pfu.visit();
}
-
- // Now rebuild the schemas
- // rebuildSchemas();
}
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java Fri Sep 5 16:32:57 2008
@@ -1,107 +0,0 @@
-package org.apache.pig.impl.plan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOSplitOutput;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-
-public class SplitIntroducer extends PlanWalker<LogicalOperator, LogicalPlan> {
- private NodeIdGenerator nodeIdGen;
-
- public SplitIntroducer(LogicalPlan plan) {
- super(plan);
- nodeIdGen = NodeIdGenerator.getGenerator();
- }
-
- private long getNextId(String scope) {
- return nodeIdGen.getNextNodeId(scope);
- }
-
- @Override
- public PlanWalker<LogicalOperator, LogicalPlan> spawnChildWalker(LogicalPlan plan) {
- return new SplitIntroducer(plan);
- }
-
- public void introduceImplSplits() throws VisitorException {
- List<LogicalOperator> roots = copySucs(mPlan.getRoots());
- if(roots == null) return;
- for (LogicalOperator root : roots) {
- processNode(root);
- }
- }
-
- @Override
- /**
- * This method is to conform to the interface.
- */
- public void walk(PlanVisitor visitor) throws VisitorException {
- throw new VisitorException(
- "This method is not to be used. This Walker does not call any visit() methods. It only alters the plan by introducing implicit splits if necessary.");
- }
-
- private void processNode(LogicalOperator root) throws VisitorException {
- if(root instanceof LOSplit || root instanceof LOSplitOutput) return;
- List<LogicalOperator> sucs = mPlan.getSuccessors(root);
- if(sucs==null) return;
- int size = sucs.size();
- if(size==0 || size==1) return;
- sucs = copySucs(mPlan.getSuccessors(root));
- disconnect(root,sucs);
- String scope = root.getOperatorKey().scope;
- LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, getNextId(scope)), new ArrayList<LogicalOperator>());
- mPlan.add(splitOp);
- try {
- mPlan.connect(root, splitOp);
- int index = -1;
- for (LogicalOperator operator : sucs) {
- LogicalPlan condPlan = new LogicalPlan();
- LOConst cnst = new LOConst(mPlan,new OperatorKey(scope, getNextId(scope)), true);
- cnst.setType(DataType.BOOLEAN);
- condPlan.add(cnst);
- LOSplitOutput splitOutput = new LOSplitOutput(mPlan, new OperatorKey(scope, getNextId(scope)), ++index, condPlan);
- splitOp.addOutput(splitOutput);
- mPlan.add(splitOutput);
- mPlan.connect(splitOp, splitOutput);
- mPlan.connect(splitOutput, operator);
- }
- } catch (PlanException e) {
- throw new VisitorException(e);
- }
- }
- private List<LogicalOperator> copySucs(List<LogicalOperator> successors){
- ArrayList<LogicalOperator> ret = new ArrayList<LogicalOperator>();
- for (LogicalOperator operator : successors) {
- ret.add(operator);
- }
- return ret;
- }
-
- private void disconnect(LogicalOperator from, List<LogicalOperator> successors) {
- for (LogicalOperator operator : successors) {
- mPlan.disconnect(from, operator);
- }
- }
-
- /* public static void main(String[] args) throws ExecException, IOException, FrontendException {
- PigServer ser = new PigServer(ExecType.LOCAL);
- ser.registerQuery("A = load 'file:/etc/passwd' using PigStorage(':');");
- ser.registerQuery("B = foreach A generate $0;");
- ser.registerQuery("C = foreach A generate $1;");
- ser.registerQuery("D = group B by $0, C by $0;");
- LogicalPlan lp = ser.getPlanFromAlias("D", "Testing");
- lp.explain(System.out, System.err);
- LogicalPlan fixedLp = ser.compileLp(lp, "Testing");
- System.out.println();
- fixedLp.explain(System.out, System.err);
- }*/
-}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java Fri Sep 5 16:32:57 2008
@@ -161,7 +161,8 @@
for (int i = 0; i < sz; i++) mMatch.add(null);
List<O> successors = new ArrayList<O>();
- if (node.getClass().getName().equals(mRule.nodes.get(0))) {
+ if (node.getClass().getName().equals(mRule.nodes.get(0)) ||
+ mRule.nodes.get(0).equals("any")) {
mMatch.set(0, node);
// Follow the edge to see the next node we should be looking for.
Integer nextOpNum = mRule.edges.get(0);
@@ -180,7 +181,8 @@
}
private boolean continueMatch(O current, Integer nodeNumber) {
- if (current.getClass().getName() == mRule.nodes.get(nodeNumber)) {
+ if (current.getClass().getName().equals(mRule.nodes.get(nodeNumber)) ||
+ mRule.nodes.get(nodeNumber).equals("any")) {
mMatch.set(nodeNumber, current);
// Follow the edge to see the next node we should be looking for.
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java Fri Sep 5 16:32:57 2008
@@ -17,16 +17,11 @@
*/
package org.apache.pig.test;
-import java.io.BufferedReader;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.optimizer.*;
import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.test.utils.TypeCheckingTestUtil;
import org.junit.Test;
import org.junit.Before;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java Fri Sep 5 16:32:57 2008
@@ -660,6 +660,39 @@
assertTrue(transformer.mTransformed);
}
+ // Test that we match when the pattern says any. Will give
+ // a pattern of any and a plan of S->S->M.
+ @Test
+ public void testOptimizerMatchesAny() throws Exception {
+ // Build a plan
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[3];
+ ops[0] = new SingleOperator("1");
+ plan.add(ops[0]);
+ ops[1] = new SingleOperator("2");
+ plan.add(ops[1]);
+ ops[2] = new MultiOperator("3");
+ plan.add(ops[2]);
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ // Create our rule
+ ArrayList<String> nodes = new ArrayList<String>(3);
+ nodes.add("any");
+ HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+ ArrayList<Boolean> required = new ArrayList<Boolean>(1);
+ required.add(true);
+ AlwaysTransform transformer = new AlwaysTransform(plan);
+ Rule<TOperator, TPlan> r =
+ new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+ TOptimizer optimizer = new TOptimizer(plan);
+ optimizer.addRule(r);
+
+ optimizer.optimize();
+ assertTrue(transformer.mTransformed);
+ }
+
// Test that we match when the whole plan doesn't match. Will give
// a pattern of S->S->M and a plan of S->S->S->M.
@Test