You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that did not survive the plain-text conversion.)
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC

svn commit: r1045314 [1/5] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...

Author: yanz
Date: Mon Dec 13 19:11:00 2010
New Revision: 1045314

URL: http://svn.apache.org/viewvc?rev=1045314&view=rev
Log:
PIG-1712: ILLUSTRATE rework (yanz)

Added:
    pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java
    pig/trunk/src/org/apache/pig/pen/Illustrable.java
    pig/trunk/src/org/apache/pig/pen/Illustrator.java
    pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
    pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
    pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
    pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
    pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput.txt
    pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
    pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput_invalid.txt
    pig/trunk/test/org/apache/pig/test/data/illustrate.pig
    pig/trunk/test/org/apache/pig/test/data/illustrate2.pig
    pig/trunk/test/org/apache/pig/test/data/illustrate3.pig
    pig/trunk/test/org/apache/pig/test/data/illustrate4.pig
    pig/trunk/test/org/apache/pig/test/data/illustrate5.pig
    pig/trunk/test/org/apache/pig/test/data/illustrate6.pig
Removed:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrangeForIllustrate.java
    pig/trunk/src/org/apache/pig/pen/LocalLogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/pen/physicalOperators/
    pig/trunk/test/org/apache/pig/test/TestLocalPOSplit.java
    pig/trunk/test/org/apache/pig/test/TestPOCogroup.java
    pig/trunk/test/org/apache/pig/test/TestPOCross.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
    pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
    pig/trunk/src/org/apache/pig/data/TupleFactory.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
    pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
    pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
    pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
    pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
    pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java
    pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java
    pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
    pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
    pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/TestStore.java
    pig/trunk/test/org/apache/pig/test/utils/POCastDummy.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Dec 13 19:11:00 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1712: ILLUSTRATE rework (yanz)
+
 PIG-1758: Deep cast of complex type (daijy)
 
 PIG-1728: doc updates (chandec via olgan)

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Mon Dec 13 19:11:00 2010
@@ -1125,23 +1125,32 @@ public class PigServer {
         return currDAG.getAliasOp().keySet();
     }
 
-    public Map<LogicalOperator, DataBag> getExamples(String alias) {
+    public Map<LogicalOperator, DataBag> getExamples(String alias) throws IOException {
         LogicalPlan plan = null;
-
         try {        
-            if (currDAG.isBatchOn()) {
+            if (currDAG.isBatchOn() && alias != null) {
                 currDAG.execute();
             }
-            
-            plan = getClonedGraph().getPlan(alias);
+            Graph g = getClonedGraph();
+            plan = g.getPlan(alias);
+            plan = compileLp(plan, g, false);
         } catch (IOException e) {
             //Since the original script is parsed anyway, there should not be an
             //error in this parsing. The only reason there can be an error is when
             //the files being loaded in load don't exist anymore.
             e.printStackTrace();
         }
+
         ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
-        return exgen.getExamples();
+        try {
+            return exgen.getExamples();
+        } catch (ExecException e) {
+            e.printStackTrace(System.out);
+            throw new IOException("ExecException : "+ e.getMessage());
+        } catch (Exception e) {
+            e.printStackTrace(System.out);
+            throw new IOException("Exception : "+ e.getMessage());
+        }
     }
 
     private LogicalPlan getStorePlan(String alias) throws IOException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Dec 13 19:11:00 2010
@@ -57,6 +57,8 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -69,17 +71,17 @@ import org.apache.pig.newplan.logical.Lo
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
-import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher.ProjectionFinder;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.rules.InputOutputFileValidator;
-import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.pen.POOptimizeDisabler;
 
 public class HExecutionEngine {
     
@@ -105,6 +107,11 @@ public class HExecutionEngine {
     // map from LOGICAL key to into about the execution
     protected Map<OperatorKey, MapRedResult> materializedResults;
     
+    protected Map<LogicalOperator, PhysicalOperator> logToPhyMap;
+    protected Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+    protected Map<Operator, PhysicalOperator> newLogToPhyMap;
+    private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerOpMap;
+    
     public HExecutionEngine(PigContext pigContext) {
         this.pigContext = pigContext;
         this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();      
@@ -255,8 +262,16 @@ public class HExecutionEngine {
                 // translate old logical plan to new plan
                 LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
                 visitor.visit();
+                opsMap = visitor.getOldToNewLOOpMap();
+                forEachInnerOpMap = visitor.getForEachInnerMap();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
                 
+                if (pigContext.inIllustrator) {
+                    // disable all PO-specific optimizations
+                    POOptimizeDisabler pod = new POOptimizeDisabler(newPlan);
+                    pod.visit();
+                }
+                
                 SchemaResetter schemaResetter = new SchemaResetter(newPlan);
                 schemaResetter.visit();
                 
@@ -271,6 +286,22 @@ public class HExecutionEngine {
                     throw new FrontendException(msg, errCode, PigException.BUG, ioe);
                 }
                 
+                if (pigContext.inIllustrator) {
+                    // disable MergeForEach in illustrator
+                    if (optimizerRules == null)
+                        optimizerRules = new HashSet<String>();
+                    optimizerRules.add("MergeForEach");
+                    optimizerRules.add("PartitionFilterOptimizer");
+                    optimizerRules.add("LimitOptimizer");
+                    optimizerRules.add("SplitFilter");
+                    optimizerRules.add("PushUpFilter");
+                    optimizerRules.add("MergeFilter");
+                    optimizerRules.add("PushDownForEachFlatten");
+                    optimizerRules.add("ColumnMapKeyPrune");
+                    optimizerRules.add("AddForEach");
+                    optimizerRules.add("GroupByConstParallelSetter");
+                }
+                
                 // run optimizer
                 org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer = 
                     new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100, optimizerRules);
@@ -294,6 +325,7 @@ public class HExecutionEngine {
                 
                 translator.setPigContext(pigContext);
                 translator.visit();
+                newLogToPhyMap = translator.getLogToPhyMap();
                 return translator.getPhysicalPlan();
                 
             }else{       
@@ -303,13 +335,40 @@ public class HExecutionEngine {
                 translator.visit();
                 return translator.getPhysicalPlan();
             }
-        } catch (Exception ve) {
+        } catch (ExecException ve) {
             int errCode = 2042;
             String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false.";
             throw new FrontendException(msg, errCode, PigException.BUG, ve);
         }
     }
     
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+        if (logToPhyMap != null)
+            return logToPhyMap;
+        else if (newLogToPhyMap != null) {
+            Map<LogicalOperator, PhysicalOperator> result = new HashMap<LogicalOperator, PhysicalOperator>();
+            for (LogicalOperator lo: opsMap.keySet()) {
+                result.put(lo, newLogToPhyMap.get(opsMap.get(lo))); 
+            }
+            return result;
+        } else
+            return null;
+    }
+    
+    public Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
+        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> result =
+            new HashMap<LOForEach, Map<LogicalOperator, PhysicalOperator>>();
+        for (Map.Entry<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> entry :
+            forEachInnerOpMap.entrySet()) {
+            Map<LogicalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalOperator, PhysicalOperator>();
+            for (Map.Entry<LogicalOperator, LogicalRelationalOperator> innerEntry : entry.getValue().entrySet()) {
+                innerOpMap.put(innerEntry.getKey(), newLogToPhyMap.get(innerEntry.getValue()));
+            }
+            result.put(entry.getKey(), innerOpMap);
+        }
+        return result;
+    }
+    
     public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
 
         public SortInfoSetter(OperatorPlan plan) throws FrontendException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Dec 13 19:11:00 2010
@@ -174,7 +174,10 @@ public class JobControlCompiler{
         UDFContext.getUDFContext().reset();
     }
 
-    Map<Job, MapReduceOper> getJobMroMap() {
+    /**
+     * Gets the map of Job and the MR Operator
+     */
+    public Map<Job, MapReduceOper> getJobMroMap() {
         return Collections.unmodifiableMap(jobMroMap);
     }
     
@@ -378,19 +381,23 @@ public class JobControlCompiler{
                     inpTargets.add(ldSucKeys);
                     inpSignatureLists.add(ld.getSignature());
                     //Remove the POLoad from the plan
-                    mro.mapPlan.remove(ld);
+                    if (!pigContext.inIllustrator)
+                        mro.mapPlan.remove(ld);
                 }
             }
 
-            //Create the jar of all functions and classes required
-            File submitJarFile = File.createTempFile("Job", ".jar");
-            // ensure the job jar is deleted on exit
-            submitJarFile.deleteOnExit();
-            FileOutputStream fos = new FileOutputStream(submitJarFile);
-            JarManager.createJar(fos, mro.UDFs, pigContext);
+            if (!pigContext.inIllustrator) 
+            {
+                //Create the jar of all functions and classes required
+                File submitJarFile = File.createTempFile("Job", ".jar");
+                // ensure the job jar is deleted on exit
+                submitJarFile.deleteOnExit();
+                FileOutputStream fos = new FileOutputStream(submitJarFile);
+                JarManager.createJar(fos, mro.UDFs, pigContext);
             
-            //Start setting the JobConf properties
-            conf.set("mapred.jar", submitJarFile.getPath());
+                //Start setting the JobConf properties
+                conf.set("mapred.jar", submitJarFile.getPath());
+            }
             conf.set("pig.inputs", ObjectSerializer.serialize(inp));
             conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
             conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
@@ -457,22 +464,23 @@ public class JobControlCompiler{
                 POStore st;
                 if (reduceStores.isEmpty()) {
                     st = mapStores.get(0);
-                    mro.mapPlan.remove(st);
+                    if(!pigContext.inIllustrator)
+                        mro.mapPlan.remove(st);
                 }
                 else {
                     st = reduceStores.get(0);
-                    mro.reducePlan.remove(st);
+                    if(!pigContext.inIllustrator)
+                        mro.reducePlan.remove(st);
                 }
 
                 // set out filespecs
                 String outputPath = st.getSFile().getFileName();
-                FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
                 
                 conf.set("pig.streaming.log.dir", 
                             new Path(outputPath, LOG_DIR).toString());
                 conf.set("pig.streaming.task.output.dir", outputPath);
             } 
-           else { // multi store case
+           else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
                 log.info("Setting up multi store job");
                 String tmpLocationStr =  FileLocalizer
                 .getTemporaryPath(pigContext).toString();
@@ -513,7 +521,8 @@ public class JobControlCompiler{
                 //MapOnly Job
                 nwJob.setMapperClass(PigMapOnly.Map.class);
                 nwJob.setNumReduceTasks(0);
-                conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+                if(!pigContext.inIllustrator)
+                    conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 if(mro.isEndOfAllInputSetInMap()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
@@ -535,7 +544,8 @@ public class JobControlCompiler{
                     log.info("Setting identity combiner class.");
                 }
                 pack = (POPackage)mro.reducePlan.getRoots().get(0);
-                mro.reducePlan.remove(pack);
+                if(!pigContext.inIllustrator)
+                    mro.reducePlan.remove(pack);
                 nwJob.setMapperClass(PigMapReduce.Map.class);
                 nwJob.setReducerClass(PigMapReduce.Reduce.class);
                 
@@ -550,21 +560,24 @@ public class JobControlCompiler{
                 if (mro.customPartitioner != null)
                 	nwJob.setPartitionerClass(PigContext.resolveClassName(mro.customPartitioner));
 
-                conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+                if(!pigContext.inIllustrator)
+                    conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 if(mro.isEndOfAllInputSetInMap()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
                     // The pipeline is rerun only if there was a stream or merge-join.
                     conf.set(END_OF_INP_IN_MAP, "true");
                 }
-                conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+                if(!pigContext.inIllustrator)
+                    conf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
                 if(mro.isEndOfAllInputSetInReduce()) {
                     // this is used in Map.close() to decide whether the
                     // pipeline needs to be rerun one more time in the close()
                     // The pipeline is rerun only if there was a stream
                     conf.set("pig.stream.in.reduce", "true");
                 }
-                conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
+                if (!pigContext.inIllustrator)
+                    conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
                 conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType())); 
                 
                 if (mro.getUseSecondaryKey()) {
@@ -631,9 +644,14 @@ public class JobControlCompiler{
                 nwJob.setGroupingComparatorClass(PigGroupingPartitionWritableComparator.class);
             }
             
-            // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized 
-            for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
-            for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+            if (!pigContext.inIllustrator)
+            {
+                // unset inputs for POStore, otherwise, map/reduce plan will be unnecessarily deserialized 
+                for (POStore st: mapStores) { st.setInputs(null); st.setParentPlan(null);}
+                for (POStore st: reduceStores) { st.setInputs(null); st.setParentPlan(null);}
+                conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
+                conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
+            }
 
             // tmp file compression setups
             if (Utils.tmpFileCompression(pigContext)) {
@@ -641,8 +659,6 @@ public class JobControlCompiler{
                 conf.set("pig.tmpfilecompression.codec", Utils.tmpFileCompressionCodec(pigContext));
             }
 
-            conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
-            conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
@@ -661,7 +677,6 @@ public class JobControlCompiler{
             UDFContext.getUDFContext().serialize(conf);
             Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
             jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
-            
             return cjob;
             
         } catch (JobCreationException jce) {
@@ -1142,7 +1157,7 @@ public class JobControlCompiler{
             PigContext pigContext, Configuration conf, String filename,
             String prefix) throws IOException {
 
-        if (!FileLocalizer.fileExists(filename, pigContext)) {
+        if (!pigContext.inIllustrator && !FileLocalizer.fileExists(filename, pigContext)) {
             throw new IOException(
                     "Internal error: skew join partition file "
                     + filename + " does not exist");

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Dec 13 19:11:00 2010
@@ -109,7 +109,6 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UriUtil;
 import org.apache.pig.impl.util.Utils;
-import org.mortbay.util.URIUtil;
 
 /**
  * The compiler that compiles a given physical plan
@@ -311,6 +310,7 @@ public class MRCompiler extends PhyPlanV
     public MROperPlan compile() throws IOException, PlanException, VisitorException {
         List<PhysicalOperator> leaves = plan.getLeaves();
 
+        if (!pigContext.inIllustrator)
         for (PhysicalOperator op : leaves) {
             if (!(op instanceof POStore)) {
                 int errCode = 2025;
@@ -324,8 +324,14 @@ public class MRCompiler extends PhyPlanV
         // and compile their plans
         List<POStore> stores = PlanHelper.getStores(plan);
         List<PONative> nativeMRs= PlanHelper.getNativeMRs(plan);
-        List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
-        ops.addAll(stores);
+        List<PhysicalOperator> ops;
+        if (!pigContext.inIllustrator) {
+            ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
+            ops.addAll(stores);
+        } else {
+            ops = new ArrayList<PhysicalOperator>(leaves.size() + nativeMRs.size());
+            ops.addAll(leaves);
+        }
         ops.addAll(nativeMRs);
         Collections.sort(ops);
         
@@ -1005,16 +1011,23 @@ public class MRCompiler extends PhyPlanV
             if (!mro.isMapDone()) {
             	// if map plan is open, add a limit for optimization, eventually we
             	// will add another limit to reduce plan
-                mro.mapPlan.addAsLeaf(op);
-                mro.setMapDone(true);
+                if (!pigContext.inIllustrator)
+                {
+                    mro.mapPlan.addAsLeaf(op);
+                    mro.setMapDone(true);
+                }
                 
                 if (mro.reducePlan.isEmpty())
                 {
                     simpleConnectMapToReduce(mro);
                     mro.requestedParallelism = 1;
-                    POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    pLimit2.setLimit(op.getLimit());
-                    mro.reducePlan.addAsLeaf(pLimit2);
+                    if (!pigContext.inIllustrator) {
+                        POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                        pLimit2.setLimit(op.getLimit());
+                        mro.reducePlan.addAsLeaf(pLimit2);
+                    } else {
+                        mro.reducePlan.addAsLeaf(op);
+                    }
                 }
                 else
                 {
@@ -1848,6 +1861,7 @@ public class MRCompiler extends PhyPlanV
             curMROp.reducePlan.addAsLeaf(nfe1);
             curMROp.setNeedsDistinctCombiner(true);
             phyToMROpMap.put(op, curMROp);
+            curMROp.phyToMRMap.put(op, nfe1);
         }catch(Exception e){
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -2221,12 +2235,13 @@ public class MRCompiler extends PhyPlanV
         POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps2,flattened);
         mro.reducePlan.add(nfe1);
         mro.reducePlan.connect(pkg, nfe1);
-        
+        mro.phyToMRMap.put(sort, nfe1);
         if (limit!=-1)
         {
 	        POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
 	    	pLimit2.setLimit(limit);
 	    	mro.reducePlan.addAsLeaf(pLimit2);
+	    	mro.phyToMRMap.put(sort, pLimit2);
         }
 
 //        ep1.add(innGen);
@@ -2649,7 +2664,7 @@ public class MRCompiler extends PhyPlanV
             POPackage pack = (POPackage)op;
             
             List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(pack);
-            if (sucs.size()!=1) {
+            if (sucs == null || sucs.size()!=1) {
                 return;
             }
             
@@ -2739,12 +2754,12 @@ public class MRCompiler extends PhyPlanV
                 {
                     // Now we can optimize the map-reduce plan
                     // Replace POPackage->POForeach to POJoinPackage
-                    replaceWithPOJoinPackage(mr.reducePlan, pack, forEach, chunkSize);
+                    replaceWithPOJoinPackage(mr.reducePlan, mr, pack, forEach, chunkSize);
                 }
             }
         }
 
-        public static void replaceWithPOJoinPackage(PhysicalPlan plan,
+        public static void replaceWithPOJoinPackage(PhysicalPlan plan, MapReduceOper mr,
                 POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
             String scope = pack.getOperatorKey().scope;
             NodeIdGenerator nig = NodeIdGenerator.getGenerator();
@@ -2772,7 +2787,7 @@ public class MRCompiler extends PhyPlanV
                 String msg = "Error rewriting POJoinPackage.";
                 throw new MRCompilerException(msg, errCode, PigException.BUG, e);
             }
-            
+            mr.phyToMRMap.put(forEach, joinPackage);
             LogFactory.
             getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
         }
@@ -2800,6 +2815,7 @@ public class MRCompiler extends PhyPlanV
                 throw new MRCompilerException(msg, errCode, PigException.BUG);
             }
             PhysicalOperator mpLeaf = mpLeaves.get(0);
+            if (!pigContext.inIllustrator)
             if (!(mpLeaf instanceof POStore)) {
                 int errCode = 2025;
                 String msg = "Expected leaf of reduce plan to " +
@@ -2885,6 +2901,7 @@ public class MRCompiler extends PhyPlanV
                     throw new MRCompilerException(msg, errCode, PigException.BUG);
                 }
                 PhysicalOperator mpLeaf = mpLeaves.get(0);
+                if (!pigContext.inIllustrator)
                 if (!(mpLeaf instanceof POStore)) {
                     int errCode = 2025;
                     String msg = "Expected leaf of reduce plan to " +

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Dec 13 19:11:00 2010
@@ -461,7 +461,7 @@ public class MapReduceLauncher extends L
         }
     }
 
-    private MROperPlan compile(
+    public MROperPlan compile(
             PhysicalPlan php,
             PigContext pc) throws PlanException, IOException, VisitorException {
         MRCompiler comp = new MRCompiler(php, pc);
@@ -479,7 +479,7 @@ public class MapReduceLauncher extends L
         
         //String prop = System.getProperty("pig.exec.nocombiner");
         String prop = pc.getProperties().getProperty("pig.exec.nocombiner");
-        if (!("true".equals(prop)))  {
+        if (!pc.inIllustrator && !("true".equals(prop)))  {
             CombinerOptimizer co = new CombinerOptimizer(plan, lastInputChunkSize);
             co.visit();
             //display the warning message(s) from the CombinerOptimizer
@@ -493,7 +493,7 @@ public class MapReduceLauncher extends L
         
         // Optimize to use secondary sort key if possible
         prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
-        if (!("true".equals(prop)))  {
+        if (!pc.inIllustrator && !("true".equals(prop)))  {
             SecondaryKeyOptimizer skOptimizer = new SecondaryKeyOptimizer(plan);
             skOptimizer.visit();
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -146,6 +148,10 @@ public class MapReduceOper extends Opera
 	// are NOT combinable for correctness.
 	private boolean combineSmallSplits = true;
 	
+	// Map of the physical operator in physical plan to the one in MR plan: only needed
+	// if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
+	public Map<PhysicalOperator, PhysicalOperator> phyToMRMap;
+	
 	private static enum OPER_FEATURE {
 	    NONE,
 	    // Indicate if this job is a sampling job
@@ -169,6 +175,7 @@ public class MapReduceOper extends Opera
         scalars = new HashSet<PhysicalOperator>();
         nig = NodeIdGenerator.getGenerator();
         scope = k.getScope();
+        phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
     }
 
     /*@Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Mon Dec 13 19:11:00 2010
@@ -276,13 +276,6 @@ public class PhyPlanSetter extends PhyPl
         stream.setParentPlan(parent);
     }
 
-    @Override
-    public void visitLocalRearrangeForIllustrate(
-            POLocalRearrangeForIllustrate lrfi) throws VisitorException {
-        super.visitLocalRearrangeForIllustrate(lrfi);
-        lrfi.setParentPlan(parent);
-    }
-
 /*
     @Override
     public void visitPartitionRearrange(POPartitionRearrange lrfi) throws VisitorException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Mon Dec 13 19:11:00 2010
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -40,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +52,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
@@ -58,13 +63,15 @@ public abstract class PigMapBase extends
     protected byte keyType;
         
     //Map Plan
-    protected PhysicalPlan mp;
+    protected PhysicalPlan mp = null;
 
     // Store operators
     protected List<POStore> stores;
 
     protected TupleFactory tf = TupleFactory.getInstance();
     
+    boolean inIllustrator = false;
+    
     Context outputCollector;
     
     // Reporter that will be used by operators
@@ -81,6 +88,14 @@ public abstract class PigMapBase extends
     private volatile boolean initialized = false;
     
     /**
+     * Injects the map plan for local map/reduce simulation; a non-null plan set here is used by setup() instead of the serialized one.
+     * @param plan the map plan to run
+     */
+    public void setMapPlan(PhysicalPlan plan) {
+        this.mp = plan;
+    }
+
+    /**
      * Will be called when all the tuples in the input
      * are done. So reporter thread should be closed.
      */
@@ -142,14 +157,16 @@ public abstract class PigMapBase extends
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConf = context.getConfiguration();
+        inIllustrator = (context instanceof IllustratorContext);
         
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
         pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
         if (pigContext.getLog4jProperties()!=null)
             PropertyConfigurator.configure(pigContext.getLog4jProperties());
         
-        mp = (PhysicalPlan) ObjectSerializer.deserialize(
-            job.get("pig.mapPlan"));
+        if (mp == null)
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
         stores = PlanHelper.getStores(mp);
         
         // To be removed
@@ -207,7 +224,8 @@ public abstract class PigMapBase extends
                 MapReducePOStoreImpl impl 
                     = new MapReducePOStoreImpl(context);
                 store.setStoreImpl(impl);
-                store.setUp();
+                if (!pigContext.inIllustrator)
+                    store.setUp();
             }
             
             boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
@@ -225,7 +243,13 @@ public abstract class PigMapBase extends
         }
         
         for (PhysicalOperator root : roots) {
-            root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+            if (inIllustrator) {
+                if (root != null) {
+                    root.attachInput(inpTuple);
+                }
+            } else {
+                root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+            }
         }
             
         runPipeline(leaf);
@@ -284,4 +308,95 @@ public abstract class PigMapBase extends
         this.keyType = keyType;
     }
     
+    /**
+     * Creates the mapper context used by the illustrator's local map/reduce
+     * simulation: records are read from {@code input} instead of a record
+     * reader, and everything the map writes is appended to {@code output}.
+     *
+     * @param conf   Configuration
+     * @param input  input bag to serve as data source; may be {@code null},
+     *               in which case the context yields one {@code null} record
+     * @param output map output buffer; must not be {@code null}
+     * @param split  the split
+     * @return Illustrator's context
+     * @throws IOException if {@code output} is {@code null}
+     * @throws InterruptedException
+     */
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+            List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+            throws IOException, InterruptedException {
+        return new IllustratorContext(conf, input, output, split);
+    }
+
+    /**
+     * In-memory mapper context for the illustrator: input comes from a
+     * {@link DataBag}, output is collected into a list, and progress
+     * reports are ignored.
+     */
+    public class IllustratorContext extends Context {
+        private final DataBag input;
+        // receives every (key, value) pair passed to write()
+        final List<Pair<PigNullableWritable, Writable>> output;
+        private Iterator<Tuple> it = null;
+        private Tuple value = null;
+        // only used when input is null, to yield a single (null) record
+        private boolean init = false;
+
+        public IllustratorContext(Configuration conf, DataBag input,
+                List<Pair<PigNullableWritable, Writable>> output,
+                InputSplit split) throws IOException, InterruptedException {
+            super(conf, new TaskAttemptID(), null, null, null, null, split);
+            if (output == null)
+                throw new IOException("Null output can not be used");
+            this.input = input;
+            this.output = output;
+        }
+
+        /**
+         * Advances to the next tuple of the input bag. With a {@code null}
+         * bag this returns {@code true} exactly once and the current value
+         * stays {@code null}.
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                if (init) {
+                    return false;
+                }
+                init = true;
+                return true;
+            }
+            if (it == null) {
+                it = input.iterator();
+            }
+            if (!it.hasNext()) {
+                return false;
+            }
+            value = it.next();
+            return true;
+        }
+
+        /** Always {@code null}: this context supplies no input keys. */
+        @Override
+        public Text getCurrentKey() {
+            return null;
+        }
+
+        @Override
+        public Tuple getCurrentValue() {
+            return value;
+        }
+
+        /** Appends the pair to the output buffer instead of emitting it. */
+        @Override
+        public void write(PigNullableWritable key, Writable val)
+                throws IOException, InterruptedException {
+            output.add(new Pair<PigNullableWritable, Writable>(key, val));
+        }
+
+        @Override
+        public void progress() {
+            // no-op: nothing to report in local simulation
+        }
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Mon Dec 13 19:11:00 2010
@@ -18,18 +18,24 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Collections;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.log4j.PropertyConfigurator;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -43,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -55,6 +62,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -111,8 +119,8 @@ public class PigMapReduce {
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             key.setIndex(index);
-            val.setIndex(index);         	
-            	
+            val.setIndex(index);
+
             oc.write(key, val);
         }
     }
@@ -194,7 +202,6 @@ public class PigMapReduce {
             // set the partition
             wrappedKey.setPartition(partitionIndex);
             val.setIndex(index);
-
             oc.write(wrappedKey, val);
         }
 
@@ -254,7 +261,7 @@ public class PigMapReduce {
         protected final Log log = LogFactory.getLog(getClass());
         
         //The reduce plan
-        protected PhysicalPlan rp;
+        protected PhysicalPlan rp = null;
 
         // Store operators
         protected List<POStore> stores;
@@ -279,6 +286,16 @@ public class PigMapReduce {
         PigContext pigContext = null;
         protected volatile boolean initialized = false;
         
+        private boolean inIllustrator = false;
+        
+        /**
+         * Injects the reduce plan for the illustrator's local runner; a non-null plan set here is used by setup() instead of the serialized one.
+         * @param plan the reduce plan to run
+         */
+        public void setReducePlan(PhysicalPlan plan) {
+            this.rp = plan;
+        }
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -287,7 +304,9 @@ public class PigMapReduce {
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            
+            inIllustrator = (context instanceof IllustratorContext);
+            if (inIllustrator)
+                pack = ((IllustratorContext) context).pack;
             Configuration jConf = context.getConfiguration();
             SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
             sJobContext = context;
@@ -296,11 +315,13 @@ public class PigMapReduce {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 
-                rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
-                        .get("pig.reducePlan"));
+                if (rp == null)
+                    rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
+                            .get("pig.reducePlan"));
                 stores = PlanHelper.getStores(rp);
 
-                pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
+                if (!inIllustrator)
+                    pack = (POPackage)ObjectSerializer.deserialize(jConf.get("pig.reduce.package"));
                 // To be removed
                 if(rp.isEmpty())
                     log.debug("Reduce Plan empty!");
@@ -352,12 +373,13 @@ public class PigMapReduce {
                 
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
-                for (POStore store: stores) {
-                    MapReducePOStoreImpl impl 
-                        = new MapReducePOStoreImpl(context);
-                    store.setStoreImpl(impl);
-                    store.setUp();
-                }
+                if (!inIllustrator)
+                    for (POStore store: stores) {
+                        MapReducePOStoreImpl impl 
+                            = new MapReducePOStoreImpl(context);
+                        store.setStoreImpl(impl);
+                        store.setUp();
+                    }
             }
           
             // In the case we optimize the join, we combine
@@ -512,6 +534,139 @@ public class PigMapReduce {
             PhysicalOperator.setReporter(null);
             initialized = false;
         }
+
+        /**
+         * Creates the reducer context used by the illustrator's local
+         * map/reduce simulation, fed from collected map output rather than
+         * a shuffle.
+         *
+         * @param job   job whose configuration supplies the sort and grouping comparators
+         * @param input map output as (key, value) pairs; sorted in place
+         * @param pkg   package operator the reducer uses in place of the serialized one
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+            return new IllustratorContext(job, input, pkg);
+        }
+
+        /**
+         * In-memory reducer context for the illustrator. The map output is
+         * sorted with the job's sort comparator, then handed out one key group
+         * at a time using the job's grouping comparator. Reduce output written
+         * to this context is discarded.
+         */
+        @SuppressWarnings("unchecked")
+        public class IllustratorContext extends Context {
+            private PigNullableWritable currentKey = null;
+            // first key/value of the next group; pendingKey is null once the input is exhausted
+            private PigNullableWritable pendingKey = null;
+            private NullableTuple pendingValue = null;
+            private final List<NullableTuple> currentValues;
+            private final Iterator<Pair<PigNullableWritable, Writable>> it;
+            // scratch buffer for serializing keys during the raw-byte sort
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            POPackage pack = null;
+
+            public IllustratorContext(Job job,
+                  List<Pair<PigNullableWritable, Writable>> input,
+                  POPackage pkg
+                  ) throws IOException, InterruptedException {
+                super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+                    null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            // Serialize both keys back to back, then compare the
+                            // two byte ranges with the job's raw sort comparator.
+                            try {
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+                            } catch (IOException e) {
+                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage(), e);
+                            } finally {
+                                // reset even on failure so a partial write cannot
+                                // corrupt the offsets of the next comparison
+                                bos.reset();
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    pendingKey = entry.first;
+                    pendingValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+
+            /**
+             * Advances to the next key group: the pending entry plus every
+             * following entry whose key the grouping comparator treats as equal.
+             *
+             * @return {@code false} once the input is exhausted
+             */
+            @Override
+            public boolean nextKey() {
+                if (pendingKey == null)
+                    return false;
+                currentKey = pendingKey;
+                currentValues.clear();
+                currentValues.add(pendingValue);
+                pendingKey = null;
+                while (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    // NOTE(review): grouping uses the object-level compare rather than
+                    // the raw-byte compare used for sorting; the reason is undocumented.
+                    if (groupingComparator.compare(currentKey, entry.first) == 0) {
+                        currentValues.add((NullableTuple) entry.second);
+                    } else {
+                        pendingKey = entry.first;
+                        pendingValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+
+            /** Values of the current key group; the list is reused across groups. */
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return currentValues;
+            }
+
+            /** Discards reduce output. */
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+
+            @Override
+            public void progress() {
+                // no-op: nothing to report in local simulation
+            }
+        }
     }
     
     /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Dec 13 19:11:00 2010
@@ -91,6 +91,16 @@ public class LogToPhyTranslationVisitor 
         return currentPlan;
     }
 
+    /**
+     * Returns the logical-to-physical operator map maintained by this visitor.
+     *
+     * @return the internal map itself, not a copy; modifications through it
+     *         are visible to this visitor
+     */
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+        return logToPhyMap;
+    }
+
     @Override
     protected void visit(LOGreaterThan op) throws VisitorException {
         String scope = op.getOperatorKey().scope;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Mon Dec 13 19:11:00 2010
@@ -35,6 +35,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.Illustrable;
 
 /**
  * 
@@ -58,7 +60,7 @@ import org.apache.pig.pen.util.LineageTr
  * only those types that are supported.
  *
  */
-public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Cloneable {
+public abstract class PhysicalOperator extends Operator<PhyPlanVisitor> implements Illustrable, Cloneable {
 
     private Log log = LogFactory.getLog(getClass());
 
@@ -125,8 +127,14 @@ public abstract class PhysicalOperator e
 
     static final protected Map dummyMap = null;
     
+    // TODO: This is not needed. But a lot of tests check serialized physical plans
+    // that are sensitive to the serialized image of the contained physical operators.
+    // So for now, just keep it. Later it'll be cleansed along with those test golden
+    // files
     protected LineageTracer lineageTracer;
 
+    protected transient Illustrator illustrator = null;
+
     private boolean accum;
     private transient boolean accumStart;
 
@@ -149,8 +157,22 @@ public abstract class PhysicalOperator e
         res = new Result();
     }
 
-    public void setLineageTracer(LineageTracer lineage) {
-	this.lineageTracer = lineage;
+    /**
+     * Attaches the illustrator for this operator. The field starts out
+     * {@code null}, meaning no illustrator is attached.
+     *
+     * @param illustrator the illustrator, or {@code null} to detach
+     */
+    @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+    }
+
+    /**
+     * @return the attached illustrator, or {@code null} if none
+     */
+    public Illustrator getIllustrator() {
+        return illustrator;
     }
     
     public int getRequestedParallelism() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryComparisonOperator.java Mon Dec 13 19:11:00 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
@@ -66,4 +67,11 @@ public abstract class BinaryComparisonOp
         operandType = op.operandType;
         super.cloneHelper(op);
     }
+    
+    /**
+     * Reports the outcome of this comparison to the attached illustrator, if any.
+     * An equivalence-class index of 0 is reported as a {@code true}
+     * sub-expression result; any other index is reported as {@code false}.
+     *
+     * @param in ignored
+     * @param out ignored
+     * @param eqClassIndex 0 for a true comparison result, otherwise non-zero
+     * @return always {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            illustrator.setSubExpResult(eqClassIndex == 0);
+        }
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/BinaryExpressionOperator.java Mon Dec 13 19:11:00 2010
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * A base class for all Binary expression operators.
@@ -84,4 +86,9 @@ public abstract class BinaryExpressionOp
         rhs = op.rhs;
         super.cloneHelper(op);
     }
-}
+    
+    /**
+     * Records no illustration markup for this operator.
+     *
+     * @param in ignored
+     * @param out ignored
+     * @param eqClassIndex ignored
+     * @return always {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ConstantExpression.java Mon Dec 13 19:11:00 2010
@@ -204,4 +204,9 @@ public class ConstantExpression extends 
     public List<ExpressionOperator> getChildExpressions() {		
         return null;
     }
+    
+    /**
+     * Passes a tuple-valued output through unchanged.
+     *
+     * <p>A constant may hold a value of any type, so the output is only returned
+     * when it actually is a {@link Tuple}. Any other value yields {@code null}
+     * rather than a {@code ClassCastException}.
+     *
+     * @param in ignored
+     * @param out the value produced by this expression
+     * @param eqClassIndex ignored
+     * @return {@code out} if it is a {@link Tuple}, otherwise {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (out instanceof Tuple) ? (Tuple) out : null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/EqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -185,7 +185,7 @@ public class EqualToExpr extends BinaryC
         }else{
             throw new ExecException("The left side and right side has the different types");
         }
-        
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ExpressionOperator.java Mon Dec 13 19:11:00 2010
@@ -35,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.Illustrator;
 
 /**
  * A base class for all types of expressions. All expression
@@ -55,6 +56,11 @@ public abstract class ExpressionOperator
     }
     
     @Override
+    public void setIllustrator(Illustrator illustrator) {
+        this.illustrator = illustrator;
+    }
+    
+    /**
+     * Indicates whether this operator supports multiple outputs.
+     *
+     * @return always {@code false}
+     */
+    @Override
     public boolean supportsMultipleOutputs() {
         return false;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GTOrEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class GTOrEqualToExpr extends Bin
         } else {
             left.result = falseRef;
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/GreaterThanExpr.java Mon Dec 13 19:11:00 2010
@@ -17,15 +17,11 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -37,7 +33,6 @@ public class GreaterThanExpr extends Bin
      * 
      */
     private static final long serialVersionUID = 1L;
-    transient private final Log log = LogFactory.getLog(getClass());
 
     public GreaterThanExpr(OperatorKey k) {
         this(k, -1);
@@ -60,7 +55,6 @@ public class GreaterThanExpr extends Bin
 
     @Override
     public Result getNext(Boolean bool) throws ExecException {
-        byte status;
         Result left, right;
 
         switch (operandType) {
@@ -154,6 +148,7 @@ public class GreaterThanExpr extends Bin
         } else {
             left.result = falseRef;
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LTOrEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class LTOrEqualToExpr extends Bin
         } else {
             left.result = falseRef;
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/LessThanExpr.java Mon Dec 13 19:11:00 2010
@@ -154,6 +154,7 @@ public class LessThanExpr extends Binary
         } else {
             left.result = falseRef;
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/NotEqualToExpr.java Mon Dec 13 19:11:00 2010
@@ -184,6 +184,7 @@ public class NotEqualToExpr extends Bina
         }else{
             throw new ExecException("The left side and right side has the different types");
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POAnd.java Mon Dec 13 19:11:00 2010
@@ -78,9 +78,20 @@ public class POAnd extends BinaryCompari
         // 3) f    f f f
         
         // Short circuit - if lhs is false, return false; ROW 3 above is handled with this
-        if (left.result != null && !(((Boolean)left.result).booleanValue())) return left;
+        boolean returnLeft = false;
+        if (left.result != null && !(((Boolean)left.result).booleanValue())) {
+          if (illustrator == null) {
+              return left;
+          }
+          illustratorMarkup(null, left.result, 1);
+          returnLeft = true;
+        }
         
         Result right = rhs.getNext(dummyBool);
+        if (returnLeft) {
+            return left;
+        }
+        
         // pass on ERROR and EOP 
         if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
             return right;
@@ -94,6 +105,8 @@ public class POAnd extends BinaryCompari
         
         // No matter what, what we get from the right side is what we'll
         // return, null, true, or false.
+        if (right.result != null)
+            illustratorMarkup(null, right.result, (Boolean) right.result ? 0 : 1);
         return right;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POBinCond.java Mon Dec 13 19:11:00 2010
@@ -32,6 +32,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 public class POBinCond extends ExpressionOperator {
     
@@ -65,7 +66,9 @@ public class POBinCond extends Expressio
         
         Result res = cond.getNext(b);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(b) : rhs.getNext(b);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
         
     }
 
@@ -88,7 +91,9 @@ public class POBinCond extends Expressio
                         
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(db) : rhs.getNext(db);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -109,7 +114,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(ba) : rhs.getNext(ba);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -130,7 +137,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(d) : rhs.getNext(d);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -151,7 +160,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(f) : rhs.getNext(f);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -171,7 +182,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(i) : rhs.getNext(i);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -192,7 +205,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(l) : rhs.getNext(l);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -213,7 +228,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(m) : rhs.getNext(m);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -234,7 +251,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(s) : rhs.getNext(s);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -255,7 +274,9 @@ public class POBinCond extends Expressio
         }
         Result res = cond.getNext(dummyBool);
         if (res.result==null || res.returnStatus != POStatus.STATUS_OK) return res;
-        return ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
+        Result result = ((Boolean)res.result) == true ? lhs.getNext(t) : rhs.getNext(t);
+        illustratorMarkup(null, result.result, ((Boolean)res.result) ? 0 : 1);
+        return result;
     }
 
     @Override
@@ -338,4 +359,11 @@ public class POBinCond extends Expressio
         return child;
     }
 
+    /**
+     * Records no illustration markup for this operator.
+     *
+     * <p>NOTE(review): the getNext methods pass 0 when the condition selected the
+     * left-hand branch and 1 for the right-hand branch, but nothing is recorded
+     * for that yet.
+     *
+     * @param in ignored
+     * @param out ignored
+     * @param eqClassIndex ignored
+     * @return always {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Mon Dec 13 19:11:00 2010
@@ -1327,4 +1327,8 @@ public class POCast extends ExpressionOp
         return funcSpec;
     }
 
+    /**
+     * Passes a tuple-valued cast result through unchanged.
+     *
+     * <p>A cast can produce values of many types, so the output is only returned
+     * when it actually is a {@link Tuple}. Any other value yields {@code null}
+     * rather than a {@code ClassCastException}.
+     *
+     * @param in ignored
+     * @param out the value produced by this cast
+     * @param eqClassIndex ignored
+     * @return {@code out} if it is a {@link Tuple}, otherwise {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (out instanceof Tuple) ? (Tuple) out : null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POIsNull.java Mon Dec 13 19:11:00 2010
@@ -76,6 +76,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.DOUBLE:
@@ -86,6 +87,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.INTEGER:
@@ -96,6 +98,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.CHARARRAY:
@@ -106,6 +109,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.BOOLEAN:
@@ -116,6 +120,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.LONG:
@@ -126,6 +131,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.FLOAT:
@@ -136,6 +142,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.MAP:
@@ -146,6 +153,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.TUPLE:
@@ -156,6 +164,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;
         case DataType.BAG:
@@ -166,6 +175,7 @@ public class POIsNull extends UnaryCompa
                 } else {
                     res.result = false;
                 }
+                illustratorMarkup(null, res.result, (Boolean) res.result ? 0 : 1);
             }
             return res;        
         default: {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POMapLookUp.java Mon Dec 13 19:11:00 2010
@@ -165,6 +165,8 @@ public class POMapLookUp extends Express
         return null;
     }
     
-    
-
+    /**
+     * Passes a tuple-valued lookup result through unchanged.
+     *
+     * <p>A map value may be of any type, so the output is only returned when it
+     * actually is a {@link Tuple}. Any other value yields {@code null} rather
+     * than a {@code ClassCastException}.
+     *
+     * @param in ignored
+     * @param out the value produced by this lookup
+     * @param eqClassIndex ignored
+     * @return {@code out} if it is a {@link Tuple}, otherwise {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (out instanceof Tuple) ? (Tuple) out : null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONegative.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
@@ -101,4 +102,9 @@ public class PONegative extends UnaryExp
         clone.cloneHelper(this);
         return clone;
     }
+    
+    /**
+     * Passes a tuple-valued output through unchanged.
+     *
+     * <p>The output is only returned when it actually is a {@link Tuple}. Any
+     * other value yields {@code null} rather than a {@code ClassCastException}.
+     *
+     * @param in ignored
+     * @param out the value produced by this expression
+     * @param eqClassIndex ignored
+     * @return {@code out} if it is a {@link Tuple}, otherwise {@code null}
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (out instanceof Tuple) ? (Tuple) out : null;
+    }
 }