You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/04/15 09:45:00 UTC

svn commit: r765073 [1/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/plan/optimizer/ src/org/apache/pig/pen/util/ test/org/apache/pig/test/ test/o...

Author: sms
Date: Wed Apr 15 07:44:59 2009
New Revision: 765073

URL: http://svn.apache.org/viewvc?rev=765073&view=rev
Log:
PIG-693: Proposed improvements to pig's optimizer

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/Main.java
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
    hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan4.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan5.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan6.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan7.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan8.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/DotFiles/optlimitplan9.dot
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Apr 15 07:44:59 2009
@@ -24,6 +24,8 @@
 
 IMPROVEMENTS
 
+PIG-693: Proposed improvements to pig's optimizer (sms)
+
 PIG-700: To automate the pig patch test process (gkesavan via sms)
 
 BUG FIXES

Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Wed Apr 15 07:44:59 2009
@@ -36,6 +36,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
@@ -82,6 +83,7 @@
         boolean dryrun = false;
         ArrayList<String> params = new ArrayList<String>();
         ArrayList<String> paramFiles = new ArrayList<String>();
+        HashSet<String> optimizerRules = new HashSet<String>();
 
         CmdLineParser opts = new CmdLineParser(args);
         opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
@@ -91,16 +93,17 @@
         opts.registerOpt('e', "execute", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('h', "help", CmdLineParser.ValueExpected.NOT_ACCEPTED);
-        opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
-        opts.registerOpt('j', "jar", CmdLineParser.ValueExpected.REQUIRED);
-        opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
-        opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('i', "version", CmdLineParser.ValueExpected.OPTIONAL);
-        opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
+        opts.registerOpt('j', "jar", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
+        opts.registerOpt('o', "hod", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+        opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
-        opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
+        opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+        opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
 
         ExecMode mode = ExecMode.UNKNOWN;
         String file = null;
@@ -162,6 +165,10 @@
                 usage();
                 return;
 
+            case 'i':
+            	System.out.println(getVersionString());
+            	return;
+
             case 'j': 
                 String jarsString = opts.getValStr();
                 if(jarsString != null){
@@ -205,6 +212,10 @@
                 // will be extended in the future
                 dryrun = true;
                 break;
+
+            case 't':
+            	optimizerRules.add(opts.getValStr());
+                break;
                             
             case 'v':
                 properties.setProperty(VERBOSE, ""+true);
@@ -222,9 +233,6 @@
                         throw new RuntimeException("ERROR: Unrecognized exectype.", e);
                     }
                 break;
-            case 'i':
-            	System.out.println(getVersionString());
-            	return;
             default: {
                 Character cc = new Character(opt);
                 throw new AssertionError("Unhandled option " + cc.toString());
@@ -241,6 +249,10 @@
         }
         
         pigContext.getProperties().setProperty("pig.logfile", logFileName);
+        
+        if(optimizerRules.size() > 0) {
+        	pigContext.getProperties().setProperty("pig.optimizer.rules", ObjectSerializer.serialize(optimizerRules));
+        }
 
         LogicalPlanBuilder.classloader = pigContext.createCl(null);
 
@@ -483,14 +495,21 @@
     System.out.println("    -b, -brief brief logging (no timestamps)");
     System.out.println("    -c, -cluster clustername, kryptonite is default");
     System.out.println("    -d, -debug debug level, INFO is default");
+    System.out.println("    -e, -execute commands to execute (within quotes)");
+    System.out.println("    -f, -file path to the script to execute");
     System.out.println("    -h, -help display this message");
+    System.out.println("    -i, -version display version information");
     System.out.println("    -j, -jar jarfile load jarfile"); 
+    System.out.println("    -l, -logfile path to client side log file; current working directory is default");
+    System.out.println("    -m, -param_file path to the parameter file");
     System.out.println("    -o, -hod read hod server from system property ssh.gateway");
+    System.out.println("    -p, -param key value pair of the form param=val");
+    System.out.println("    -r, -dryrun CmdLineParser.ValueExpected.NOT_ACCEPTED");
+    System.out.println("    -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default");
     System.out.println("    -v, -verbose print all error messages to screen");
-    System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
-    System.out.println("    -i, -version display version information");
-    System.out.println("    -l, -logfile path to client side log file; current working directory is default");
     System.out.println("    -w, -warning turn warning on; also turns warning aggregation off");
+    System.out.println("    -x, -exectype local|mapreduce, mapreduce is default");
+
 }
 
 private static String validateLogFile(String logFileName, String scriptName) {

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 15 07:44:59 2009
@@ -28,6 +28,7 @@
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +65,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.LOStore;
@@ -728,8 +730,6 @@
         PlanSetter ps = new PlanSetter(lpClone);
         ps.visit();
         
-        //(new SplitIntroducer(lp)).introduceImplSplits();
-        
         // run through validator
         CompilationMessageCollector collector = new CompilationMessageCollector() ;
         FrontendException caught = null;
@@ -758,8 +758,18 @@
 
         // optimize
         if (optimize) {
-            //LogicalOptimizer optimizer = new LogicalOptimizer(lpClone);
-            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType());
+            HashSet<String> optimizerRules = null;
+            try {
+                optimizerRules = (HashSet<String>) ObjectSerializer
+                        .deserialize(pigContext.getProperties().getProperty(
+                                "pig.optimizer.rules"));
+            } catch (IOException ioe) {
+                int errCode = 2110;
+                String msg = "Unable to deserialize optimizer rules.";
+                throw new FrontendException(msg, errCode, PigException.BUG, ioe);
+            }
+
+            LogicalOptimizer optimizer = new LogicalOptimizer(lpClone, pigContext.getExecType(), optimizerRules);
             optimizer.optimize();
         }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Wed Apr 15 07:44:59 2009
@@ -17,14 +17,18 @@
  */
 package org.apache.pig.impl.logicalLayer.optimizer;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
 
 import org.apache.pig.ExecType;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOPrinter;
+import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.*;
 
 /**
@@ -33,8 +37,10 @@
 public class LogicalOptimizer extends
         PlanOptimizer<LogicalOperator, LogicalPlan> {
 
-    public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
-    public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
+    private static final String SCOPE = "RULE";
+    private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+    
+    private Set<String> mRulesOff = null;
 
     public LogicalOptimizer(LogicalPlan plan) {
         this(plan, ExecType.MAPREDUCE);
@@ -44,8 +50,16 @@
         super(plan);
         runOptimizations(plan, mode);
     }
+    
+    public LogicalOptimizer(LogicalPlan plan, ExecType mode, Set<String> turnOffRules) {
+        super(plan);
+        mRulesOff = turnOffRules;
+        runOptimizations(plan, mode);
+    }
 
     private void runOptimizations(LogicalPlan plan, ExecType mode) {
+        RulePlan rulePlan;
+
         // List of rules for the logical optimizer
 
         // This one has to be first, as the type cast inserter expects the
@@ -54,49 +68,74 @@
         // it explicit. Since the RuleMatcher doesn't handle trees properly,
         // we cheat and say that we match any node. Then we'll do the actual
         // test in the transformers check method.
-        List<String> nodes = new ArrayList<String>(1);
-        Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
-        List<Boolean> required = new ArrayList<Boolean>(1);
-        nodes.add("any");
-        required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-                required, new ImplicitSplitInserter(plan)));
+        
+        boolean turnAllRulesOff = false;
+        if (mRulesOff != null) {
+            for (String rule : mRulesOff) {
+                if ("all".equalsIgnoreCase(rule)) {
+                    turnAllRulesOff = true;
+                    break;
+                }
+            }
+        }
+        
+        rulePlan = new RulePlan();
+        RuleOperator anyLogicalOperator = new RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(anyLogicalOperator);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
 
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
-        nodes = new ArrayList<String>(1);
-        nodes.add(LOLOAD_CLASSNAME);
-        edges = new HashMap<Integer, Integer>();
-        required = new ArrayList<Boolean>(1);
-        required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-                required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+        rulePlan = new RulePlan();
+        RuleOperator loLoad = new RuleOperator(LOLoad.class, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(loLoad);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                new TypeCastInserter(plan, LOLoad.class.getName()), "LoadTypeCastInserter"));
 
         // Add type casting to plans where the schema has been declared by
         // user in a statement with stream operator.
-        nodes = new ArrayList<String>(1);
-        nodes.add(LOSTREAM_CLASSNAME);
-        edges = new HashMap<Integer, Integer>();
-        required = new ArrayList<Boolean>(1);
-        required.add(true);
-        mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan,
-                LOSTREAM_CLASSNAME)));
+        rulePlan = new RulePlan();
+        RuleOperator loStream= new RuleOperator(LOStream.class, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(loStream);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
+                LOStream.class.getName()), "StreamTypeCastInserter"));
 
         // Optimize when LOAD precedes STREAM and the loader class
         // is the same as the serializer for the STREAM.
         // Similarly optimize when STREAM is followed by store and the
         // deserializer class is same as the Storage class.
-        mRules.add(new Rule(nodes, edges, required, new StreamOptimizer(plan,
-                LOSTREAM_CLASSNAME)));
+        if(!turnAllRulesOff) {
+            Rule rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan, new StreamOptimizer(plan,
+                    LOStream.class.getName()), "StreamOptimizer");
+            checkAndAddRule(rule);
+        }
 
         // Push up limit where ever possible.
-        nodes = new ArrayList<String>(1);
-        edges = new HashMap<Integer, Integer>();
-        required = new ArrayList<Boolean>(1);
-        nodes.add("org.apache.pig.impl.logicalLayer.LOLimit");
-        required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-                required, new OpLimitOptimizer(plan, mode)));
+        if(!turnAllRulesOff) {
+            rulePlan = new RulePlan();
+            RuleOperator loLimit = new RuleOperator(LOLimit.class, 
+                    new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+            rulePlan.add(loLimit);
+            Rule rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                    new OpLimitOptimizer(plan, mode), "LimitOptimizer");
+            checkAndAddRule(rule);
+        }
+        
+    }
+
+    private void checkAndAddRule(Rule rule) {
+        if(mRulesOff != null) {
+            for(String ruleOff: mRulesOff) {
+                String ruleName = rule.getRuleName();
+                if(ruleName == null) continue;
+                if(ruleName.equalsIgnoreCase(ruleOff)) return;
+            }
+        }
+        mRules.add(rule);
     }
 
-}
+}
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Apr 15 07:44:59 2009
@@ -60,7 +60,7 @@
     }
 
     @Override
-    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+    public boolean check(List<LogicalOperator> nodes) throws OptimizerException {       
         try {
             LogicalOperator op = getOperator(nodes);
             Schema s = op.getSchema();
@@ -70,7 +70,7 @@
             List<Schema.FieldSchema> fss = s.getFields();
             List<Byte> types = new ArrayList<Byte>(s.size());
             Schema determinedSchema = null;
-            if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+            if(LOLoad.class.getName().equals(operatorClassName)) {
                 determinedSchema = ((LOLoad)op).getDeterminedSchema();
             }
             for (int i = 0; i < fss.size(); i++) {
@@ -105,7 +105,7 @@
         }
         
         LogicalOperator lo = nodes.get(0);
-        if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+        if(LOLoad.class.getName().equals(operatorClassName)) {
             if (lo == null || !(lo instanceof LOLoad)) {
                 int errCode = 2005;
                 String msg = "Expected " + LOLoad.class.getSimpleName() + ", got " + lo.getClass().getSimpleName();
@@ -113,7 +113,7 @@
             }
     
             return lo;
-        } else if(operatorClassName == LogicalOptimizer.LOSTREAM_CLASSNAME){
+        } else if(LOStream.class.getName().equals(operatorClassName)){
             if (lo == null || !(lo instanceof LOStream)) {
                 int errCode = 2005;
                 String msg = "Expected " + LOStream.class.getSimpleName() + ", got " + lo.getClass().getSimpleName();
@@ -147,7 +147,7 @@
             // Note that in this case, the data coming out of the loader is not
             // a BYTEARRAY but is whatever determineSchema() says it is.
             Schema determinedSchema = null;
-            if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+            if(LOLoad.class.getName().equals(operatorClassName)) {
                 determinedSchema = ((LOLoad)lo).getDeterminedSchema();
             }
             for (int i = 0; i < s.size(); i++) {

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Wed Apr 15 07:44:59 2009
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.impl.plan;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -651,6 +653,12 @@
         }
     }
     
+    public void explain(
+            OutputStream out,
+            PrintStream ps) throws VisitorException, IOException {
+        PlanPrinter pp = new PlanPrinter(ps, this);
+        pp.print(out);
+    }
 
 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/PlanPrinter.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.io.PrintStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
+import org.apache.pig.impl.plan.optimizer.RulePlanVisitor;
+
+/**
+ * A visitor mechanism for printing out an operator plan.
+ */
+public class PlanPrinter<O extends Operator, P extends OperatorPlan<O>> extends PlanVisitor<O, P> {
+
+    private PrintStream mStream = null;
+    private String TAB1 = "    ";
+    private String TABMore = "|   ";
+    private String LSep = "|\n|---";
+    private String USep = "|   |\n|   ";
+    private int levelCntr = -1;
+    private OutputStream printer;
+
+    /**
+     * @param ps PrintStream to output plan information to
+     * @param plan Operator plan to print
+     */
+    public PlanPrinter(PrintStream ps, P plan) {
+        //super(plan, new DependencyOrderWalker(plan));
+        super(plan, new DepthFirstWalker(plan));
+        mStream = ps;
+    }
+
+    @Override
+    public void visit() throws VisitorException {
+        try {
+            mStream.write(depthFirst().getBytes());
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    public void print(OutputStream printer) throws VisitorException, IOException {
+        this.printer = printer;
+        printer.write(depthFirst().getBytes());
+    }
+
+
+    protected String depthFirst() throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder();
+        List<O> leaves = mPlan.getLeaves();
+        Collections.sort(leaves);
+        for (O leaf : leaves) {
+            sb.append(depthFirst(leaf));
+            sb.append("\n");
+        }
+        //sb.delete(sb.length() - "\n".length(), sb.length());
+        //sb.delete(sb.length() - "\n".length(), sb.length());
+        return sb.toString();
+    }
+    
+    private String planString(P plan) throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        if(plan!=null)
+            plan.explain(baos, mStream);
+        else
+            return "";
+        sb.append(USep);
+        sb.append(shiftStringByTabs(baos.toString(), 2));
+        return sb.toString();
+    }
+    
+    private String planString(
+            List<P> plans) throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder();
+        if(plans!=null)
+            for (P plan : plans) {
+                sb.append(planString(plan));
+            }
+        return sb.toString();
+    }
+
+    private String depthFirst(O node) throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder(node.name());
+        sb.append("\n");
+        
+        List<O> originalPredecessors =  mPlan.getPredecessors(node);
+        if (originalPredecessors == null)
+            return sb.toString();
+        
+        List<O> predecessors =  new ArrayList<O>(originalPredecessors);
+        
+        Collections.sort(predecessors);
+        int i = 0;
+        for (O pred : predecessors) {
+            i++;
+            String DFStr = depthFirst(pred);
+            if (DFStr != null) {
+                sb.append(LSep);
+                if (i < predecessors.size())
+                    sb.append(shiftStringByTabs(DFStr, 2));
+                else
+                    sb.append(shiftStringByTabs(DFStr, 1));
+            }
+        }
+        return sb.toString();
+    }
+
+    private String shiftStringByTabs(String DFStr, int TabType) {
+        StringBuilder sb = new StringBuilder();
+        String[] spl = DFStr.split("\n");
+
+        String tab = (TabType == 1) ? TAB1 : TABMore;
+
+        sb.append(spl[0] + "\n");
+        for (int i = 1; i < spl.length; i++) {
+            sb.append(tab);
+            sb.append(spl[i]);
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
+
+    private void dispTabs() {
+        for (int i = 0; i < levelCntr; i++)
+            System.out.print(TAB1);
+    }
+}
+
+        

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/CommonNodeFinder.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class CommonNodeFinder extends
+        RulePlanVisitor {
+
+    private List<RuleOperator> mCommonNodes = null;
+    
+    public CommonNodeFinder(RulePlan plan) {
+        super(plan, new DependencyOrderWalker<RuleOperator, RulePlan>(plan));
+    }
+    
+    public int getCount() {
+        return ((mCommonNodes == null) ? 0 : mCommonNodes.size());
+    }
+    
+    public List<RuleOperator> getCommonNodes() {
+        return mCommonNodes;
+    }
+    
+    private void reset() {
+        mCommonNodes = new ArrayList<RuleOperator>();
+    }
+
+    @Override
+    public void visit() throws VisitorException {
+        reset();
+        super.visit();
+    }
+    
+    /**
+     * @param ruleOp
+     *            the rule operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(RuleOperator ruleOp)
+            throws VisitorException {
+        /**
+         * A common node is a node that appears in the common path of two nodes in the rule plan.
+         * Any node that has more than one predecessor is a common node.
+         * Any node that has a predecessor which is a common node is itself a common node.
+         * Any node that has more than one successor is a common node.
+         */
+        if(ruleOp.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+           return; 
+        }
+        List<RuleOperator> predecessors = mPlan.getPredecessors(ruleOp);
+        List<RuleOperator> successors = mPlan.getSuccessors(ruleOp);
+        
+        if(predecessors != null) {
+            if(predecessors.size() > 1) {
+                ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+                mCommonNodes.add(ruleOp);
+                return;
+            } else {
+                // Exactly one predecessor is expected here.
+                // If that predecessor is a common node, then this node is
+                // also a common node.
+                RuleOperator ruleOperatorPredecessor = predecessors.get(0);
+                if(ruleOperatorPredecessor.getNodeType().equals(RuleOperator.NodeType.COMMON_NODE)) {
+                    ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+                    mCommonNodes.add(ruleOp);
+                    return;
+                }
+            }
+        }
+        
+        if(successors != null) {
+            if (successors.size() > 1) {
+                ruleOp.setNodeType(RuleOperator.NodeType.COMMON_NODE);
+                mCommonNodes.add(ruleOp);
+                return;
+            }
+        }
+    }
+
+}
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/PlanOptimizer.java Wed Apr 15 07:44:59 2009
@@ -22,6 +22,7 @@
 
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
 
 /******************************************************************************
  * A class to optimize plans.  This class need not be subclassed for a
@@ -34,15 +35,29 @@
     
     protected List<Rule> mRules;
     protected P mPlan;
+    protected int mMaxIterations;
 
     /**
      * @param plan Plan to optimize
      */
     protected PlanOptimizer(P plan) {
+        this(plan, 500);
+    }
+
+    /**
+     * @param plan Plan to optimize
+     * @param iterations maximum number of optimization iterations
+     */
+    protected PlanOptimizer(P plan, int iterations) {
         mRules = new ArrayList<Rule>();
         mPlan = plan;
+        if(iterations < 0) {
+            mMaxIterations = 1000;
+        } else {
+            mMaxIterations = iterations;
+        }
     }
-
+    
     /**
      * Run the optimizer.  This method attempts to match each of the Rules
      * against the plan.  If a Rule matches, it then calls the check
@@ -52,20 +67,26 @@
      * @throws OptimizerException
      */
     public final void optimize() throws OptimizerException {
-        RuleMatcher matcher = new RuleMatcher();
-        for (Rule rule : mRules) {
-            if (matcher.match(rule)) {
-                // It matches the pattern.  Now check if the transformer
-                // approves as well.
-                List<List<O>> matches = matcher.getAllMatches();
-                for (List<O> match:matches)
-                {
-	                if (rule.transformer.check(match)) {
-	                    // The transformer approves.
-	                    rule.transformer.transform(match);
-	                }
+        boolean sawMatch = false;
+        int numIterations = 0;
+        do {
+            sawMatch = false;
+            for (Rule rule : mRules) {
+                RuleMatcher matcher = new RuleMatcher();
+                if (matcher.match(rule)) {
+                    // It matches the pattern.  Now check if the transformer
+                    // approves as well.
+                    List<List<O>> matches = matcher.getAllMatches();
+                    for (List<O> match:matches)
+                    {
+    	                if (rule.getTransformer().check(match)) {
+    	                    // The transformer approves.
+    	                    sawMatch = true;
+    	                    rule.getTransformer().transform(match);
+    	                }
+                    }
                 }
             }
-        }
+        } while(sawMatch && ++numIterations < mMaxIterations);
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/Rule.java Wed Apr 15 07:44:59 2009
@@ -17,56 +17,70 @@
  */
 package org.apache.pig.impl.plan.optimizer;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
 
 /**
- * A rule for optimizing a plan.  The rule contains a pattern that must be
- * matched in the plan before the optimizer can consider applying the rule
- * and a transformer to do further checks and possibly transform the plan.
- * The rule pattern is expressed as a list of node names, a map of edges in
- * the plan, and a list of boolean values indicating whether the node is
- * required.  For example, a rule pattern could be expressed as:
- * [Filter, Filter] {[0, 1]} [true, true], which would indicate this rule
- * matches two nodes of class name Filter, with an edge between the two,
- * and both are required.
+ * A rule for optimizing a plan. The rule contains a pattern that must be
+ * matched in the plan before the optimizer can consider applying the rule and a
+ * transformer to do further checks and possibly transform the plan. The rule
+ * pattern is expressed as a RulePlan, i.e., a graph of RuleOperator nodes in
+ * which each node names the operator class to look for and carries a node
+ * type. For example, a rule pattern could be a RulePlan holding two
+ * RuleOperators of class Filter joined by an edge, which would indicate this
+ * rule matches two nodes of class Filter with an edge between the two.
  */
 public class Rule<O extends Operator, P extends OperatorPlan<O>> {
 
-	public enum WalkerAlgo {DepthFirstWalker, DependencyOrderWalker};
-    public List<String> nodes;
-    public Map<Integer, Integer> edges;
-    public List<Boolean> required;
-    public Transformer<O, P> transformer;
-    public WalkerAlgo algo;
+    public enum WalkerAlgo {
+        DepthFirstWalker, DependencyOrderWalker
+    };
+
+    private RulePlan mRulePlan;
+    private Transformer<O, P> mTransformer;
+    private WalkerAlgo mWalkerAlgo;
+    private String mRuleName = null;
+
+    /**
+     * @param plan pattern to look for
+     * @param t Transformer to apply if the rule matches.
+     * @param ruleName
+     *            Name of this rule.
+     */
+    public Rule(RulePlan plan, Transformer<O, P> t, String ruleName) {
+        this(plan, t, ruleName, WalkerAlgo.DependencyOrderWalker);
+    }
 
     /**
-     * @param n List of node types to look for.
-     * @param e Map of integers to integers.  Each integer
-     * represents the offset into nodes list.
-     * @param r List of boolean indicating whether given nodes are
-     * required for the pattern to match.
-     * @param t Transformer to apply if the rule matches.
-     * @param al Walker algorithm to find rule match within the plan.
+     * @param plan pattern to look for
+     * @param t Transformer to apply if the rule matches.
+     * @param ruleName
+     *            Name of this rule.
+     * @param al
+     *            Walker algorithm to find rule match within the plan.
      */
-    public Rule(List<String> n,
-                Map<Integer, Integer> e, 
-                List<Boolean> r,
-                Transformer<O, P> t, WalkerAlgo al) {
-        nodes = n;
-        edges = e;
-        required = r;
-        transformer = t;
-        algo = al;
-    }
-    
-    public Rule(List<String> n,
-                Map<Integer, Integer> e, 
-                List<Boolean> r,
-                Transformer<O, P> t) {
-    	this(n, e, r, t, WalkerAlgo.DependencyOrderWalker);
+    public Rule(RulePlan plan, Transformer<O, P> t, String ruleName,
+            WalkerAlgo al) {
+        mRulePlan = plan;
+        mTransformer = t;
+        mRuleName = ruleName;
+        mWalkerAlgo = al;
+    }
+
+    public RulePlan getPlan() {
+        return mRulePlan;
     }
-}
+
+    public Transformer<O, P> getTransformer() {
+        return mTransformer;
+    }
+
+    public String getRuleName() {
+        return mRuleName;
+    }
+
+    public WalkerAlgo getWalkerAlgo() {
+        return mWalkerAlgo;
+    }
+
+}
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java Wed Apr 15 07:44:59 2009
@@ -19,6 +19,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -27,9 +28,12 @@
 import java.util.Queue;
 import java.util.Set;
 
+import org.apache.pig.PigException;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.RuleOperator.NodeType;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * RuleMatcher contains the logic to determine whether a given rule matches.
@@ -41,25 +45,37 @@
 public class RuleMatcher<O extends Operator, P extends OperatorPlan<O>> {
 
     private Rule<O, P> mRule;
-    private List<O> mMatch;
+    private List<Pair<O, RuleOperator.NodeType>> mMatch;
+    private List<List<Pair<O, RuleOperator.NodeType>>> mPrelimMatches = new ArrayList<List<Pair<O, RuleOperator.NodeType>>>();
     private List<List<O>> mMatches = new ArrayList<List<O>>();
-    private P mPlan; // for convience.
-    Set<O> seen = new HashSet<O>();
+    private P mPlan; // for convenience.
+    private int mNumCommonNodes = 0;
+    private List<RuleOperator> mCommonNodes = null;
 
     /**
      * Test a rule to see if it matches the current plan. Save all matched nodes using BFS
      * @param rule Rule to test for a match.
      * @return true if the plan matches.
      */
-    public boolean match(Rule<O, P> rule) {
+    public boolean match(Rule<O, P> rule) throws OptimizerException {
         mRule = rule;
-        
-        mPlan = mRule.transformer.getPlan();
+        CommonNodeFinder commonNodeFinder = new CommonNodeFinder(mRule.getPlan());
+        try {
+            commonNodeFinder.visit();
+            mNumCommonNodes = commonNodeFinder.getCount();
+            mCommonNodes = commonNodeFinder.getCommonNodes();
+        } catch (VisitorException ve) {
+            int errCode = 2125;
+            String msg = "Internal error. Problem in computing common nodes in the Rule Plan.";
+            throw new OptimizerException(msg, errCode, PigException.BUG, ve);
+        }
+        mPlan = mRule.getTransformer().getPlan();
         mMatches.clear();
+        mPrelimMatches.clear();
         
-        if (mRule.algo == Rule.WalkerAlgo.DependencyOrderWalker)
+        if (mRule.getWalkerAlgo() == Rule.WalkerAlgo.DependencyOrderWalker)
         	DependencyOrderWalker();
-        else if (mRule.algo == Rule.WalkerAlgo.DepthFirstWalker)
+        else if (mRule.getWalkerAlgo() == Rule.WalkerAlgo.DepthFirstWalker)
         	DepthFirstWalker();        
         
         return (mMatches.size()!=0);
@@ -78,11 +94,181 @@
         for (O op: fifo) {
         	if (beginMatch(op))
 			{
-        		mMatches.add(mMatch);
+        		mPrelimMatches.add(mMatch);
 			}
         }
+        
+        if(mPrelimMatches.size() > 0) {
+            processPreliminaryMatches();
+        }
     }
     
+    /**
+     * A method to compute the final matches
+     */
+    private void processPreliminaryMatches() {
+        //The preliminary matches contain paths that match
+        //the specification in the RulePlan. However, if there
+        //are twigs and DAGs, then a further computation is required
+        //to extract the nodes in the mPlan that correspond to the
+        //roots of the RulePlan
+        
+        //collect the common nodes found in each preliminary match
+        
+        List<List<O>> commonNodesPerMatch = new ArrayList<List<O>>();
+        for(int i = 0; i < mPrelimMatches.size(); ++i) {
+            commonNodesPerMatch.add(getCommonNodesFromMatch(mPrelimMatches.get(i)));
+        }
+        
+        if(mNumCommonNodes == 0) {
+            //the rule plan had simple paths
+            
+            //verification step
+            //if any of the preliminary matches had common nodes 
+            //then it's an anomaly
+            
+            for(int i = 0; i < commonNodesPerMatch.size(); ++i) {
+                if(commonNodesPerMatch.get(i) != null) {
+                    //we have found common nodes when there should be none
+                    //just return as mMatches will be empty
+                    return;
+                }
+            }
+            
+            //pick the first node of each match and put them into individual lists
+            //put the lists inside the list of lists mMatches
+            
+            for(int i = 0; i < mPrelimMatches.size(); ++i) {
+                List<O> match = new ArrayList<O>();
+                match.add(mPrelimMatches.get(i).get(0).first);
+                mMatches.add(match);
+            }
+            //all the matches have been computed for the simple path
+            return;
+        } else {
+            for(int i = 0; i < commonNodesPerMatch.size(); ++i) {
+                int commonNodes = (commonNodesPerMatch.get(i) == null? 0 : commonNodesPerMatch.get(i).size());
+                if(commonNodes != mNumCommonNodes) {
+                    //if there is a mismatch in the common nodes then we have a problem
+                    //the rule plan states that we have mNumCommonNodes but we have commonNodes 
+                    //in the match. Just return
+                    
+                    return;
+                }
+            }
+        }
+        
+        //keep track of the matches that have been processed
+        List<Boolean> processedMatches = new ArrayList<Boolean>();
+        for(int i = 0; i < mPrelimMatches.size(); ++i) {
+            processedMatches.add(false);
+        }
+        
+        //a do-while loop so that a lone preliminary match is still processed once
+        int outerIndex = 0;
+        do {
+            
+            if(processedMatches.get(outerIndex)) {
+               ++outerIndex;
+               continue;
+            }
+            
+            List<Pair<O, RuleOperator.NodeType>> outerMatch = mPrelimMatches.get(outerIndex);
+            List<O> outerCommonNodes = commonNodesPerMatch.get(outerIndex);
+            Set<O> outerSetCommonNodes = new HashSet<O>(outerCommonNodes);
+            Set<O> finalIntersection = new HashSet<O>(outerCommonNodes);
+            Set<O> cumulativeIntersection = new HashSet<O>(outerCommonNodes);
+            List<O> patternMatchingRoots = new ArrayList<O>();
+            Set<O> unionOfRoots = new HashSet<O>();
+            boolean innerMatchProcessed = false;
+            unionOfRoots.add(outerMatch.get(0).first);
+            
+            
+            for(int innerIndex = outerIndex + 1; 
+                (innerIndex < mPrelimMatches.size()) && (!processedMatches.get(innerIndex)); 
+                ++innerIndex) {
+                List<Pair<O, RuleOperator.NodeType>> innerMatch = mPrelimMatches.get(innerIndex);
+                List<O> innerCommonNodes = commonNodesPerMatch.get(innerIndex);
+                Set<O> innerSetCommonNodes = new HashSet<O>(innerCommonNodes);
+                
+                //we need to compute the intersection of the common nodes
+                //the size of the intersection should be equal to the number
+                //of common nodes and the type of each rule node class
+                //if there is no match then it could be that we hit a match
+                //for a different path, i.e., another pattern that matched
+                //with a different set of nodes. In this case, we mark this
+                //match as not processed and move onto the next match
+                
+                outerSetCommonNodes.retainAll(innerSetCommonNodes);
+                
+                if(outerSetCommonNodes.size() != mNumCommonNodes) {
+                    //there was no match
+                    //continue to the next match
+                    continue;
+                } else {
+                    Set<O> tempCumulativeIntersection = new HashSet<O>(cumulativeIntersection);
+                    tempCumulativeIntersection.retainAll(outerSetCommonNodes);
+                    if(tempCumulativeIntersection.size() != mNumCommonNodes) {
+                        //problem - there was a set intersection with a size mismatch
+                        //between the cumulative intersection and the intersection of the
+                        //inner and outer common nodes 
+                        //set mMatches to empty and return
+                        mMatches = new ArrayList<List<O>>();
+                        return;
+                    } else {
+                        processedMatches.set(innerIndex, true);
+                        innerMatchProcessed = true;
+                        cumulativeIntersection = tempCumulativeIntersection;
+                        unionOfRoots.add(innerMatch.get(0).first);
+                    }
+                }
+            }
+            
+            cumulativeIntersection.retainAll(finalIntersection);
+            if(cumulativeIntersection.size() != mNumCommonNodes) {
+                //the cumulative and final intersections did not intersect
+                //this could happen when each of the matches are disjoint
+                //check if the innerMatches were processed at all
+                if(innerMatchProcessed) {
+                    //problem - the inner matches were processed and we did
+                    //not find common intersections
+                    mMatches = new ArrayList<List<O>>();
+                    return;
+                }
+            }
+            processedMatches.set(outerIndex, true);
+            for(O node: unionOfRoots) {
+                patternMatchingRoots.add(node);
+            }
+            mMatches.add(patternMatchingRoots);
+            ++outerIndex;
+        } while (outerIndex < mPrelimMatches.size() - 1);        
+    }
+
+    private List<O> getCommonNodesFromMatch(List<Pair<O, NodeType>> match) {
+        List<O> commonNodes = null;
+        //A lookup table to weed out duplicates
+        Map<O, Boolean> lookup = new HashMap<O, Boolean>();
+        for(int index = 0; index < match.size(); ++index) {
+            if(match.get(index).second.equals(RuleOperator.NodeType.COMMON_NODE)) {
+                if(commonNodes == null) {
+                    commonNodes = new ArrayList<O>();
+                }
+                O node = match.get(index).first;
+                //lookup the node under question
+                //if the node is not found in the table
+                //then we are examining it for the first time
+                //add it to the output list and mark it as seen
+                //else continue to the next iteration
+                if(lookup.get(node) == null) {
+                    commonNodes.add(node);
+                    lookup.put(node, true);
+                }
+            }
+        }
+        return commonNodes;
+    }
+
     private void BFSDoAllPredecessors(O node, Set<O> seen, Collection<O> fifo)  {
 		if (!seen.contains(node)) {
 		// We haven't seen this one before.
@@ -111,7 +297,7 @@
         for (O suc : successors) {
             if (seen.add(suc)) {
             	if (beginMatch(suc))
-            		mMatches.add(mMatch);
+            		mPrelimMatches.add(mMatch);
                 Collection<O> newSuccessors = mPlan.getSuccessors(suc);
                 DFSVisit(suc, newSuccessors, seen);
             }
@@ -144,75 +330,83 @@
     /*
      * This pattern matching is fairly simple and makes some important
      * assumptions.
-     * 1)  The pattern to be matched must be expressable as a simple list.  That
-     *     is it can match patterns like: 1->2, 2->3.  It cannot match patterns
-     *     like 1->2, 1->3.
-     * 2)  The pattern must always begin with the first node in the nodes array.
-     *     After that it can go where it wants.  So 1->3, 3->4 is legal.  A
-     *     pattern of 2->1 is not.
+     * 1)  The pattern to be matched must be expressible as a graph.
+     * 2)  The pattern must always begin with one of the root nodes in the rule plan.
+     *     After that it can go where it wants.
      *
      */
     private boolean beginMatch(O node) {
         if (node == null) return false;
         
-        int sz = mRule.nodes.size();
-        mMatch = new ArrayList<O>(sz);
-        // Add sufficient slots in the matches array
-        for (int i = 0; i < sz; i++) mMatch.add(null);
-        
-        List<O> successors = new ArrayList<O>();
-        if (node.getClass().getName().equals(mRule.nodes.get(0)) || 
-                mRule.nodes.get(0).equals("any")) {
-            mMatch.set(0, node);
-            // Follow the edge to see the next node we should be looking for.
-            Integer nextOpNum = mRule.edges.get(0);
-            if (nextOpNum == null) {
-                // This was looking for a single node
-                return true;
-            }
-            successors = mPlan.getSuccessors(node);
-            if (successors == null) return false;
-            for (O successorOp : successors) {
-                if (continueMatch(successorOp, nextOpNum)) return true;
+        mMatch = new ArrayList<Pair<O, RuleOperator.NodeType>>();
+        
+        List<O> nodeSuccessors;
+        List<RuleOperator> ruleRoots = mRule.getPlan().getRoots();
+        for(RuleOperator ruleRoot: ruleRoots) {
+            if (node.getClass().getName().equals(ruleRoot.getNodeClass().getName()) || 
+                    ruleRoot.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+                mMatch.add(new Pair<O, RuleOperator.NodeType>(node, ruleRoot.getNodeType()));
+                // Follow the edge to see the next node we should be looking for.
+                List<RuleOperator> ruleRootSuccessors = mRule.getPlan().getSuccessors(ruleRoot);
+                if (ruleRootSuccessors == null) {
+                    // This was looking for a single node
+                    return true;
+                }
+                nodeSuccessors = mPlan.getSuccessors(node);
+                if ((nodeSuccessors == null) || (nodeSuccessors.size() != ruleRootSuccessors.size())) {
+                    //the ruleRoot has successors but the node does not
+                    //OR
+                    //the number of successors for the ruleRoot does not match 
+                    //the number of successors for the node
+                    return false; 
+                }
+                boolean foundMatch = false;
+                for (O nodeSuccessor : nodeSuccessors) {
+                    foundMatch |= continueMatch(nodeSuccessor, ruleRootSuccessors);
+                }
+                return foundMatch;
             }
         }
         // If we get here we haven't found it.
         return false;
     }
 
-    private boolean continueMatch(O current, Integer nodeNumber) {
-        if (current.getClass().getName().equals(mRule.nodes.get(nodeNumber)) || 
-                mRule.nodes.get(nodeNumber).equals("any")) {
-            mMatch.set(nodeNumber, current);
-
-            // Follow the edge to see the next node we should be looking for.
-            Integer nextOpNum = mRule.edges.get(nodeNumber);
-            if (nextOpNum == null) {
-                // We've comleted the match
-                return true;
-            }
-            List<O> successors = new ArrayList<O>();
-            successors = mPlan.getSuccessors(current);
-            if (successors == null) return false;
-            for (O successorOp : successors) {
-                if (continueMatch(successorOp, nextOpNum)) return true;
-            }
-        } else if (!mRule.required.get(nodeNumber)) {
-            // This node was optional, so it's okay if we don't match, keep
-            // going anyway.  Keep looking for the current node (don't find our
-            // successors, but look for the next edge.
-            Integer nextOpNum = mRule.edges.get(nodeNumber);
-            if (nextOpNum == null) {
-                // We've comleted the match
-                return true;
-            }
-            if (continueMatch(current, nextOpNum)) return true;
-        }
-
-        // We can arrive here either because we didn't match at this node or
-        // further down the line.  One way or another we need to remove ourselves
-        // from the match vector and return false.
-        mMatch.set(nodeNumber, null);
+    private boolean continueMatch(O node, List<RuleOperator> ruleOperators) {
+        for(RuleOperator ruleOperator: ruleOperators) {
+            if (node.getClass().getName().equals(ruleOperator.getNodeClass().getName()) || 
+                    ruleOperator.getNodeType().equals(RuleOperator.NodeType.ANY_NODE)) {
+                mMatch.add(new Pair<O, RuleOperator.NodeType>(node,ruleOperator.getNodeType()));
+    
+                // Follow the edge to see the next node we should be looking for.
+                List<RuleOperator> ruleOperatorSuccessors = mRule.getPlan().getSuccessors(ruleOperator);
+                if (ruleOperatorSuccessors == null) {
+                    // We've completed the match
+                    return true;
+                }
+                List<O> nodeSuccessors;
+                nodeSuccessors = mPlan.getSuccessors(node);
+                if ((nodeSuccessors == null) || 
+                        (nodeSuccessors.size() != ruleOperatorSuccessors.size())) {
+                    //the ruleOperator has successors but the node does not
+                    //OR
+                    //the number of successors for the ruleOperator does not match 
+                    //the number of successors for the node
+                    return false;
+                }
+                boolean foundMatch = false;
+                for (O nodeSuccessor : nodeSuccessors) {
+                    foundMatch |= continueMatch(nodeSuccessor, ruleOperatorSuccessors);
+                }
+                return foundMatch;
+            }
+    
+            // We can arrive here either because we didn't match at this node or
+            // further down the line.  One way or another we need to remove ourselves
+            // from the match vector and return false.
+            //SMS - I don't think we need this as mMatch will be discarded anyway
+            //mMatch.set(nodeNumber, null);
+            return false;
+        }
         return false;
     }
 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RuleOperator.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan.optimizer;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A node of a RulePlan: pairs the operator class to match with a NodeType.
+ */
+public class RuleOperator extends Operator<RulePlanVisitor> {
+    private static final long serialVersionUID = 2L;
+
+    private static Log log = LogFactory.getLog(RuleOperator.class);
+    
+    public enum NodeType {
+        ANY_NODE,
+        SIMPLE_NODE,
+        MULTI_NODE,
+        COMMON_NODE;
+    }
+
+    private Class mNodeClass;
+    private NodeType mNodeType = NodeType.SIMPLE_NODE;
+    
+    /**
+     * @param clazz
+     *            Class type of this node, e.g.: LOFilter.class
+     * @param k
+     *            Operator key to assign to this node.
+     */
+    public RuleOperator(Class clazz, OperatorKey k) {
+        super(k);
+        mNodeClass = clazz;
+    }
+
+    /**
+     * @param clazz
+     *            Class type of this node, e.g.: LOFilter.class
+     * @param nodeType
+     *            Node type of this node             
+     * @param k
+     *            Operator key to assign to this node.
+     */
+    public RuleOperator(Class clazz, NodeType nodeType, OperatorKey k) {
+        super(k);
+        mNodeType = nodeType;
+        mNodeClass = clazz;
+    }
+    
+    /**
+     * Set the node type of this rule operator.
+     * 
+     * @param type 
+     *            Node type to set this operator to.
+     */
+    final public void setNodeType(NodeType type) {
+        mNodeType = type;
+    }
+
+    /**
+     * Get the node type of this operator.
+     */
+    public NodeType getNodeType() {
+        return mNodeType;
+    }
+
+    /**
+     * Get the node class of this operator.
+     */
+    public Class getNodeClass() {
+        return mNodeClass;
+    }
+
+    @Override
+    public String toString() {
+        return name();
+    }
+
+    /**
+     * Visit this node with the provided visitor. This should only be called by
+     * the visitor class itself, never directly.
+     * 
+     * @param v
+     *            Visitor to visit with.
+     * @throws VisitorException
+     *             if the visitor has a problem.
+     */
+    public void visit(RulePlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    /**
+     * @see org.apache.pig.impl.plan.Operator#clone()
+     * Do not use the clone method directly. Operators are cloned when logical plans
+     * are cloned using {@link LogicalPlanCloner}
+     */
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        RuleOperator ruleOpClone = (RuleOperator)super.clone();
+        return ruleOpClone;
+    }
+
+    @Override
+    public String name() {
+        StringBuffer msg = new StringBuffer();
+        msg.append("(Name: " + mNodeClass.getSimpleName() + " Node Type: " + mNodeType + "[" + this.mKey + "]" + ")");
+        return msg.toString();
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlan.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+
+/**
+ * An operator plan whose nodes are {@link RuleOperator}s.
+ */
+public class RulePlan extends OperatorPlan<RuleOperator> {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Creates an empty rule plan.
+     */
+    public RulePlan() {
+        super();
+    }
+
+    /**
+     * Writes a textual rendering of this plan to {@code out} using a
+     * {@link RulePlanPrinter}.
+     *
+     * @param out
+     *            stream that receives the rendered plan
+     * @param ps
+     *            print stream handed to the {@link RulePlanPrinter}
+     *            constructor; the rendering itself is written to {@code out}
+     * @throws VisitorException
+     *             if the printer fails while walking the plan
+     * @throws IOException
+     *             if writing to {@code out} fails
+     */
+    public void explain(
+            OutputStream out,
+            PrintStream ps) throws VisitorException, IOException {
+        RulePlanPrinter rpp = new RulePlanPrinter(ps, this);
+
+        rpp.print(out);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanPrinter.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.io.PrintStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor that renders a {@link RulePlan} as an indented text tree.
+ * <p>
+ * Every leaf of the plan starts a tree. The predecessors of a node are listed
+ * beneath it in sorted order, each introduced by a {@code |---} marker.
+ */
+public class RulePlanPrinter extends RulePlanVisitor {
+
+    /** Indent for continuation lines below the last predecessor of a node. */
+    private static final String TAB1 = "    ";
+    /** Indent for continuation lines below any other predecessor; keeps the vertical bar running. */
+    private static final String TAB_MORE = "|   ";
+    /** Separator emitted before each predecessor subtree. */
+    private static final String L_SEP = "|\n|---";
+    /** Separator emitted before a nested plan rendered by {@link #planString(RulePlan)}. */
+    private static final String U_SEP = "|   |\n|   ";
+
+    /** Stream written to by {@link #visit()}. */
+    private final PrintStream mStream;
+
+    /**
+     * @param ps PrintStream to output plan information to
+     * @param plan Rule plan to print
+     */
+    public RulePlanPrinter(PrintStream ps, RulePlan plan) {
+        super(plan, new DepthFirstWalker<RuleOperator, RulePlan>(plan));
+        mStream = ps;
+    }
+
+    /**
+     * Renders the whole plan and writes it to the print stream supplied at
+     * construction time.
+     *
+     * @throws VisitorException
+     *             if rendering fails, or wrapping any IOException raised
+     *             while writing
+     */
+    @Override
+    public void visit() throws VisitorException {
+        try {
+            mStream.write(depthFirst().getBytes());
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /**
+     * Renders the whole plan and writes it to the given stream.
+     *
+     * @param printer
+     *            stream that receives the rendered plan
+     * @throws VisitorException
+     *             if rendering fails
+     * @throws IOException
+     *             if writing to {@code printer} fails
+     */
+    public void print(OutputStream printer) throws VisitorException, IOException {
+        printer.write(depthFirst().getBytes());
+    }
+
+
+    /**
+     * Renders one tree per plan leaf, in sorted leaf order, each followed by a
+     * newline.
+     *
+     * @return the textual rendering of the plan
+     */
+    protected String depthFirst() throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder();
+        // Sort a private copy so the list handed out by the plan is never
+        // reordered; depthFirst(RuleOperator) treats predecessors the same way.
+        List<RuleOperator> leaves = new ArrayList<RuleOperator>(mPlan.getLeaves());
+        Collections.sort(leaves);
+        for (RuleOperator leaf : leaves) {
+            sb.append(depthFirst(leaf));
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
+    
+    /**
+     * Renders a nested rule plan, prefixed with {@link #U_SEP} and indented.
+     * Not currently called from within this class.
+     *
+     * @return the rendering, or the empty string when {@code rulePlan} is null
+     */
+    private String planString(RulePlan rulePlan) throws VisitorException, IOException {
+        if (rulePlan == null) {
+            return "";
+        }
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        rulePlan.explain(baos, mStream);
+        StringBuilder sb = new StringBuilder();
+        sb.append(U_SEP);
+        sb.append(shiftStringByTabs(baos.toString(), 2));
+        return sb.toString();
+    }
+    
+    /**
+     * Renders each plan of the list with {@link #planString(RulePlan)} and
+     * concatenates the results; a null list yields the empty string.
+     */
+    private String planString(
+            List<RulePlan> rulePlans) throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder();
+        if (rulePlans != null) {
+            for (RulePlan rulePlan : rulePlans) {
+                sb.append(planString(rulePlan));
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Renders {@code node} followed, recursively, by its predecessors.
+     *
+     * @return the node's name on the first line, then one indented subtree per
+     *         predecessor
+     */
+    private String depthFirst(RuleOperator node) throws VisitorException, IOException {
+        StringBuilder sb = new StringBuilder(node.name());
+        sb.append("\n");
+        
+        List<RuleOperator> originalPredecessors = mPlan.getPredecessors(node);
+        if (originalPredecessors == null) {
+            return sb.toString();
+        }
+        
+        List<RuleOperator> predecessors = new ArrayList<RuleOperator>(originalPredecessors);
+        Collections.sort(predecessors);
+
+        int remaining = predecessors.size();
+        for (RuleOperator pred : predecessors) {
+            remaining--;
+            String subtree = depthFirst(pred);
+            sb.append(L_SEP);
+            // Only the last predecessor drops the vertical bar from its indent.
+            sb.append(shiftStringByTabs(subtree, remaining > 0 ? 2 : 1));
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Indents every line of {@code DFStr} except the first.
+     *
+     * @param DFStr
+     *            newline-separated text to indent
+     * @param TabType
+     *            1 selects {@link #TAB1}; any other value selects
+     *            {@link #TAB_MORE}
+     * @return the text with each line terminated by a newline
+     */
+    private static String shiftStringByTabs(String DFStr, int TabType) {
+        StringBuilder sb = new StringBuilder();
+        String[] spl = DFStr.split("\n");
+
+        String tab = (TabType == 1) ? TAB1 : TAB_MORE;
+
+        // The first line sits directly after the separator and is not indented.
+        sb.append(spl[0]).append("\n");
+        for (int i = 1; i < spl.length; i++) {
+            sb.append(tab);
+            sb.append(spl[i]);
+            sb.append("\n");
+        }
+        return sb.toString();
+    }
+}
+
+        

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java?rev=765073&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/optimizer/RulePlanVisitor.java Wed Apr 15 07:44:59 2009
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.plan.optimizer;
+
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+abstract public class RulePlanVisitor extends
+        PlanVisitor<RuleOperator, RulePlan> {
+
+    public RulePlanVisitor(RulePlan plan,
+                     PlanWalker<RuleOperator, RulePlan> walker) {
+        super(plan, walker);
+    }
+
+    /**
+     * @param ruleOp
+     *            the rule operator that has to be visited
+     * @throws VisitorException
+     */
+    protected void visit(RuleOperator ruleOp)
+            throws VisitorException {
+        //
+        // Do Nothing
+        //
+    }
+
+}
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java?rev=765073&r1=765072&r2=765073&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java Wed Apr 15 07:44:59 2009
@@ -23,25 +23,34 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.optimizer.ImplicitSplitInserter;
 import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
 import org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer;
 import org.apache.pig.impl.logicalLayer.optimizer.TypeCastInserter;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.optimizer.PlanOptimizer;
 import org.apache.pig.impl.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.optimizer.RuleOperator;
+import org.apache.pig.impl.plan.optimizer.RulePlan;
+import org.apache.pig.impl.util.MultiMap;
 
 //This optimiser puts in the bare minimum modifications needed to make sure the plan is functional
 public class FunctionalLogicalOptimizer extends
         PlanOptimizer<LogicalOperator, LogicalPlan> {
 
-    public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
-    public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
-
+    private static final String SCOPE = "RULE";
+    private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
+    
     public FunctionalLogicalOptimizer(LogicalPlan plan) {
         super(plan);
 
+        RulePlan rulePlan;        
+
         // List of rules for the logical optimizer
 
         // This one has to be first, as the type cast inserter expects the
@@ -50,33 +59,32 @@
         // it explicit. Since the RuleMatcher doesn't handle trees properly,
         // we cheat and say that we match any node. Then we'll do the actual
         // test in the transformers check method.
-        List<String> nodes = new ArrayList<String>(1);
-        Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
-        List<Boolean> required = new ArrayList<Boolean>(1);
-        nodes.add("any");
-        required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-                required, new ImplicitSplitInserter(plan)));
+        
+        rulePlan = new RulePlan();
+        RuleOperator anyLogicalOperator = new RuleOperator(LogicalOperator.class, RuleOperator.NodeType.ANY_NODE, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(anyLogicalOperator);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                new ImplicitSplitInserter(plan), "ImplicitSplitInserter"));
+
 
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
-        nodes = new ArrayList<String>(1);
-        nodes.add(LOLOAD_CLASSNAME);
-        edges = new HashMap<Integer, Integer>();
-        required = new ArrayList<Boolean>(1);
-        required.add(true);
-        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
-                required, new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+        rulePlan = new RulePlan();
+        RuleOperator loLoad = new RuleOperator(LOLoad.class, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(loLoad);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+                new TypeCastInserter(plan, LOLoad.class.getName()), "LoadTypeCastInserter"));
 
         // Add type casting to plans where the schema has been declared by
         // user in a statement with stream operator.
-        nodes = new ArrayList<String>(1);
-        nodes.add(LOSTREAM_CLASSNAME);
-        edges = new HashMap<Integer, Integer>();
-        required = new ArrayList<Boolean>(1);
-        required.add(true);
-        mRules.add(new Rule(nodes, edges, required, new TypeCastInserter(plan,
-                LOSTREAM_CLASSNAME)));
+        rulePlan = new RulePlan();
+        RuleOperator loStream= new RuleOperator(LOStream.class, 
+                new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+        rulePlan.add(loStream);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
+                LOStream.class.getName()), "StreamTypeCastInserter"));
 
     }