You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/04/15 09:45:00 UTC
svn commit: r765073 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/optimizer/
src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/plan/optimizer/
src/org/apache/pig/pen/util/ test/org/apache/pig/test/ test/o...
Author: sms
Date: Wed Apr 15 07:44:59 2009
New Revision: 765073
URL: http://svn.apache.org/viewvc?rev=765073&view=rev
Log:
PIG-693: Proposed improvements to pig's optimizer
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan4.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan5.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan6.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan7.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Apr 15 07:44:59 2009
@@ -24,6 +24,8 @@
IMPROVEMENTS
+PIG-693: Proposed improvements to pig's optimizer (sms)
+
PIG-700: To automate the pig patch test process (gkesavan via sms)
BUG FIXES
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Apr 15 07:44:59 2009
@@ -36,6 +36,7 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.tools.cmdline.CmdLineParser;
import org.apache.pig.tools.grunt.Grunt;
@@ -82,6 +83,7 @@
boolean dryrun = false;
ArrayList<String> params = new ArrayList<String>();
ArrayList<String> paramFiles = new ArrayList<String>();
+ HashSet<String> optimizerRules = new HashSet<String>();
CmdLineParser opts = new CmdLineParser(args);
opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
@@ -91,16 +93,17 @@
opts.registerOpt('e', "execute", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('h', "help", CmdLineParser.ValueExpected.NOT_ACCEPTED);
- opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
- opts.registerOpt('j', "jar", CmdLineParser.ValueExpected.REQUIRED);
- opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
- opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('i', "version", CmdLineParser.ValueExpected.OPTIONAL);
- opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
+ opts.registerOpt('j', "jar", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
+ opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
- opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
+ opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
ExecMode mode = ExecMode.UNKNOWN;
String file = null;
@@ -162,6 +165,10 @@
usage();
return;
+ case 'i':
+ System.out.println(getVersionString());
+ return;
+
case 'j':
String jarsString = opts.getValStr();
if(jarsString != null){
@@ -205,6 +212,10 @@
// will be extended in the future
dryrun = true;
break;
+
+ case 't':
+ optimizerRules.add(opts.getValStr());
+ break;
case 'v':
properties.setProperty(VERBOSE, ""+true);
@@ -222,9 +233,6 @@
throw new RuntimeException("ERROR: Unrecognized exectype.", e);
}
break;
- case 'i':
- System.out.println(getVersionString());
- return;
default: {
Character cc = new Character(opt);
throw new AssertionError("Unhandled option " + cc.toString());
@@ -241,6 +249,10 @@
}
pigContext.getProperties().setProperty("pig.logfile", logFileName);
+
+ if(optimizerRules.size() > 0) {
+ pigContext.getProperties().setProperty("pig.optimizer.rules", ObjectSerializer.serialize(optimizerRules));
+ }
LogicalPlanBuilder.classloader = pigContext.createCl(null);
@@ -483,14 +495,21 @@
System.out.println(" -b, -brief brief logging (no timestamps)");
System.out.println(" -c, -cluster clustername, kryptonite is default");
System.out.println(" -d, -debug debug level, INFO is default");
+ System.out.println(" -e, -execute commands to execute (within quotes)");
+ System.out.println(" -f, -file path to the script to execute");
System.out.println(" -h, -help display this message");
+ System.out.println(" -i, -version display version information");
System.out.println(" -j, -jar jarfile load jarfile");
+ System.out.println(" -l, -logfile path to client side log file; current working directory is default");
+ System.out.println(" -m, -param_file path to the parameter file");
System.out.println(" -o, -hod read hod server from system property ssh.gateway");
+ System.out.println(" -p, -param key value pair of the form param=val");
+ System.out.println(" -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
+ System.out.println(" -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
System.out.println(" -v, -verbose print all error messages to screen");
- System.out.println(" -x, -exectype local|mapreduce, mapreduce is default");
- System.out.println(" -i, -version display version information");
- System.out.println(" -l, -logfile path to client side log file; current working directory is default");
System.out.println(" -w, -warning turn warning on; also turns warning aggregation off");
+ System.out.println(" -x, -exectype local|mapreduce, mapreduce is default");
+
}
private static String validateLogFile(String logFileName, String scriptName) {
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 15 07:44:59 2009
@@ -28,6 +28,7 @@
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -64,6 +65,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOStore;
@@ -728,8 +730,6 @@
PlanSetter ps = new PlanSetter(lpClone);
ps.visit();
- //(new SplitIntroducer(lp)).introduceImplSplits();
-
// run through validator
CompilationMessageCollector collector = new CompilationMessageCollector() ;
FrontendException caught = null;
@@ -758,8 +758,18 @@
// optimize
if (optimize) {
- //LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
- LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType());
+ HashSet<String> optimizerRules = null;
+ try {
+ optimizerRules = (HashSet<String>) ObjectSerializer
+ .deserialize(pigContext.getProperties().getProperty(
+ "pig.optimizer.rules"));
+ } catch (IOException ioe) {
+ int errCode = 2110;
+ String msg = "Unable to deserialize optimizer rules.";
+ throw new FrontendException(msg, errCode, PigException.BUG, ioe);
+ }
+
+ LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType(), optimizerRules);
optimizer.optimize();
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Wed Apr 15 07:44:59 2009
@@ -17,14 +17,18 @@
*/
package org.apache.pig.impl.logicalLayer.optimizer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
import org.apache.pig.ExecType;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
+import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.*;
/**
@@ -33,8 +37,10 @@
public class LogicalOptimizer extends
PlanOptimizer<LogicalOperator, LogicalPlan> {
- public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
- public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
+ private static final String SCOPE = "RULE";
+ private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+
+ private Set<String> mRulesOff = null;
public LogicalOptimizer(LogicalPlan plan) {
this(plan, ExecType.MAPREDUCE);
@@ -44,8 +50,16 @@
super(plan);
runOptimizations(plan, mode);
}
+
+ public LogicalOptimizer(LogicalPlan plan, ExecType mode, Set<String> turnOffRules) {
+ super(plan);
+ mRulesOff = turnOffRules;
+ runOptimizations(plan, mode);
+ }
private void runOptimizations(LogicalPlan plan, ExecType mode) {
+ RulePlan rulePlan;
+
// List of rules for the logical optimizer
// This one has to be first, as the type cast inserter expects the
@@ -54,49 +68,74 @@
// it explicit. Since the RuleMatcher doesn't handle trees properly,
// we cheat and say that we match any node. Then we'll do the actual
// test in the transformers check method.
- List<String> nodes = new ArrayList<String>(1);
- Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
- List<Boolean> required = new ArrayList<Boolean>(1);
- nodes.add("any");
- required.add(true);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
- required, new ImplicitSplitInserter(plan)));
+
+ boolean turnAllRulesOff = false;
+ if (mRulesOff != null) {
+ for (String rule : mRulesOff) {
+ if ("all".equalsIgnoreCase(rule)) {
+ turnAllRulesOff = true;
+ break;
+ }
+ }
+ }
+
+ rulePlan = new RulePlan();
+ RuleOperator anyLogicalOperator = new RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(anyLogicalOperator);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
// Add type casting to plans where the schema has been declared (by
// user, data, or data catalog).
- nodes = new ArrayList<String>(1);
- nodes.add(LOLOAD_CLASSNAME);
- edges = new HashMap<Integer, Integer>();
- required = new ArrayList<Boolean>(1);
- required.add(true);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
- required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+ rulePlan = new RulePlan();
+ RuleOperator loLoad = new RuleOperator(LOLoad.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loLoad);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new TypeCastInserter(plan, LOLoad.class.getName()), "LoadTypeCastInserter"));
// Add type casting to plans where the schema has been declared by
// user in a statement with stream operator.
- nodes = new ArrayList<String>(1);
- nodes.add(LOSTREAM_CLASSNAME);
- edges = new HashMap<Integer, Integer>();
- required = new ArrayList<Boolean>(1);
- required.add(true);
- mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan,
- LOSTREAM_CLASSNAME)));
+ rulePlan = new RulePlan();
+ RuleOperator loStream= new RuleOperator(LOStream.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loStream);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
+ LOStream.class.getName()), "StreamTypeCastInserter"));
// Optimize when LOAD precedes STREAM and the loader class
// is the same as the serializer for the STREAM.
// Similarly optimize when STREAM is followed by store and the
// deserializer class is same as the Storage class.
- mRules.add(new Rule(nodes, edges, required, new StreamOptimizer(plan,
- LOSTREAM_CLASSNAME)));
+ if(!turnAllRulesOff) {
+ Rule rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan, new StreamOptimizer(plan,
+ LOStream.class.getName()), "StreamOptimizer");
+ checkAndAddRule(rule);
+ }
// Push up limit where ever possible.
- nodes = new ArrayList<String>(1);
- edges = new HashMap<Integer, Integer>();
- required = new ArrayList<Boolean>(1);
- nodes.add("org.apache.pig.impl.logicalLayer.LOLimit");
- required.add(true);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
- required, new OpLimitOptimizer(plan, mode)));
+ if(!turnAllRulesOff) {
+ rulePlan = new RulePlan();
+ RuleOperator loLimit = new RuleOperator(LOLimit.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loLimit);
+ Rule rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new OpLimitOptimizer(plan, mode), "LimitOptimizer");
+ checkAndAddRule(rule);
+ }
+
+ }
+
+ private void checkAndAddRule(Rule rule) {
+ if(mRulesOff != null) {
+ for(String ruleOff: mRulesOff) {
+ String ruleName = rule.getRuleName();
+ if(ruleName == null) continue;
+ if(ruleName.equalsIgnoreCase(ruleOff)) return;
+ }
+ }
+ mRules.add(rule);
}
-}
+}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Apr 15 07:44:59 2009
@@ -60,7 +60,7 @@
}
@Override
- public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+ public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
try {
LogicalOperator op = getOperator(nodes);
Schema s = op.getSchema();
@@ -70,7 +70,7 @@
List<Schema.FieldSchema> fss = s.getFields();
List<Byte> types = new ArrayList<Byte>(s.size());
Schema determinedSchema = null;
- if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+ if(LOLoad.class.getName().equals(operatorClassName)) {
determinedSchema = ((LOLoad)op).getDeterminedSchema();
}
for (int i = 0; i < fss.size(); i++) {
@@ -105,7 +105,7 @@
}
LogicalOperator lo = nodes.get(0);
- if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+ if(LOLoad.class.getName().equals(operatorClassName)) {
if (lo == null || !(lo instanceof LOLoad)) {
int errCode = 2005;
String msg = "Expected " + LOLoad.class.getSimpleName() + ", got " + lo.getClass().getSimpleName();
@@ -113,7 +113,7 @@
}
return lo;
- } else if(operatorClassName == LogicalOptimizer.LOSTREAM_CLASSNAME){
+ } else if(LOStream.class.getName().equals(operatorClassName)){
if (lo == null || !(lo instanceof LOStream)) {
int errCode = 2005;
String msg = "Expected " + LOStream.class.getSimpleName() + ", got " + lo.getClass().getSimpleName();
@@ -147,7 +147,7 @@
// Note that in this case, the data coming out of the loader is not
// a BYTEARRAY but is whatever determineSchema() says it is.
Schema determinedSchema = null;
- if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+ if(LOLoad.class.getName().equals(operatorClassName)) {
determinedSchema = ((LOLoad)lo).getDeterminedSchema();
}
for (int i = 0; i < s.size(); i++) {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Wed Apr 15 07:44:59 2009
@@ -17,6 +17,8 @@
*/
package org.apache.pig.impl.plan;
+import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.ArrayList;
@@ -651,6 +653,12 @@
}
}
+ public void explain(
+ OutputStream out,
+ PrintStream ps) throws VisitorException, IOException {
+ PlanPrinter pp = new PlanPrinter(ps, this);
+ pp.print(out);
+ }
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.io.PrintStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
+import org.apache.pig.impl.plan.optimizer.RulePlanVisitor;
+
+/**
+ * A generic visitor mechanism for printing out an operator plan.
+ */
+public class PlanPrinter<O extends Operator, P extends OperatorPlan<O>> extends PlanVisitor<O, P> {
+
+ private PrintStream mStream = null;
+ private String TAB1 = " ";
+ private String TABMore = "| ";
+ private String LSep = "|\n|---";
+ private String USep = "| |\n| ";
+ private int levelCntr = -1;
+ private OutputStream printer;
+
+ /**
+ * @param ps PrintStream to output plan information to
+ * @param plan plan to print
+ */
+ public PlanPrinter(PrintStream ps, P plan) {
+ //super(plan, new DependencyOrderWalker(plan));
+ super(plan, new DepthFirstWalker(plan));
+ mStream = ps;
+ }
+
+ @Override
+ public void visit() throws VisitorException {
+ try {
+ mStream.write(depthFirst().getBytes());
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ public void print(OutputStream printer) throws VisitorException, IOException {
+ this.printer = printer;
+ printer.write(depthFirst().getBytes());
+ }
+
+
+ protected String depthFirst() throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ List<O> leaves = mPlan.getLeaves();
+ Collections.sort(leaves);
+ for (O leaf : leaves) {
+ sb.append(depthFirst(leaf));
+ sb.append("\n");
+ }
+ //sb.delete(sb.length() - "\n".length(), sb.length());
+ //sb.delete(sb.length() - "\n".length(), sb.length());
+ return sb.toString();
+ }
+
+ private String planString(P plan) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ if(plan!=null)
+ plan.explain(baos, mStream);
+ else
+ return "";
+ sb.append(USep);
+ sb.append(shiftStringByTabs(baos.toString(), 2));
+ return sb.toString();
+ }
+
+ private String planString(
+ List<P> plans) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ if(plans!=null)
+ for (P plan : plans) {
+ sb.append(planString(plan));
+ }
+ return sb.toString();
+ }
+
+ private String depthFirst(O node) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder(node.name());
+ sb.append("\n");
+
+ List<O> originalPredecessors = mPlan.getPredecessors(node);
+ if (originalPredecessors == null)
+ return sb.toString();
+
+ List<O> predecessors = new ArrayList<O>(originalPredecessors);
+
+ Collections.sort(predecessors);
+ int i = 0;
+ for (O pred : predecessors) {
+ i++;
+ String DFStr = depthFirst(pred);
+ if (DFStr != null) {
+ sb.append(LSep);
+ if (i < predecessors.size())
+ sb.append(shiftStringByTabs(DFStr, 2));
+ else
+ sb.append(shiftStringByTabs(DFStr, 1));
+ }
+ }
+ return sb.toString();
+ }
+
+ private String shiftStringByTabs(String DFStr, int TabType) {
+ StringBuilder sb = new StringBuilder();
+ String[] spl = DFStr.split("\n");
+
+ String tab = (TabType == 1) ? TAB1 : TABMore;
+
+ sb.append(spl[0] + "\n");
+ for (int i = 1; i < spl.length; i++) {
+ sb.append(tab);
+ sb.append(spl[i]);
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ private void dispTabs() {
+ for (int i = 0; i < levelCntr; i++)
+ System.out.print(TAB1);
+ }
+}
+
+
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class CommonNodeFinder extends
+ RulePlanVisitor {
+
+ private List<RuleOperator> mCommonNodes = null;
+
+ public CommonNodeFinder(RulePlan plan) {
+ super(plan, new DependencyOrderWalker<RuleOperator, RulePlan>(plan));
+ }
+
+ public int getCount() {
+ return ((mCommonNodes == null) ? 0 : mCommonNodes.size());
+ }
+
+ public List<RuleOperator> getCommonNodes() {
+ return mCommonNodes;
+ }
+
+ private void reset() {
+ mCommonNodes = new ArrayList<RuleOperator>();
+ }
+
+ @Override
+ public void visit() throws VisitorException {
+ reset();
+ super.visit();
+ }
+
+ /**
+ * @param ruleOp
+ * the rule operator that has to be visited
+ * @throws VisitorException
+ */
+ protected void visit(RuleOperator ruleOp)
+ throws VisitorException {
+ /**
+ * A common node is a node that lies on a path shared by two nodes of the rule plan:
+ * - any node that has more than one predecessor is a common node;
+ * - any node whose single predecessor is a common node is a common node;
+ * - any node that has more than one successor is a common node.
+ */
+ if(ruleOp.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+ return;
+ }
+ List<RuleOperator> predecessors = mPlan.getPredecessors(ruleOp);
+ List<RuleOperator> successors = mPlan.getSuccessors(ruleOp);
+
+ if(predecessors != null) {
+ if(predecessors.size() > 1) {
+ ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+ mCommonNodes.add(ruleOp);
+ return;
+ } else {
+ //has to be one predecessor
+ //check if the predecessor is a common node then this node is
+ //also a common node
+ RuleOperator ruleOperatorPredecessor = predecessors.get(0);
+ if(ruleOperatorPredecessor.getNodeType().equals(RuleOperator.NodeType.COMMON_NODE)) {
+ ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+ mCommonNodes.add(ruleOp);
+ return;
+ }
+ }
+ }
+
+ if(successors != null) {
+ if (successors.size() > 1) {
+ ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+ mCommonNodes.add(ruleOp);
+ return;
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java Wed Apr 15 07:44:59 2009
@@ -22,6 +22,7 @@
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
/******************************************************************************
* A class to optimize plans. This class need not be subclassed for a
@@ -34,15 +35,29 @@
protected List<Rule> mRules;
protected P mPlan;
+ protected int mMaxIterations;
/**
* @param plan Plan to optimize
*/
protected PlanOptimizer(P plan) {
+ this(plan, 500);
+ }
+
+ /**
+ * @param plan Plan to optimize
+ * @param iterations maximum number of optimization iterations; a negative value selects 1000
+ */
+ protected PlanOptimizer(P plan, int iterations) {
mRules = new ArrayList<Rule>();
mPlan = plan;
+ if(iterations < 0) {
+ mMaxIterations = 1000;
+ } else {
+ mMaxIterations = iterations;
+ }
}
-
+
/**
* Run the optimizer. This method attempts to match each of the Rules
* against the plan. If a Rule matches, it then calls the check
@@ -52,20 +67,26 @@
* @throws OptimizerException
*/
public final void optimize() throws OptimizerException {
- RuleMatcher matcher = new RuleMatcher();
- for (Rule rule : mRules) {
- if (matcher.match(rule)) {
- // It matches the pattern. Now check if the transformer
- // approves as well.
- List<List<O>> matches = matcher.getAllMatches();
- for (List<O> match:matches)
- {
- if (rule.transformer.check(match)) {
- // The transformer approves.
- rule.transformer.transform(match);
- }
+ boolean sawMatch = false;
+ int numIterations = 0;
+ do {
+ sawMatch = false;
+ for (Rule rule : mRules) {
+ RuleMatcher matcher = new RuleMatcher();
+ if (matcher.match(rule)) {
+ // It matches the pattern. Now check if the transformer
+ // approves as well.
+ List<List<O>> matches = matcher.getAllMatches();
+ for (List<O> match:matches)
+ {
+ if (rule.getTransformer().check(match)) {
+ // The transformer approves.
+ sawMatch = true;
+ rule.getTransformer().transform(match);
+ }
+ }
}
}
- }
+ } while(sawMatch && ++numIterations < mMaxIterations);
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java Wed Apr 15 07:44:59 2009
@@ -17,56 +17,70 @@
*/
package org.apache.pig.impl.plan.optimizer;
-import java.util.List;
-import java.util.Map;
-
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
/**
- * A rule for optimizing a plan. The rule contains a pattern that must be
- * matched in the plan before the optimizer can consider applying the rule
- * and a transformer to do further checks and possibly transform the plan.
- * The rule pattern is expressed as a list of node names, a map of edges in
- * the plan, and a list of boolean values indicating whether the node is
- * required. For example, a rule pattern could be expressed as:
- * [Filter, Filter] {[0, 1]} [true, true], which would indicate this rule
- * matches two nodes of class name Filter, with an edge between the two,
- * and both are required.
+ * A rule for optimizing a plan. The rule contains a pattern that must be
+ * matched in the plan before the optimizer can consider applying the rule and a
+ * transformer to do further checks and possibly transform the plan. The rule
+ * pattern is expressed as a RulePlan, a plan made up of RuleOperator nodes, and
+ * the rule also carries a name that identifies it. For example, the logical
+ * optimizer's "LoadTypeCastInserter" rule uses a RulePlan that contains a single
+ * RuleOperator created for LOLoad.class, paired with a TypeCastInserter as the
+ * transformer.
*/
public class Rule<O extends Operator, P extends OperatorPlan<O>> {
- public enum WalkerAlgo {DepthFirstWalker, DependencyOrderWalker};
- public List<String> nodes;
- public Map<Integer, Integer> edges;
- public List<Boolean> required;
- public Transformer<O, P> transformer;
- public WalkerAlgo algo;
+ public enum WalkerAlgo {
+ DepthFirstWalker, DependencyOrderWalker
+ };
+
+ private RulePlan mRulePlan;
+ private Transformer<O, P> mTransformer;
+ private WalkerAlgo mWalkerAlgo;
+ private String mRuleName = null;
+
+ /**
+ * @param plan
+ * pattern to look for
+ * @param t
+ * Transformer to apply if the rule matches.
+ * @param ruleName
+ * name of the rule, as returned by getRuleName()
+ */
+ public Rule(RulePlan plan, Transformer<O, P> t, String ruleName) {
+ this(plan, t, ruleName, WalkerAlgo.DependencyOrderWalker);
+ }
/**
- * @param n List of node types to look for.
- * @param e Map of integers to integers. Each integer
- * represents the offset into nodes list.
- * @param r List of boolean indicating whether given nodes are
- * required for the pattern to match.
- * @param t Transformer to apply if the rule matches.
- * @param al Walker algorithm to find rule match within the plan.
+ * @param plan
+ * pattern to look for
+ * @param t
+ * Transformer to apply if the rule matches.
+ * @param ruleName
+ * name of the rule, as returned by getRuleName()
+ * @param al
+ * Walker algorithm to find rule match within the plan.
*/
- public Rule(List<String> n,
- Map<Integer, Integer> e,
- List<Boolean> r,
- Transformer<O, P> t, WalkerAlgo al) {
- nodes = n;
- edges = e;
- required = r;
- transformer = t;
- algo = al;
- }
-
- public Rule(List<String> n,
- Map<Integer, Integer> e,
- List<Boolean> r,
- Transformer<O, P> t) {
- this(n, e, r, t, WalkerAlgo.DependencyOrderWalker);
+ public Rule(RulePlan plan, Transformer<O, P> t, String ruleName,
+ WalkerAlgo al) {
+ mRulePlan = plan;
+ mTransformer = t;
+ mRuleName = ruleName;
+ mWalkerAlgo = al;
+ }
+
+ public RulePlan getPlan() {
+ return mRulePlan;
}
-}
+
+ public Transformer<O, P> getTransformer() {
+ return mTransformer;
+ }
+
+ public String getRuleName() {
+ return mRuleName;
+ }
+
+ public WalkerAlgo getWalkerAlgo() {
+ return mWalkerAlgo;
+ }
+
+}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java Wed Apr 15 07:44:59 2009
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -27,9 +28,12 @@
import java.util.Queue;
import java.util.Set;
+import org.apache.pig.PigException;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.RuleOperator.NodeType;
+import org.apache.pig.impl.util.Pair;
/**
* RuleMatcher contains the logic to determine whether a given rule matches.
@@ -41,25 +45,37 @@
public class RuleMatcher<O extends Operator, P extends OperatorPlan<O>> {
private Rule<O, P> mRule;
- private List<O> mMatch;
+ private List<Pair<O, RuleOperator.NodeType>> mMatch;
+ private List<List<Pair<O, RuleOperator.NodeType>>> mPrelimMatches = new ArrayList<List<Pair<O, RuleOperator.NodeType>>>();
private List<List<O>> mMatches = new ArrayList<List<O>>();
- private P mPlan; // for convience.
- Set<O> seen = new HashSet<O>();
+ private P mPlan; // for convenience.
+ private int mNumCommonNodes = 0;
+ private List<RuleOperator> mCommonNodes = null;
/**
* Test a rule to see if it matches the current plan. Save all matched nodes using BFS
* @param rule Rule to test for a match.
* @return true if the plan matches.
*/
- public boolean match(Rule<O, P> rule) {
+ public boolean match(Rule<O, P> rule) throws OptimizerException {
mRule = rule;
-
- mPlan = mRule.transformer.getPlan();
+ CommonNodeFinder commonNodeFinder = new CommonNodeFinder(mRule.getPlan());
+ try {
+ commonNodeFinder.visit();
+ mNumCommonNodes = commonNodeFinder.getCount();
+ mCommonNodes = commonNodeFinder.getCommonNodes();
+ } catch (VisitorException ve) {
+ int errCode = 2125;
+ String msg = "Internal error. Problem in computing common nodes in the Rule Plan.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, ve);
+ }
+ mPlan = mRule.getTransformer().getPlan();
mMatches.clear();
+ mPrelimMatches.clear();
- if (mRule.algo == Rule.WalkerAlgo.DependencyOrderWalker)
+ if (mRule.getWalkerAlgo() == Rule.WalkerAlgo.DependencyOrderWalker)
DependencyOrderWalker();
- else if (mRule.algo == Rule.WalkerAlgo.DepthFirstWalker)
+ else if (mRule.getWalkerAlgo() == Rule.WalkerAlgo.DepthFirstWalker)
DepthFirstWalker();
return (mMatches.size()!=0);
@@ -78,11 +94,181 @@
for (O op: fifo) {
if (beginMatch(op))
{
- mMatches.add(mMatch);
+ mPrelimMatches.add(mMatch);
}
}
+
+ if(mPrelimMatches.size() > 0) {
+ processPreliminaryMatches();
+ }
}
+ /**
+ * A method to compute the final matches
+ */
+ private void processPreliminaryMatches() {
+ //The preliminary matches contain paths that match
+ //the specification in the RulePlan. However, if there
+ //are twigs and DAGs, then a further computation is required
+ //to extract the nodes in the mPlan that correspond to the
+ //roots of the RulePlan
+
+ //compute the number of common nodes in each preliminary match
+
+ List<List<O>> commonNodesPerMatch = new ArrayList<List<O>>();
+ for(int i = 0; i < mPrelimMatches.size(); ++i) {
+ commonNodesPerMatch.add(getCommonNodesFromMatch(mPrelimMatches.get(i)));
+ }
+
+ if(mNumCommonNodes == 0) {
+ //the rule plan had simple paths
+
+ //verification step
+ //if any of the preliminary matches had common nodes
+ //then it's an anomaly
+
+ for(int i = 0; i < commonNodesPerMatch.size(); ++i) {
+ if(commonNodesPerMatch.get(i) != null) {
+ //we have found common nodes when there should be none
+ //just return as mMatches will be empty
+ return;
+ }
+ }
+
+ //pick the first node of each match and put them into individual lists
+ //put the lists inside the list of lists mMatches
+
+ for(int i = 0; i < mPrelimMatches.size(); ++i) {
+ List<O> match = new ArrayList<O>();
+ match.add(mPrelimMatches.get(i).get(0).first);
+ mMatches.add(match);
+ }
+ //all the matches have been computed for the simple path
+ return;
+ } else {
+ for(int i = 0; i < commonNodesPerMatch.size(); ++i) {
+ int commonNodes = (commonNodesPerMatch.get(i) == null? 0 : commonNodesPerMatch.get(i).size());
+ if(commonNodes != mNumCommonNodes) {
+ //if there is a mismatch in the common nodes then we have a problem
+ //the rule plan states that we have mNumCommonNodes but we have commonNodes
+ //in the match. Just return
+
+ return;
+ }
+ }
+ }
+
+ //keep track of the matches that have been processed
+ List<Boolean> processedMatches = new ArrayList<Boolean>();
+ for(int i = 0; i < mPrelimMatches.size(); ++i) {
+ processedMatches.add(false);
+ }
+
+ //a do while loop to handle single matches
+ int outerIndex = 0;
+ do {
+
+ if(processedMatches.get(outerIndex)) {
+ ++outerIndex;
+ continue;
+ }
+
+ List<Pair<O, RuleOperator.NodeType>> outerMatch = mPrelimMatches.get(outerIndex);
+ List<O> outerCommonNodes = commonNodesPerMatch.get(outerIndex);
+ Set<O> outerSetCommonNodes = new HashSet<O>(outerCommonNodes);
+ Set<O> finalIntersection = new HashSet<O>(outerCommonNodes);
+ Set<O> cumulativeIntersection = new HashSet<O>(outerCommonNodes);
+ List<O> patternMatchingRoots = new ArrayList<O>();
+ Set<O> unionOfRoots = new HashSet<O>();
+ boolean innerMatchProcessed = false;
+ unionOfRoots.add(outerMatch.get(0).first);
+
+
+ for(int innerIndex = outerIndex + 1;
+ (innerIndex < mPrelimMatches.size()) && (!processedMatches.get(innerIndex));
+ ++innerIndex) {
+ List<Pair<O, RuleOperator.NodeType>> innerMatch = mPrelimMatches.get(innerIndex);
+ List<O> innerCommonNodes = commonNodesPerMatch.get(innerIndex);
+ Set<O> innerSetCommonNodes = new HashSet<O>(innerCommonNodes);
+
+ //we need to compute the intersection of the common nodes
+ //the size of the intersection should be equal to the number
+ //of common nodes and the type of each rule node class
+ //if there is no match then it could be that we hit a match
+ //for a different path, i.e., another pattern that matched
+ //with a different set of nodes. In this case, we mark this
+ //match as not processed and move onto the next match
+
+ outerSetCommonNodes.retainAll(innerSetCommonNodes);
+
+ if(outerSetCommonNodes.size() != mNumCommonNodes) {
+ //there was no match
+ //continue to the next match
+ continue;
+ } else {
+ Set<O> tempCumulativeIntersection = new HashSet<O>(cumulativeIntersection);
+ tempCumulativeIntersection.retainAll(outerSetCommonNodes);
+ if(tempCumulativeIntersection.size() != mNumCommonNodes) {
+ //problem - there was a set intersection with a size mismatch
+ //between the cumulative intersection and the intersection of the
+ //inner and outer common nodes
+ //set mMatches to empty and return
+ mMatches = new ArrayList<List<O>>();
+ return;
+ } else {
+ processedMatches.set(innerIndex, true);
+ innerMatchProcessed = true;
+ cumulativeIntersection = tempCumulativeIntersection;
+ unionOfRoots.add(innerMatch.get(0).first);
+ }
+ }
+ }
+
+ cumulativeIntersection.retainAll(finalIntersection);
+ if(cumulativeIntersection.size() != mNumCommonNodes) {
+ //the cumulative and final intersections did not intersect
+ //this could happen when each of the matches are disjoint
+ //check if the innerMatches were processed at all
+ if(innerMatchProcessed) {
+ //problem - the inner matches were processed and we did
+ //not find common intersections
+ mMatches = new ArrayList<List<O>>();
+ return;
+ }
+ }
+ processedMatches.set(outerIndex, true);
+ for(O node: unionOfRoots) {
+ patternMatchingRoots.add(node);
+ }
+ mMatches.add(patternMatchingRoots);
+ ++outerIndex;
+ } while (outerIndex < mPrelimMatches.size() - 1);
+ }
+
+ private List<O> getCommonNodesFromMatch(List<Pair<O, NodeType>> match) {
+ List<O> commonNodes = null;
+ //A lookup table to weed out duplicates
+ Map<O, Boolean> lookup = new HashMap<O, Boolean>();
+ for(int index = 0; index < match.size(); ++index) {
+ if(match.get(index).second.equals(RuleOperator.NodeType.COMMON_NODE)) {
+ if(commonNodes == null) {
+ commonNodes = new ArrayList<O>();
+ }
+ O node = match.get(index).first;
+ //lookup the node under question
+ //if the node is not found in the table
+ //then we are examining it for the first time
+ //add it to the output list and mark it as seen
+ //else continue to the next iteration
+ if(lookup.get(node) == null) {
+ commonNodes.add(node);
+ lookup.put(node, true);
+ }
+ }
+ }
+ return commonNodes;
+ }
+
private void BFSDoAllPredecessors(O node, Set<O> seen, Collection<O> fifo) {
if (!seen.contains(node)) {
// We haven't seen this one before.
@@ -111,7 +297,7 @@
for (O suc : successors) {
if (seen.add(suc)) {
if (beginMatch(suc))
- mMatches.add(mMatch);
+ mPrelimMatches.add(mMatch);
Collection<O> newSuccessors = mPlan.getSuccessors(suc);
DFSVisit(suc, newSuccessors, seen);
}
@@ -144,75 +330,83 @@
/*
* This pattern matching is fairly simple and makes some important
* assumptions.
- * 1) The pattern to be matched must be expressable as a simple list. That
- * is it can match patterns like: 1->2, 2->3. It cannot match patterns
- * like 1->2, 1->3.
- * 2) The pattern must always begin with the first node in the nodes array.
- * After that it can go where it wants. So 1->3, 3->4 is legal. A
- * pattern of 2->1 is not.
+ * 1) The pattern to be matched must be expressible as a graph.
+ * 2) The pattern must always begin with one of the root nodes in the rule plan.
+ * After that it can go where it wants.
*
*/
private boolean beginMatch(O node) {
if (node == null) return false;
- int sz = mRule.nodes.size();
- mMatch = new ArrayList<O>(sz);
- // Add sufficient slots in the matches array
- for (int i = 0; i < sz; i++) mMatch.add(null);
-
- List<O> successors = new ArrayList<O>();
- if (node.getClass().getName().equals(mRule.nodes.get(0)) ||
- mRule.nodes.get(0).equals("any")) {
- mMatch.set(0, node);
- // Follow the edge to see the next node we should be looking for.
- Integer nextOpNum = mRule.edges.get(0);
- if (nextOpNum == null) {
- // This was looking for a single node
- return true;
- }
- successors = mPlan.getSuccessors(node);
- if (successors == null) return false;
- for (O successorOp : successors) {
- if (continueMatch(successorOp, nextOpNum)) return true;
+ mMatch = new ArrayList<Pair<O, RuleOperator.NodeType>>();
+
+ List<O> nodeSuccessors;
+ List<RuleOperator> ruleRoots = mRule.getPlan().getRoots();
+ for(RuleOperator ruleRoot: ruleRoots) {
+ if (node.getClass().getName().equals(ruleRoot.getNodeClass().getName()) ||
+ ruleRoot.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+ mMatch.add(new Pair<O, RuleOperator.NodeType>(node, ruleRoot.getNodeType()));
+ // Follow the edge to see the next node we should be looking for.
+ List<RuleOperator> ruleRootSuccessors = mRule.getPlan().getSuccessors(ruleRoot);
+ if (ruleRootSuccessors == null) {
+ // This was looking for a single node
+ return true;
+ }
+ nodeSuccessors = mPlan.getSuccessors(node);
+ if ((nodeSuccessors == null) || (nodeSuccessors.size() != ruleRootSuccessors.size())) {
+ //the ruleRoot has successors but the node does not
+ //OR
+ //the number of successors for the ruleRoot does not match
+ //the number of successors for the node
+ return false;
+ }
+ boolean foundMatch = false;
+ for (O nodeSuccessor : nodeSuccessors) {
+ foundMatch |= continueMatch(nodeSuccessor, ruleRootSuccessors);
+ }
+ return foundMatch;
}
}
// If we get here we haven't found it.
return false;
}
- private boolean continueMatch(O current, Integer nodeNumber) {
- if (current.getClass().getName().equals(mRule.nodes.get(nodeNumber)) ||
- mRule.nodes.get(nodeNumber).equals("any")) {
- mMatch.set(nodeNumber, current);
-
- // Follow the edge to see the next node we should be looking for.
- Integer nextOpNum = mRule.edges.get(nodeNumber);
- if (nextOpNum == null) {
- // We've comleted the match
- return true;
- }
- List<O> successors = new ArrayList<O>();
- successors = mPlan.getSuccessors(current);
- if (successors == null) return false;
- for (O successorOp : successors) {
- if (continueMatch(successorOp, nextOpNum)) return true;
- }
- } else if (!mRule.required.get(nodeNumber)) {
- // This node was optional, so it's okay if we don't match, keep
- // going anyway. Keep looking for the current node (don't find our
- // successors, but look for the next edge.
- Integer nextOpNum = mRule.edges.get(nodeNumber);
- if (nextOpNum == null) {
- // We've comleted the match
- return true;
- }
- if (continueMatch(current, nextOpNum)) return true;
- }
-
- // We can arrive here either because we didn't match at this node or
- // further down the line. One way or another we need to remove ourselves
- // from the match vector and return false.
- mMatch.set(nodeNumber, null);
+ private boolean continueMatch(O node, List<RuleOperator> ruleOperators) {
+ for(RuleOperator ruleOperator: ruleOperators) {
+ if (node.getClass().getName().equals(ruleOperator.getNodeClass().getName()) ||
+ ruleOperator.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+ mMatch.add(new Pair<O, RuleOperator.NodeType>(node,ruleOperator.getNodeType()));
+
+ // Follow the edge to see the next node we should be looking for.
+ List<RuleOperator> ruleOperatorSuccessors = mRule.getPlan().getSuccessors(ruleOperator);
+ if (ruleOperatorSuccessors == null) {
+ // We've completed the match
+ return true;
+ }
+ List<O> nodeSuccessors;
+ nodeSuccessors = mPlan.getSuccessors(node);
+ if ((nodeSuccessors == null) ||
+ (nodeSuccessors.size() != ruleOperatorSuccessors.size())) {
+ //the ruleOperator has successors but the node does not
+ //OR
+ //the number of successors for the ruleOperator does not match
+ //the number of successors for the node
+ return false;
+ }
+ boolean foundMatch = false;
+ for (O nodeSuccessor : nodeSuccessors) {
+ foundMatch |= continueMatch(nodeSuccessor, ruleOperatorSuccessors);
+ }
+ return foundMatch;
+ }
+
+ // We can arrive here either because we didn't match at this node or
+ // further down the line. One way or another we need to remove ourselves
+ // from the match vector and return false.
+ //SMS - I don't think we need this as mMatch will be discarded anyway
+ //mMatch.set(nodeNumber, null);
+ return false;
+ }
return false;
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan.optimizer;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A node of a rule plan: pairs the operator class to look for with a node type.
+ */
+public class RuleOperator extends Operator<RulePlanVisitor> {
+ private static final long serialVersionUID = 2L;
+
+ private static Log log = LogFactory.getLog(RuleOperator.class);
+
+ public enum NodeType {
+ ANY_NODE,
+ SIMPLE_NODE,
+ MULTI_NODE,
+ COMMON_NODE;
+ }
+
+ private Class mNodeClass;
+ private NodeType mNodeType = NodeType.SIMPLE_NODE;
+
+ /**
+ * @param clazz
+ * Class type of this node, e.g.: LOFilter.class
+ * @param k
+ * Operator key to assign to this node.
+ */
+ public RuleOperator(Class clazz, OperatorKey k) {
+ super(k);
+ mNodeClass = clazz;
+ }
+
+ /**
+ * @param clazz
+ * Class type of this node, e.g.: LOFilter.class
+ * @param nodeType
+ * Node type of this node
+ * @param k
+ * Operator key to assign to this node.
+ */
+ public RuleOperator(Class clazz, NodeType nodeType, OperatorKey k) {
+ super(k);
+ mNodeType = nodeType;
+ mNodeClass = clazz;
+ }
+
+ /**
+ * Set the node type of this rule operator.
+ *
+ * @param type
+ * Node type to set this operator to.
+ */
+ final public void setNodeType(NodeType type) {
+ mNodeType = type;
+ }
+
+ /**
+ * Get the node type of this operator.
+ */
+ public NodeType getNodeType() {
+ return mNodeType;
+ }
+
+ /**
+ * Get the node class of this operator.
+ */
+ public Class getNodeClass() {
+ return mNodeClass;
+ }
+
+ @Override
+ public String toString() {
+ return name();
+ }
+
+ /**
+ * Visit this node with the provided visitor. This should only be called by
+ * the visitor class itself, never directly.
+ *
+ * @param v
+ * Visitor to visit with.
+ * @throws VisitorException
+ * if the visitor has a problem.
+ */
+ public void visit(RulePlanVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ /**
+ * @see org.apache.pig.impl.plan.Operator#clone()
+ * Do not use the clone method directly. Operators are cloned when logical plans
+ * are cloned using {@link LogicalPlanCloner}
+ */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ RuleOperator ruleOpClone = (RuleOperator)super.clone();
+ return ruleOpClone;
+ }
+
+ @Override
+ public String name() {
+ StringBuffer msg = new StringBuffer();
+ msg.append("(Name: " + mNodeClass.getSimpleName() + " Node Type: " + mNodeType + "[" + this.mKey + "]" + ")");
+ return msg.toString();
+ }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+
+public class RulePlan extends OperatorPlan<RuleOperator> {
+ private static final long serialVersionUID = 2L;
+
+ public RulePlan() {
+ super();
+ }
+
+ public void explain(
+ OutputStream out,
+ PrintStream ps) throws VisitorException, IOException {
+ RulePlanPrinter rpp = new RulePlanPrinter(ps, this);
+
+ rpp.print(out);
+ }
+
+ /**
+ * Do not use the clone method directly. Use {@link LogicalPlanCloner} instead.
+ */
+/* @Override
+ public RulePlan clone() throws CloneNotSupportedException {
+ RulePlan clone = new RulePlan();
+
+ // Get all the nodes in this plan, and clone them. As we make
+ // clones, create a map between clone and original. Then walk the
+ // connections in this plan and create equivalent connections in the
+ // clone.
+ Map<RuleOperator, RuleOperator> matches =
+ //new HashMap<LogicalOperator, LogicalOperator>(mOps.size());
+ LogicalPlanCloneHelper.mOpToCloneMap;
+ for (RuleOperator op : mOps.keySet()) {
+ try {
+ RuleOperator c = (RuleOperator)op.clone();
+ clone.add(c);
+ matches.put(op, c);
+ } catch (CloneNotSupportedException cnse) {
+ cnse.printStackTrace();
+ throw cnse;
+ }
+ }
+
+ // Build the edges
+ for (RuleOperator op : mToEdges.keySet()) {
+ RuleOperator cloneTo = matches.get(op);
+ if (cloneTo == null) {
+ String msg = new String("Unable to find clone for op "
+ + op.name());
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+ Collection<RuleOperator> fromOps = mToEdges.get(op);
+ for (RuleOperator fromOp : fromOps) {
+ RuleOperator cloneFrom = matches.get(fromOp);
+ if (cloneFrom == null) {
+ String msg = new String("Unable to find clone for op "
+ + fromOp.name());
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+ try {
+ clone.connect(cloneFrom, cloneTo);
+ } catch (PlanException pe) {
+ throw new RuntimeException(pe);
+ }
+ }
+ }
+
+ return clone;
+ }
+*/
+}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.io.PrintStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism printing out the rule plan.
+ */
+public class RulePlanPrinter extends RulePlanVisitor {
+
+ private PrintStream mStream = null;
+ private String TAB1 = " ";
+ private String TABMore = "| ";
+ private String LSep = "|\n|---";
+ private String USep = "| |\n| ";
+ private int levelCntr = -1;
+ private OutputStream printer;
+
+ /**
+ * @param ps PrintStream to output plan information to
+ * @param plan Rule plan to print
+ */
+ public RulePlanPrinter(PrintStream ps, RulePlan plan) {
+ //super(plan, new DependencyOrderWalker(plan));
+ super(plan, new DepthFirstWalker(plan));
+ mStream = ps;
+ }
+
+ @Override
+ public void visit() throws VisitorException {
+ try {
+ mStream.write(depthFirst().getBytes());
+ } catch (IOException e) {
+ throw new VisitorException(e);
+ }
+ }
+
+ public void print(OutputStream printer) throws VisitorException, IOException {
+ this.printer = printer;
+ printer.write(depthFirst().getBytes());
+ }
+
+
+ protected String depthFirst() throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ List<RuleOperator> leaves = mPlan.getLeaves();
+ Collections.sort(leaves);
+ for (RuleOperator leaf : leaves) {
+ sb.append(depthFirst(leaf));
+ sb.append("\n");
+ }
+ //sb.delete(sb.length() - "\n".length(), sb.length());
+ //sb.delete(sb.length() - "\n".length(), sb.length());
+ return sb.toString();
+ }
+
+ private String planString(RulePlan rulePlan) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ if(rulePlan!=null)
+ rulePlan.explain(baos, mStream);
+ else
+ return "";
+ sb.append(USep);
+ sb.append(shiftStringByTabs(baos.toString(), 2));
+ return sb.toString();
+ }
+
+ private String planString(
+ List<RulePlan> rulePlans) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder();
+ if(rulePlans!=null)
+ for (RulePlan rulePlan : rulePlans) {
+ sb.append(planString(rulePlan));
+ }
+ return sb.toString();
+ }
+
+ private String depthFirst(RuleOperator node) throws VisitorException, IOException {
+ StringBuilder sb = new StringBuilder(node.name());
+ sb.append("\n");
+
+ List<RuleOperator> originalPredecessors = mPlan.getPredecessors(node);
+ if (originalPredecessors == null)
+ return sb.toString();
+
+ List<RuleOperator> predecessors = new ArrayList<RuleOperator>(originalPredecessors);
+
+ Collections.sort(predecessors);
+ int i = 0;
+ for (RuleOperator pred : predecessors) {
+ i++;
+ String DFStr = depthFirst(pred);
+ if (DFStr != null) {
+ sb.append(LSep);
+ if (i < predecessors.size())
+ sb.append(shiftStringByTabs(DFStr, 2));
+ else
+ sb.append(shiftStringByTabs(DFStr, 1));
+ }
+ }
+ return sb.toString();
+ }
+
+ private String shiftStringByTabs(String DFStr, int TabType) {
+ StringBuilder sb = new StringBuilder();
+ String[] spl = DFStr.split("\n");
+
+ String tab = (TabType == 1) ? TAB1 : TABMore;
+
+ sb.append(spl[0] + "\n");
+ for (int i = 1; i < spl.length; i++) {
+ sb.append(tab);
+ sb.append(spl[i]);
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ private void dispTabs() {
+ for (int i = 0; i < levelCntr; i++)
+ System.out.print(TAB1);
+ }
+}
+
+
Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+abstract public class RulePlanVisitor extends
+ PlanVisitor<RuleOperator, RulePlan> {
+
+ public RulePlanVisitor(RulePlan plan,
+ PlanWalker<RuleOperator, RulePlan> walker) {
+ super(plan, walker);
+ }
+
+ /**
+ * @param ruleOp
+ * the rule operator that has to be visited
+ * @throws VisitorException
+ */
+ protected void visit(RuleOperator ruleOp)
+ throws VisitorException {
+ //
+ // Do Nothing
+ //
+ }
+
+}
\ No newline at end of file
Modified: hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java Wed Apr 15 07:44:59 2009
@@ -23,25 +23,34 @@
import java.util.List;
import java.util.Map;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.optimizer.ImplicitSplitInserter;
import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
import org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer;
import org.apache.pig.impl.logicalLayer.optimizer.TypeCastInserter;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
import org.apache.pig.impl.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
+import org.apache.pig.impl.util.MultiMap;
//This optimiser puts in the bare minimum modifications needed to make sure the plan is functional
public class FunctionalLogicalOptimizer extends
PlanOptimizer<LogicalOperator, LogicalPlan> {
- public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
- public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
-
+ private static final String SCOPE = "RULE";
+ private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+
public FunctionalLogicalOptimizer(LogicalPlan plan) {
super(plan);
+ RulePlan rulePlan;
+
// List of rules for the logical optimizer
// This one has to be first, as the type cast inserter expects the
@@ -50,33 +59,32 @@
// it explicit. Since the RuleMatcher doesn't handle trees properly,
// we cheat and say that we match any node. Then we'll do the actual
// test in the transformer's check method.
- List<String> nodes = new ArrayList<String>(1);
- Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
- List<Boolean> required = new ArrayList<Boolean>(1);
- nodes.add("any");
- required.add(true);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
- required, new ImplicitSplitInserter(plan)));
+
+ rulePlan = new RulePlan();
+ RuleOperator anyLogicalOperator = new RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(anyLogicalOperator);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
+
// Add type casting to plans where the schema has been declared (by
// user, data, or data catalog).
- nodes = new ArrayList<String>(1);
- nodes.add(LOLOAD_CLASSNAME);
- edges = new HashMap<Integer, Integer>();
- required = new ArrayList<Boolean>(1);
- required.add(true);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
- required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+ rulePlan = new RulePlan();
+ RuleOperator loLoad = new RuleOperator(LOLoad.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loLoad);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new TypeCastInserter(plan, LOLoad.class.getName()), "LoadTypeCastInserter"));
// Add type casting to plans where the schema has been declared by
// user in a statement with stream operator.
- nodes = new ArrayList<String>(1);
- nodes.add(LOSTREAM_CLASSNAME);
- edges = new HashMap<Integer, Integer>();
- required = new ArrayList<Boolean>(1);
- required.add(true);
- mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan,
- LOSTREAM_CLASSNAME)));
+ rulePlan = new RulePlan();
+ RuleOperator loStream= new RuleOperator(LOStream.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loStream);
+ mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
+ LOStream.class.getName()), "StreamTypeCastInserter"));
}