Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC

svn commit: r1045314 [4/5] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...

Added: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (added)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The class used to (re)attach illustrators to physical operators.
+ */
+public class IllustratorAttacher extends PhyPlanVisitor {
+
+    PigContext pigContext;
+
+    LineageTracer lineage;
+
+    HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
+    
+    private HashMap<PhysicalOperator, DataBag> poToDataMap;
+    private int maxRecords;
+    private boolean revisit = false;
+    private ArrayList<Boolean[]> subExpResults = null;
+    private final Map<POLoad, Schema> poloadToSchemaMap;
+    
+    public IllustratorAttacher(PhysicalPlan plan, LineageTracer lineage, int maxRecords,
+        Map<POLoad, Schema> poLoadToSchemaMap, PigContext hadoopPigContext) throws VisitorException {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        pigContext = hadoopPigContext;
+        this.lineage = lineage;
+        poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>();
+        poToDataMap = new HashMap<PhysicalOperator, DataBag>();
+        this.maxRecords = maxRecords;
+        this.poloadToSchemaMap = poLoadToSchemaMap;
+    }
+
+    /**
+     * Revisit a physical plan that has been enhanced by MR compilation
+     * @param plan the physical plan to be traversed
+     * @throws VisitorException if the traversal fails
+     */
+    public void revisit(PhysicalPlan plan) throws VisitorException {
+        pushWalker(new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        revisit = true;
+        PhysicalPlan oriPlan = mPlan;
+        mPlan = plan;
+        visit();
+        mPlan = oriPlan;
+        popWalker();
+    }
+    
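+    // create nEqClasses empty equivalence classes for po and attach a fresh illustrator backed by them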
+    private void setIllustrator(PhysicalOperator po, int nEqClasses) {
+        if (revisit && po.getIllustrator() != null)
+            return;
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        poToEqclassesMap.put(po, eqClasses);
+        for (int i = 0; i < nEqClasses; ++i)
+        {
+            IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+            eqClasses.add(eqClass);
+        }
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext);
+        po.setIllustrator(illustrator);
+        poToDataMap.put(po, illustrator.getData());
+    }
+    
+    private void setIllustrator(PhysicalOperator po, LinkedList<IdentityHashSet<Tuple>> eqClasses) {
+        if (revisit && po.getIllustrator() != null)
+            return;
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext);
+        po.setIllustrator(illustrator);
+        if (eqClasses != null)
+            poToEqclassesMap.put(po, eqClasses);
+        poToDataMap.put(po, illustrator.getData());
+    }
+
+    void setIllustrator(PhysicalOperator po) {
+        if (revisit && po.getIllustrator() != null)
+            return;
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+        eqClasses.add(eqClass);
+        Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext);
+        po.setIllustrator(illustrator);
+        poToEqclassesMap.put(po, eqClasses);
+        poToDataMap.put(po, illustrator.getData());
+    }
+    
+    public Map<PhysicalOperator, DataBag> getDataMap() {
+        return poToDataMap;
+    }
+    
+    @Override
+    public void visitLoad(POLoad ld) throws VisitorException{
+        // on revisit, LOADs read from temporary files and need no illustrator
+        if (revisit)
+            return;
+        
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        poToEqclassesMap.put(ld, eqClasses);
+        
+        IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+        eqClasses.add(eqClass);
+        Illustrator illustrator;
+        illustrator = new Illustrator(lineage, eqClasses, maxRecords, this, poloadToSchemaMap.get(ld), pigContext);
+        ld.setIllustrator(illustrator);
+        poToDataMap.put(ld, illustrator.getData());
+    }
+    
+    @Override
+    public void visitStore(POStore st) throws VisitorException{
+        setIllustrator(st, 1);
+    }
+    
+    @Override
+    public void visitFilter(POFilter fl) throws VisitorException{
+        setIllustrator(fl, 0);
+        subExpResults = fl.getIllustrator().getSubExpResults();
+        innerPlanAttach(fl, fl.getPlan());
+        subExpResults = null;
+    }
+ 
+    @Override
+    public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+        setIllustrator(mg, 1);
+    }
+    
+    @Override
+    public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
+        super.visitLocalRearrange(lr);
+        setIllustrator(lr);
+    }
+    
+    @Override
+    public void visitPackage(POPackage pkg) throws VisitorException{
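+        // only a POPackage that implements DISTINCT maintains its own equivalence class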
+        if (!(pkg instanceof POPackageLite) && pkg.isDistinct())
+            setIllustrator(pkg, 1);
+        else
+            setIllustrator(pkg, null);
+    }
+    
+    @Override
+    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
+        setIllustrator(pkg);
+    }
+ 
+    @Override
+    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
+        setIllustrator(pkg);
+    }
+    
+    @Override
+    public void visitPOForEach(POForEach nfe) throws VisitorException {
+        if (revisit && nfe.getIllustrator() != null)
+            return;
+        List<PhysicalPlan> innerPlans = nfe.getInputPlans();
+        for (PhysicalPlan innerPlan : innerPlans)
+            innerPlanAttach(nfe, innerPlan);
+        List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
+        if (preds != null && preds.size() == 1 &&
+            preds.get(0) instanceof POPackage &&
+            !(preds.get(0) instanceof POPackageLite) &&
+            ((POPackage) preds.get(0)).isDistinct()) {
+            // equivalence class of POPackage for DISTINCT needs to be used
+            // instead of the succeeding POForEach's equivalence class
+            setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
+            nfe.getIllustrator().setEqClassesShared();
+        } else
+            setIllustrator(nfe, 1);
+    }
+    
+    @Override
+    public void visitUnion(POUnion un) throws VisitorException{
+        if (revisit && un.getIllustrator() != null)
+            return;
+        setIllustrator(un, null);
+    }
+    
+    @Override
+    public void visitSplit(POSplit spl) throws VisitorException{
+        if (revisit && spl.getIllustrator() != null)
+            return;
+        for (PhysicalPlan poPlan : spl.getPlans())
+            innerPlanAttach(spl, poPlan);
+        setIllustrator(spl);
+    }
+
+    @Override
+    public void visitDemux(PODemux demux) throws VisitorException{
+        if (revisit && demux.getIllustrator() != null)
+            return;
+        List<PhysicalPlan> innerPlans = demux.getPlans();
+        for (PhysicalPlan innerPlan : innerPlans)
+            innerPlanAttach(demux, innerPlan);
+        setIllustrator(demux);
+    }
+    
+    @Override
+    public void visitDistinct(PODistinct distinct) throws VisitorException {
+        setIllustrator(distinct, 1);
+    }
+    
+    @Override
+    public void visitSort(POSort sort) throws VisitorException {
+        setIllustrator(sort, 1);
+    }
+    
+    @Override
+    public void visitProject(POProject proj) throws VisitorException{
+    }
+    
+    @Override
+    public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
+        setIllustrator(grt, 0);
+        if (!revisit)
+            subExpResults.add(grt.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitLessThan(LessThanExpr lt) throws VisitorException{
+        setIllustrator(lt, 0);
+        if (!revisit)
+            subExpResults.add(lt.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
+        setIllustrator(gte, 0);
+        if (!revisit)
+            subExpResults.add(gte.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
+        setIllustrator(lte, 0);
+        if (!revisit)
+            subExpResults.add(lte.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitEqualTo(EqualToExpr eq) throws VisitorException{
+        setIllustrator(eq, 0);
+        if (!revisit)
+            subExpResults.add(eq.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
+        setIllustrator(eq, 0);
+        if (!revisit)
+            subExpResults.add(eq.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitRegexp(PORegexp re) throws VisitorException{
+        setIllustrator(re, 0);
+        if (!revisit)
+            subExpResults.add(re.getIllustrator().getSubExpResult());
+    }
+
+    @Override
+    public void visitIsNull(POIsNull isNull) throws VisitorException {
+        setIllustrator(isNull, 0);
+        if (!revisit)
+            subExpResults.add(isNull.getIllustrator().getSubExpResult());
+    }
+    
+    @Override
+    public void visitAnd(POAnd and) throws VisitorException {
+        setIllustrator(and, 0);
+    }
+    
+    @Override
+    public void visitOr(POOr or) throws VisitorException {
+        setIllustrator(or, 0);
+    }
+
+    @Override
+    public void visitNot(PONot not) throws VisitorException {
+        setIllustrator(not, 0);
+        if (!revisit)
+            subExpResults.add(not.getIllustrator().getSubExpResult());
+    }
+
+    @Override
+    public void visitBinCond(POBinCond binCond) {
+        setIllustrator(binCond, 0);
+        if (!revisit)
+            subExpResults.add(binCond.getIllustrator().getSubExpResult());
+    }
+
+    @Override
+    public void visitNegative(PONegative negative) {
+        setIllustrator(negative, 1);
+    }
+    
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+    }
+    
+    @Override
+    public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
+        // one each for >, ==, and <
+        setIllustrator(compFunc, 3);
+    }
+
+    @Override
+    public void visitMapLookUp(POMapLookUp mapLookUp) {
+        setIllustrator(mapLookUp, 1);
+    }
+    
+    @Override
+    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
+        if (revisit && joinPackage.getIllustrator() != null)
+            return;
+        setIllustrator(joinPackage);
+        joinPackage.getForEach().setIllustrator(joinPackage.getIllustrator());
+    }
+
+    @Override
+    public void visitCast(POCast cast) {
+    }
+    
+    @Override
+    public void visitLimit(POLimit lim) throws VisitorException {
+        setIllustrator(lim, 1);
+    }
+    
+    @Override
+    public void visitFRJoin(POFRJoin join) throws VisitorException {
+        // one eq. class per input
+        setIllustrator(join, join.getInputs().size());
+    }
+    
+    @Override
+    public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+        // one eq. class per input
+        setIllustrator(join, join.getInputs().size());
+    }
+    
+    @Override
+    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
+        // one eq. class per input
+        setIllustrator(mergeCoGrp, mergeCoGrp.getInputs().size());
+    }
+
+    @Override
+    public void visitStream(POStream stream) throws VisitorException {
+        setIllustrator(stream, 1);
+    }
+
+    @Override
+    public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+        // one eq. class per input;
+        // skip the inner plans: they are not boolean expressions, so eq. classes are of no use there
+        setIllustrator(sk, sk.getInputs().size());
+    }
+
+    @Override
+    public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+    }
+
+    /**
+     * @param optimizedForEach the optimized FOREACH, handled like a regular POForEach
+     */
+    @Override
+    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
+        visitPOForEach(optimizedForEach);
+    }
+
+    /**
+     * @param preCombinerLocalRearrange no illustrator is attached to this operator
+     */
+    @Override
+    public void visitPreCombinerLocalRearrange(
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+    }
+    
+    private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
+        PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =
+              mCurrentWalker.spawnChildWalker(plan);
+        pushWalker(childWalker);
+        childWalker.walk(this);
+        popWalker();
+        LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+        if (subExpResults != null && !revisit) {
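+            // one equivalence class per truth-value combination of the boolean sub-expressions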
+            int size = 1 << subExpResults.size();
+            for (int i = 0; i < size; ++i) {
+                eqClasses.add(new IdentityHashSet<Tuple>());
+            }
+            po.getIllustrator().setEquivalenceClasses(eqClasses, po);
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@
 package org.apache.pig.pen;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -27,10 +28,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Comparator;
+import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
@@ -38,6 +40,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOJoin;
 import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LODistinct;
 import org.apache.pig.impl.logicalLayer.LOFilter;
@@ -47,34 +50,41 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOSort;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.pen.util.LineageTracer;
 import org.apache.pig.pen.util.MetricEvaluation;
 import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+import org.apache.pig.pen.util.ExampleTuple;
 
 public class LineageTrimmingVisitor extends LOVisitor {
 
     LogicalPlan plan = null;
-    Map<LOLoad, DataBag> baseData = new HashMap<LOLoad, DataBag>();
+    Map<LOLoad, DataBag> baseData;
+    Map<FileSpec, DataBag> inputToDataMap;
     Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
     PhysicalPlan physPlan = null;
     double completeness = 100.0;
     Log log = LogFactory.getLog(getClass());
 
-    Map<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>> AffinityGroups = new HashMap<LogicalOperator, Map<IdentityHashSet<Tuple>, Integer>>();
+    Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> AffinityGroups = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
     Map<LogicalOperator, LineageTracer> Lineage = new HashMap<LogicalOperator, LineageTracer>();
 
     boolean continueTrimming;
     PigContext pc;
+    private ExampleGenerator eg;
 
     public LineageTrimmingVisitor(LogicalPlan plan,
             Map<LOLoad, DataBag> baseData,
+            ExampleGenerator eg,
             Map<LogicalOperator, PhysicalOperator> LogToPhyMap,
-            PhysicalPlan physPlan, PigContext pc) {
+            PhysicalPlan physPlan, PigContext pc) throws IOException, InterruptedException {
         super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
                 plan));
         // this.baseData.putAll(baseData);
@@ -83,149 +93,66 @@ public class LineageTrimmingVisitor exte
         this.LogToPhyMap = LogToPhyMap;
         this.pc = pc;
         this.physPlan = physPlan;
+        this.eg = eg;
+        this.inputToDataMap = new HashMap<FileSpec, DataBag>();
         init();
     }
 
-    public void init() {
+    public void init() throws IOException, InterruptedException {
 
-        DerivedDataVisitor visitor = new DerivedDataVisitor(plan, pc, baseData,
-                LogToPhyMap, physPlan);
-        try {
-            visitor.visit();
-        } catch (VisitorException e) {
-            log.error(e.getMessage());
-        }
+        Map<LogicalOperator, DataBag> data = eg.getData();
 
-        LineageTracer lineage = visitor.lineage;
-        Lineage.put(plan.getLeaves().get(0), lineage);
-        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = visitor.OpToEqClasses;
-        Collection<IdentityHashSet<Tuple>> EqClasses = visitor.EqClasses;
-        Map<IdentityHashSet<Tuple>, Integer> affinityGroup = new HashMap<IdentityHashSet<Tuple>, Integer>();
-        for (IdentityHashSet<Tuple> set : EqClasses) {
-            affinityGroup.put(set, 1);
+        LineageTracer lineage = eg.getLineage();
+        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = eg.getLoToEqClassMap();
+        for (LogicalOperator leaf : plan.getLeaves()) {
+            Lineage.put(leaf, lineage);
+            AffinityGroups.put(leaf, eg.getEqClasses());
         }
-        AffinityGroups.put(plan.getLeaves().get(0), affinityGroup);
         completeness = MetricEvaluation.getCompleteness(null,
-                visitor.derivedData, OpToEqClasses, true);
-        LogToPhyMap = visitor.LogToPhyMap;
+                data, OpToEqClasses, true);
+        LogToPhyMap = eg.getLogToPhyMap();
         continueTrimming = true;
 
     }
 
     @Override
     protected void visit(LOCogroup cg) throws VisitorException {
+        // a COGROUP cannot be trimmed separately from a succeeding FOREACH, so skip it here
+        if (plan.getSuccessors(cg) != null && plan.getSuccessors(cg).get(0) instanceof LOForEach)
+            return;
+        
         if (continueTrimming) {
-            Map<IdentityHashSet<Tuple>, Integer> affinityGroups = null;
-
-            continueTrimming = checkCompleteness(cg);
-            
-            DerivedDataVisitor visitor = null;
-            LineageTracer lineage = null;
-            // create affinity groups
-            if (cg.getInputs().size() == 1) {
-                affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
-                LogicalOperator childOp = cg.getInputs().get(0);
-                visitor = new DerivedDataVisitor(childOp, null, baseData,
-                        LogToPhyMap, physPlan);
-                try {
-                    visitor.visit();
-                } catch (VisitorException e) {
-                    log.error(e.getMessage());
-                }
-
-                lineage = visitor.lineage;
-
-                DataBag bag = visitor.evaluateIsolatedOperator(cg);
-                for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
-                    DataBag field;
-                    try {
-                        field = (DataBag) it.next().get(1);
-                    } catch (ExecException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                        log.error(e.getMessage());
-                        throw new VisitorException(
-                                "Error trimming operator COGROUP operator "
-                                        + cg.getAlias()
-                                        + "in example generator");
-                    }
-                    IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
-                    affinityGroups.put(set, 2);
-                    for (Iterator<Tuple> it1 = field.iterator(); it1.hasNext();) {
-                        set.add(it1.next());
-                    }
-                }
-
-                // add the equivalence classes obtained from derived data
-                // creation
-                for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
-                    affinityGroups.put(set, 1);
-                }
-                AffinityGroups.put(cg.getInputs().get(0), affinityGroups);
-                Lineage.put(cg.getInputs().get(0), lineage);
+            try {
 
-            } else {
-                List<DataBag> inputs = new LinkedList<DataBag>();
-                visitor = new DerivedDataVisitor(cg, null, baseData,
-                        LogToPhyMap, physPlan);
-                affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
-                for (int i = 0; i < cg.getInputs().size(); i++) {
-                    // affinityGroups = new HashMap<IdentityHashSet<Tuple>,
-                    // Integer>();
-                    LogicalOperator childOp = cg.getInputs().get(i);
-                    // visitor = new DerivedDataVisitor(cg.getInputs().get(i),
-                    // null, baseData, LogToPhyMap, physPlan);
-                    visitor.setOperatorToEvaluate(childOp);
-                    try {
-                        visitor.visit();
-                    } catch (VisitorException e) {
-                        log.error(e.getMessage());
+                continueTrimming = checkCompleteness(cg);
+                
+                LineageTracer lineage = null;
+                // create affinity groups
+                if (cg.getInputs().size() == 1) {
+                    lineage = eg.getLineage();
+                    AffinityGroups.put(cg.getInputs().get(0), eg.getEqClasses());
+                    Lineage.put(cg.getInputs().get(0), lineage);
+
+                } else {
+                    for (LogicalOperator input : cg.getInputs()) {
+                        Lineage.put(input, eg.getLineage());
+                        AffinityGroups.put(input, eg.getEqClasses());
                     }
-                    // Lineage.put(childOp, visitor.lineage);
-                    inputs.add(visitor.derivedData.get(childOp));
-
-                    for (IdentityHashSet<Tuple> set : visitor.EqClasses)
-                        affinityGroups.put(set, 1);
-
-                    // AffinityGroups.put(cg.getInputs().get(i),
-                    // affinityGroups);
                 }
-                for (LogicalOperator input : cg.getInputs()) {
-                    Lineage.put(input, visitor.lineage);
-                    AffinityGroups.put(input, affinityGroups);
-                }
-
-                visitor = new DerivedDataVisitor(cg, null, baseData,
-                        LogToPhyMap, physPlan);
-                DataBag output = visitor.evaluateIsolatedOperator(cg, inputs);
-
-                for (int i = 1; i <= cg.getInputs().size(); i++) {
-                    affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
-                    for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
-                        DataBag bag = null;
-                        try {
-                            bag = (DataBag) it.next().get(i);
-                        } catch (ExecException e) {
-                            // TODO Auto-generated catch block
-                            log.error(e.getMessage());
-                        }
-                        IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
-                        affinityGroups.put(set, 1);
-                        for (Iterator<Tuple> it1 = bag.iterator(); it1
-                                .hasNext();) {
-                            set.add(it1.next());
-                        }
-                    }
-                    AffinityGroups.get(cg.getInputs().get(i - 1)).putAll(
-                            affinityGroups);
-
-                }
-                AffinityGroups = AffinityGroups;
+            } catch (Exception e) {
+                throw new VisitorException("Exception: " + e.getMessage());
             }
         }
     }
 
     @Override
+    protected void visit(LOJoin join) throws VisitorException {
+        if (continueTrimming) {
+            processOperator(join);
+        }
+    }
+
+    @Override
     protected void visit(LOCross cs) throws VisitorException {
         if(continueTrimming)
             processOperator(cs);
@@ -244,6 +171,12 @@ public class LineageTrimmingVisitor exte
         if (continueTrimming)
             processOperator(filter);
     }
+    
+    @Override
+    protected void visit(LOStore store) throws VisitorException {
+        if (continueTrimming)
+            processOperator(store);
+    }
 
     @Override
     protected void visit(LOForEach forEach) throws VisitorException {
@@ -286,9 +219,9 @@ public class LineageTrimmingVisitor exte
     }
 
     private Map<LOLoad, DataBag> PruneBaseDataConstrainedCoverage(
-            Map<LOLoad, DataBag> baseData, DataBag rootOutput,
+            Map<LOLoad, DataBag> baseData,
             LineageTracer lineage,
-            Map<IdentityHashSet<Tuple>, Integer> equivalenceClasses) {
+            Collection<IdentityHashSet<Tuple>> equivalenceClasses) {
 
         IdentityHashMap<Tuple, Collection<Tuple>> membershipMap = lineage
                 .getMembershipMap();
@@ -300,10 +233,9 @@ public class LineageTrimmingVisitor exte
         // IdentityHashMap<Tuple, Set<Integer>> lineageGroupToEquivClasses = new
         // IdentityHashMap<Tuple, Set<Integer>>();
         IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>> lineageGroupToEquivClasses = new IdentityHashMap<Tuple, Set<IdentityHashSet<Tuple>>>();
-        int equivClassId = 0;
-        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses.keySet()) {
-            for (Tuple t : equivClass) {
-                Tuple lineageGroup = lineage.getRepresentative(t);
+        for (IdentityHashSet<Tuple> equivClass : equivalenceClasses) {
+            for (Object t : equivClass) {
+                Tuple lineageGroup = lineage.getRepresentative((Tuple) t);
                 // Set<Integer> entry =
                 // lineageGroupToEquivClasses.get(lineageGroup);
                 Set<IdentityHashSet<Tuple>> entry = lineageGroupToEquivClasses
@@ -316,8 +248,6 @@ public class LineageTrimmingVisitor exte
                 // entry.add(equivClassId);
                 entry.add(equivClass);
             }
-
-            equivClassId++;
         }
 
         // select lineage groups such that we cover all equivalence classes
@@ -325,18 +255,19 @@ public class LineageTrimmingVisitor exte
         while (!lineageGroupToEquivClasses.isEmpty()) {
             // greedily find the lineage group with the best "score", where
             // score = # equiv classes covered / group weight
-            double bestScore = -1;
+            double bestWeight = -1;
             Tuple bestLineageGroup = null;
             Set<IdentityHashSet<Tuple>> bestEquivClassesCovered = null;
+            int bestNumEquivClassesCovered = 0;
             for (Tuple lineageGroup : lineageGroupToEquivClasses.keySet()) {
                 double weight = lineageGroupWeights.get(lineageGroup);
 
                 Set<IdentityHashSet<Tuple>> equivClassesCovered = lineageGroupToEquivClasses
                         .get(lineageGroup);
                 int numEquivClassesCovered = equivClassesCovered.size();
-                double score = ((double) numEquivClassesCovered) / weight;
 
-                if (score > bestScore) {
+                if ((numEquivClassesCovered > bestNumEquivClassesCovered) ||
+                    (numEquivClassesCovered == bestNumEquivClassesCovered && weight < bestWeight)) {
 
                     if (selectedLineageGroups.contains(lineageGroup)) {
                         bestLineageGroup = lineageGroup;
@@ -344,8 +275,9 @@ public class LineageTrimmingVisitor exte
                         continue;
                     }
 
-                    bestScore = score;
+                    bestWeight = weight;
                     bestLineageGroup = lineageGroup;
+                    bestNumEquivClassesCovered = numEquivClassesCovered;
                     bestEquivClassesCovered = equivClassesCovered;
                 }
             }
@@ -371,10 +303,7 @@ public class LineageTrimmingVisitor exte
                         .iterator(); it.hasNext();) {
                     IdentityHashSet<Tuple> equivClass = it.next();
                     if (bestEquivClassesCovered.contains(equivClass)) {
-                        if ((equivalenceClasses.get(equivClass) - 1) <= 0) {
-                            // equivClasses.remove(equivClass);
-                            it.remove();
-                        }
+                        it.remove();
                     }
                 }
                 if (equivClasses.size() == 0)
@@ -383,11 +312,6 @@ public class LineageTrimmingVisitor exte
             }
             for (Tuple removeMe : toRemove)
                 lineageGroupToEquivClasses.remove(removeMe);
-
-            for (IdentityHashSet<Tuple> equivClass : bestEquivClassesCovered) {
-                equivalenceClasses.put(equivClass, equivalenceClasses
-                        .get(equivClass) - 1);
-            }
         }
 
         // revise baseData to only contain the tuples that are part of
@@ -414,65 +338,150 @@ public class LineageTrimmingVisitor exte
         return newBaseData;
     }
 
-    private void processOperator(LogicalOperator op) {
-        if (op instanceof LOLoad) return;
+    private void processLoad(LOLoad ld) throws VisitorException {
+        // prune base records
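+        // reuse already-pruned data if another LOAD of the same input file was processed before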
+        if (inputToDataMap.get(ld.getInputFile()) != null) {
+            baseData.put(ld, inputToDataMap.get(ld.getInputFile()));
+            return;
+        }
         
-        continueTrimming = checkCompleteness(op);
-
-        if (continueTrimming == false)
+        DataBag data = baseData.get(ld);
+        if (data == null || data.size() < 2)
             return;
+        Set<Tuple> realData = new HashSet<Tuple>(), syntheticData = new HashSet<Tuple>();
 
-        LogicalOperator childOp = plan.getPredecessors(op).get(0);
-
-        DerivedDataVisitor visitor = new DerivedDataVisitor(childOp, null,
-                baseData, LogToPhyMap, physPlan);
-        try {
-            visitor.visit();
-        } catch (VisitorException e) {
-            log.error(e.getMessage());
+        for (Iterator<Tuple> it = data.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            if (((ExampleTuple)t).synthetic)
+                syntheticData.add(t);
+            else
+                realData.add(t);
         }
+        
+        Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
+        DataBag newData = BagFactory.getInstance().newDefaultBag();
+        newBaseData.put(ld, newData);
+        for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
+            if (entry.getKey() != ld) {
+                if (!entry.getKey().getInputFile().equals(ld.getInputFile()))
+                    newBaseData.put(entry.getKey(), entry.getValue());
+                else
+                    newBaseData.put(entry.getKey(), newData);
+            }
+        }
+        
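+        // try to restore completeness with real records first, then fall back to synthetic ones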
+        if (checkNewBaseData(newData, newBaseData, realData))
+            checkNewBaseData(newData, newBaseData, syntheticData);
+        
+        inputToDataMap.put(ld.getInputFile(), baseData.get(ld));
+    }
+    
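+    // Add candidate tuples one by one until the previous completeness is matched, then retry
+    // them in decreasing order of achieved completeness and commit the first sufficient set.
+    // Returns true if these tuples alone cannot restore completeness.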
+    private boolean checkNewBaseData(DataBag data, Map<LOLoad, DataBag> newBaseData, Set<Tuple> loadData) throws VisitorException {
+        List<Pair<Tuple, Double>> sortedBase = new LinkedList<Pair<Tuple, Double>>();
+        DataBag oldData = BagFactory.getInstance().newDefaultBag();
+        oldData.addAll(data);
+        double tmpCompleteness = completeness;
+        for (Tuple t : loadData) {
+            data.add(t);
+            // obtain the derived data 
+            Map<LogicalOperator, DataBag> derivedData;
+            try {
+                derivedData = eg.getData(newBaseData);
+            } catch (Exception e) {
+                throw new VisitorException("Exception: "+e.getMessage());
+            }
+            double newCompleteness = MetricEvaluation.getCompleteness(null,
+                    derivedData, eg.getLoToEqClassMap(), true);
 
-        DataBag bag = visitor.derivedData.get(childOp);
-        Map<IdentityHashSet<Tuple>, Integer> affinityGroups = new HashMap<IdentityHashSet<Tuple>, Integer>();
-
-        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
-            IdentityHashSet<Tuple> set = new IdentityHashSet<Tuple>();
-            affinityGroups.put(set, 1);
-            set.add(it.next());
+            sortedBase.add(new Pair<Tuple, Double>(t, Double.valueOf(newCompleteness)));
+            if (newCompleteness >= tmpCompleteness)
+                break;
         }
+        
+        // sort candidates by the completeness they achieved, in decreasing order;
+        // compare the boxed Doubles by value rather than by reference
+        Collections.sort(sortedBase, new Comparator<Pair<Tuple, Double>>() {
+            @Override
+            public int compare(Pair<Tuple, Double> o1, Pair<Tuple, Double> o2) {
+                return o2.second.compareTo(o1.second);
+            }
+        });
 
-        for (IdentityHashSet<Tuple> set : visitor.EqClasses) {
-            // newEquivalenceClasses.put(set, 1);
-            affinityGroups.put(set, 1);
+        data.clear();
+        data.addAll(oldData);
+        for (Pair<Tuple, Double> p : sortedBase) {
+            data.add(p.first);
+            // obtain the derived data 
+            Map<LogicalOperator, DataBag> derivedData;
+            try {
+                derivedData = eg.getData(newBaseData);
+            } catch (Exception e) {
+                throw new VisitorException("Exception: "+e.getMessage());
+            }
+            double newCompleteness = MetricEvaluation.getCompleteness(null,
+                    derivedData, eg.getLoToEqClassMap(), true);
+
+            if (newCompleteness >= completeness) {
+                completeness = newCompleteness;
+                baseData.putAll(newBaseData);
+                return false;
+            }
         }
+        return true;
+    }
+    
+    private void processOperator(LogicalOperator op) throws VisitorException {
+        
+        try {
+            if (op instanceof LOLoad) {
+                processLoad((LOLoad) op);
+                return;
+            }
+            
+            continueTrimming = checkCompleteness(op);
 
-        AffinityGroups.put(childOp, affinityGroups);
-        Lineage.put(childOp, visitor.lineage);
+            if (plan.getPredecessors(op) == null)
+                return;
+            
+            if (continueTrimming == false)
+                return;
 
+            LogicalOperator childOp = plan.getPredecessors(op).get(0);
+            if (op instanceof LOForEach && childOp instanceof LOCogroup) {
+                LOCogroup cg = (LOCogroup) childOp;
+                for (LogicalOperator input : cg.getInputs()) {
+                    AffinityGroups.put(input, eg.getEqClasses());
+                    Lineage.put(input, eg.getLineage());
+                }
+            } else {
+                List<LogicalOperator> childOps = plan.getPredecessors(op);
+                for (LogicalOperator lo : childOps) {
+                    AffinityGroups.put(lo, eg.getEqClasses());
+                    Lineage.put(lo, eg.getLineage());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace(System.out);
+            throw new VisitorException("Exception: " + e.getMessage());
+        }
     }
 
-    private boolean checkCompleteness(LogicalOperator op) {
+    private boolean checkCompleteness(LogicalOperator op) throws Exception {
         LineageTracer lineage = Lineage.get(op);
         Lineage.remove(op);
 
-        Map<IdentityHashSet<Tuple>, Integer> affinityGroups = AffinityGroups
+        Collection<IdentityHashSet<Tuple>> affinityGroups = AffinityGroups
                 .get(op);
         AffinityGroups.remove(op);
 
         Map<LOLoad, DataBag> newBaseData = PruneBaseDataConstrainedCoverage(
-                baseData, null, lineage, affinityGroups);
+                baseData, lineage, affinityGroups);
 
         // obtain the derived data
-        DerivedDataVisitor visitor = new DerivedDataVisitor(plan, null,
-                newBaseData, LogToPhyMap, physPlan);
-        try {
-            visitor.visit();
-        } catch (VisitorException e) {
-            log.error(e.getMessage());
-        }
-
+        Map<LogicalOperator, DataBag> derivedData = eg.getData(newBaseData);
         double newCompleteness = MetricEvaluation.getCompleteness(null,
-                visitor.derivedData, visitor.OpToEqClasses, true);
+                derivedData, eg.getLoToEqClassMap(), true);
 
         if (newCompleteness >= completeness) {
             completeness = newCompleteness;

Added: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ConfigurationValidator;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.builtin.ReadScalars;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.PigException;
+
+
+
+/**
+ * Simulates local map-reduce execution of a physical plan for the illustrator,
+ * rather than launching real map-reduce jobs
+ *
+ */
+public class LocalMapReduceSimulator {
+    
+    private MapReduceLauncher launcher = new MapReduceLauncher();
+    
+    private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
+
+    @SuppressWarnings("unchecked")
+    public void launchPig(PhysicalPlan php, Map<LOLoad, DataBag> baseData,
+                              Map<PhysicalOperator, LogicalOperator> poLoadToLogMap,
+                              LineageTracer lineage,
+                              IllustratorAttacher attacher,
+                              ExampleGenerator eg,
+                              PigContext pc) throws PigException, IOException, InterruptedException {
+        phyToMRMap.clear();
+        MROperPlan mrp = launcher.compile(php, pc);
+                
+        HExecutionEngine exe = pc.getExecutionEngine();
+        ConfigurationValidator.validatePigProperties(exe.getConfiguration());
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
+        
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        
+        JobControl jc;
+        int numMRJobsCompl = 0;
+        DataBag input;
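+        // in-memory buffer that collects the simulated map output and feeds the reduce phase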
+        List<Pair<PigNullableWritable, Writable>> intermediateData = new ArrayList<Pair<PigNullableWritable, Writable>>();
+
+        Map<Job, MapReduceOper> jobToMroMap = jcc.getJobMroMap();
+        HashMap<String, DataBag> output = new HashMap<String, DataBag>();
+        Configuration jobConf;
+        // jc is null only when mrp.size == 0
+        boolean needFileInput;
+        final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
+        while(mrp.size() != 0) {
+            jc = jcc.compile(mrp, "Illustrator");
+            if(jc == null) {
+                throw new ExecException("Native execution is not supported");
+            }
+            List<Job> jobs = jc.getWaitingJobs();
+            for (Job job : jobs) {
+                jobConf = job.getJobConf();
+                FileLocalizer.setInitialized(false);
+                ArrayList<ArrayList<OperatorKey>> inpTargets =
+                    (ArrayList<ArrayList<OperatorKey>>)
+                      ObjectSerializer.deserialize(jobConf.get("pig.inpTargets"));
+                intermediateData.clear();
+                MapReduceOper mro = jobToMroMap.get(job);
+                PigSplit split = null;
+                List<POStore> stores = null;
+                PhysicalOperator pack = null;
+                // revisit as there are new physical operators from MR compilation 
+                if (!mro.mapPlan.isEmpty())
+                    attacher.revisit(mro.mapPlan);
+                if (!mro.reducePlan.isEmpty()) {
+                    attacher.revisit(mro.reducePlan);
+                    pack = mro.reducePlan.getRoots().get(0);
+                }
+                
+                List<POLoad> lds = PlanHelper.getLoads(mro.mapPlan);
+                if (!mro.mapPlan.isEmpty()) {
+                    stores = PlanHelper.getStores(mro.mapPlan);
+                }
+                if (!mro.reducePlan.isEmpty()) {
+                    if (stores == null)
+                        stores = PlanHelper.getStores(mro.reducePlan);
+                    else
+                        stores.addAll(PlanHelper.getStores(mro.reducePlan));
+                }
+
+                for (POStore store : stores) {
+                    output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
+                }
+               
+                OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
+                oa.visit();
+                
+                if (!mro.reducePlan.isEmpty()) {
+                    oa = new OutputAttacher(mro.reducePlan, output);
+                    oa.visit();
+                }
+                int index = 0;
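+                // first pass: remove from the map plan any load whose input is already available in memory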
+                for (POLoad ld : lds) {
+                    input = output.get(ld.getLFile().getFileName());
+                    if (input == null && baseData != null) {
+                        for (LogicalOperator lo : baseData.keySet()) {
+                            if (((LOLoad) lo).getSchemaFile().equals(ld.getLFile().getFileName())) {
+                                input = baseData.get(lo);
+                                break;
+                            }
+                        }
+                    }
+                    if (input != null)
+                        mro.mapPlan.remove(ld);
+                }
+                for (POLoad ld : lds) {
+                    // check newly generated data first
+                    input = output.get(ld.getLFile().getFileName());
+                    if (input == null && baseData != null) {
+                        for (LogicalOperator lo : baseData.keySet()) {
+                            if (((LOLoad) lo).getSchemaFile().equals(ld.getLFile().getFileName())) {
+                                input = baseData.get(lo);
+                                break;
+                            }
+                        }
+                    }
+                    needFileInput = (input == null);
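+                    // attach the pipeline's input targets to the split only when in-memory input is
+                    // available; otherwise the load reads from file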
+                    split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
+                    ++index;
+                    Mapper<Text, Tuple, PigNullableWritable, Writable> map;
+                    
+                    if (mro.reducePlan.isEmpty()) {
+                        // map-only
+                        map = new PigMapOnly.Map();
+                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapOnly.Map) map)
+                          .getIllustratorContext(jobConf, input, intermediateData, split);
+                        map.run(context);
+                    } else {
+                        if ("true".equals(jobConf.get("pig.usercomparator")))
+                            map = new PigMapReduce.MapWithComparator();
+                        else if (!"".equals(jobConf.get("pig.keyDistFile", "")))
+                            map = new PigMapReduce.MapWithPartitionIndex();
+                        else
+                            map = new PigMapReduce.Map();
+                        Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
+                          .getIllustratorContext(jobConf, input, intermediateData, split);
+                        ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        map.run(context);
+                    }
+                }
+                
+                if (!mro.reducePlan.isEmpty())
+                {
+                    if (pack instanceof POPackage)
+                        mro.reducePlan.remove(pack);
+                    // reducer run
+                    PigMapReduce.Reduce reduce;
+                    if ("true".equals(jobConf.get("pig.usercomparator")))
+                        reduce = new PigMapReduce.ReduceWithComparator();
+                    else
+                        reduce = new PigMapReduce.Reduce();
+                    reduce.setReducePlan(mro.reducePlan);
+                    Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable>.Context
+                        context = reduce.getIllustratorContext(job, intermediateData, (POPackage) pack);
+                    reduce.run(context);
+                }
+                phyToMRMap.putAll(mro.phyToMRMap);
+            }
+            
+            
+            int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
+            
+            numMRJobsCompl += removedMROp;
+        }
+                
+        jcc.reset();
+    }
+
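+    // Wires the illustrator's per-alias output bags into ReadScalars UDFs so
+    // scalar references are resolved from example data rather than from files.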
+    private class OutputAttacher extends PhyPlanVisitor {
+        private Map<String, DataBag> outputBuffer;
+        OutputAttacher(PhysicalPlan plan, Map<String, DataBag> output) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.outputBuffer = output;
+        }
+        
+        @Override
+        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+            if (userFunc.getFunc() instanceof ReadScalars) {
+                ((ReadScalars) userFunc.getFunc()).setOutputBuffer(outputBuffer);
+            }
+        }
+    }
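+
+    /**
+     * @return the mapping from operators in the original physical plan to
+     *         their counterparts in the compiled MR plans
+     */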
+    public Map<PhysicalOperator, PhysicalOperator> getPhyToMRMap() {
+        return phyToMRMap;
+    }
+}

Added: pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java (added)
+++ pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
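+/**
+ * Part of the ILLUSTRATE support: resets any specialized join or group
+ * strategy chosen in the script so that example generation only needs to
+ * simulate the default hash join and regular group implementations.
+ */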
+public class POOptimizeDisabler extends LogicalRelationalNodesVisitor {
+
+    public POOptimizeDisabler(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    @Override
+    public void visit(LOJoin join) throws FrontendException {
+        // use HASH join only
+        join.resetJoinType();
+    }
+
+    @Override
+    public void visit(LOCogroup cg) throws FrontendException {
+        // use regular group only
+        cg.resetGroupType();
+    }
+}

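A note on intended use: visitors in the new logical plan framework are run by
constructing them over a plan and calling visit(). A minimal sketch of the
call site (hypothetical driver code with an assumed plan variable, not part
of this commit):

    // force default join/group strategies before example generation
    POOptimizeDisabler disabler = new POOptimizeDisabler(newLogicalPlan);
    disabler.visit();   // resets join and group types in place
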
Added: pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java (added)
+++ pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This visitor visits the physical plan and resets it for the next MR compilation
+ */
+public class PhysicalPlanResetter extends PhyPlanVisitor {
+    
+    public PhysicalPlanResetter(PhysicalPlan plan) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+    }
+    
+    @Override
+    public void visitPackage(POPackage pkg) throws VisitorException {
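+        // clear the cached key info so the next MR compilation rebuilds it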
+        pkg.setKeyInfo(null);
+    }
+}

Modified: pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java Mon Dec 13 19:11:00 2010
@@ -21,13 +21,18 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.IdentityHashSet;
@@ -62,26 +67,36 @@ public class DisplayExamples {
     }
 
     public static String printTabular(LogicalPlan lp,
-            Map<LogicalOperator, DataBag> exampleData) {
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap) {
         StringBuffer output = new StringBuffer();
-
-        LogicalOperator currentOp = lp.getLeaves().get(0);
-        printTabular(currentOp, exampleData, output);
+        Set<LogicalOperator> seen = new HashSet<LogicalOperator>();
+        for (LogicalOperator currentOp : lp.getLeaves())
+            printTabular(currentOp, exampleData, forEachInnerLogToDataMap, seen, output);
         return output.toString();
     }
 
     static void printTabular(LogicalOperator op,
-            Map<LogicalOperator, DataBag> exampleData, StringBuffer output) {
-        DataBag bag = exampleData.get(op);
+            Map<LogicalOperator, DataBag> exampleData,
+            Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap,
+            Set<LogicalOperator> seen,
+            StringBuffer output) {
 
         List<LogicalOperator> inputs = op.getPlan().getPredecessors(op);
         if (inputs != null) { // to avoid an exception when op == LOLoad
             for (LogicalOperator Op : inputs) {
-                printTabular(Op, exampleData, output);
+                if (!seen.contains(Op))
+                    printTabular(Op, exampleData, forEachInnerLogToDataMap, seen, output);
             }
         }
+        seen.add(op);
+        // print inner block first
+        if ((op instanceof LOForEach)) {
+            printNestedTabular((LOForEach)op, forEachInnerLogToDataMap, exampleData.get(op), output);
+        }
+        
         if (op.getAlias() != null) {
-            // printTable(op, bag, output);
+            DataBag bag = exampleData.get(op);
             try {
                 DisplayTable(MakeArray(op, bag), op, bag, output);
             } catch (FrontendException e) {
@@ -95,6 +110,45 @@ public class DisplayExamples {
 
     }
 
+    // print out nested gen block in ForEach
+    static void printNestedTabular(LOForEach foreach,
+            Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap,
+            DataBag foreachData,
+            StringBuffer output) {
+        List<LogicalPlan> plans = foreach.getForEachPlans();
+        if (plans != null) {
+            for (LogicalPlan plan : plans) {
+                printNestedTabular(plan.getLeaves().get(0), foreach.getAlias(), foreachData, forEachInnerLogToDataMap.get(foreach), output);
+            }
+        }
+    }
+
+    static void printNestedTabular(LogicalOperator lo, String foreachAlias, DataBag foreachData, 
+            Map<LogicalOperator, DataBag> logToDataMap, StringBuffer output) {
+        
+        List<LogicalOperator> inputs = lo.getPlan().getPredecessors(lo);
+        if (inputs != null) {
+            for (LogicalOperator op : inputs)
+                printNestedTabular(op, foreachAlias, foreachData, logToDataMap, output);
+        }
+        
+        DataBag bag = logToDataMap.get(lo);
+        if (bag == null)
+            return;
+        
+        if (lo.getAlias() != null) {
+            try {
+                DisplayNestedTable(MakeArray(lo, bag), lo, foreachAlias, foreachData, bag, output);
+            } catch (FrontendException e) {
+                // a display failure should not abort the rest of the output
+                e.printStackTrace();
+            } catch (Exception e) {
+                // a display failure should not abort the rest of the output
+                e.printStackTrace();
+            }
+        }
+    }
+    
     public static void printSimple(LogicalOperator op,
             Map<LogicalOperator, DataBag> exampleData) {
         DataBag bag = exampleData.get(op);
@@ -126,6 +180,9 @@ public class DisplayExamples {
 
     static void DisplayTable(String[][] table, LogicalOperator op, DataBag bag,
             StringBuffer output) throws FrontendException {
+        if (op instanceof LOStore && ((LOStore) op).isTmpStore())
+            return;
+        
         int cols = op.getSchema().getFields().size();
         List<FieldSchema> fields = op.getSchema().getFields();
         int rows = (int) bag.size();
@@ -136,7 +193,7 @@ public class DisplayExamples {
                 maxColSizes[i] = 5;
         }
         int total = 0;
-        int aliasLength = op.getAlias().length() + 4;
+        int aliasLength = (op instanceof LOStore ? op.getAlias().length() + 12 : op.getAlias().length() + 4);
         for (int j = 0; j < cols; ++j) {
             for (int i = 0; i < rows; ++i) {
                 int length = table[i][j].length();
@@ -146,12 +203,20 @@ public class DisplayExamples {
             total += maxColSizes[j];
         }
 
+        // Note of limit reset
+        if (op instanceof LOLimit) {
+            output.append("\nThe limit now in use, " + ((LOLimit)op).getLimit() + ", may have been changed for ILLUSTRATE purpose.\n");
+        }
+        
         // Display the schema first
         output
                 .append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
                         false)
                         + "\n");
-        output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
+        if (op instanceof LOStore)
+            output.append("| Store : " + op.getAlias() + AddSpaces(4, true) + " | ");
+        else
+            output.append("| " + op.getAlias() + AddSpaces(4, true) + " | ");
         for (int i = 0; i < cols; ++i) {
             String field = fields.get(i).toString();
             output.append(field
@@ -178,6 +243,61 @@ public class DisplayExamples {
                         + "\n");
     }
 
+    static void DisplayNestedTable(String[][] table, LogicalOperator op, String foreachAlias, DataBag foreachData,
+            DataBag bag, StringBuffer output) throws FrontendException {
+        int cols = op.getSchema().getFields().size();
+        List<FieldSchema> fields = op.getSchema().getFields();
+        int rows = (int) bag.size();
+        int[] maxColSizes = new int[cols];
+        for (int i = 0; i < cols; ++i) {
+            maxColSizes[i] = fields.get(i).toString().length();
+            if (maxColSizes[i] < 5)
+                maxColSizes[i] = 5;
+        }
+        int total = 0;
+        int aliasLength = op.getAlias().length() + foreachAlias.length() + 5;
+        for (int j = 0; j < cols; ++j) {
+            for (int i = 0; i < rows; ++i) {
+                int length = table[i][j].length();
+                if (length > maxColSizes[j])
+                    maxColSizes[j] = length;
+            }
+            total += maxColSizes[j];
+        }
+
+        // Display the schema first
+        output
+                .append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
+                        false)
+                        + "\n");
+        output.append("| " + foreachAlias + "." + op.getAlias() + AddSpaces(4, true) + " | ");
+        for (int i = 0; i < cols; ++i) {
+            String field;
+            field = fields.get(i).toString();
+            output.append(field
+                    + AddSpaces(maxColSizes[i] - field.length(), true) + " | ");
+        }
+        output.append("\n"
+                + AddSpaces(total + 3 * (cols + 1) + aliasLength + 1, false)
+                + "\n");
+        // now start displaying the data
+        for (int i = 0; i < rows; ++i) {
+            output.append("| " + AddSpaces(aliasLength, true) + " | ");
+            for (int j = 0; j < cols; ++j) {
+                String str = table[i][j];
+                output.append(str
+                        + AddSpaces(maxColSizes[j] - str.length(), true)
+                        + " | ");
+            }
+            output.append("\n");
+        }
+        // now display the finish line
+        output
+                .append(AddSpaces(total + 3 * (cols + 1) + aliasLength + 1,
+                        false)
+                        + "\n");
+    }
+
     static String[][] MakeArray(LogicalOperator op, DataBag bag)
             throws Exception {
         int rows = (int) bag.size();
@@ -201,8 +321,12 @@ public class DisplayExamples {
         else {
             // System.out.println("Unrecognized data-type received!!!");
             // return null;
-            if (DataType.findTypeName(d) != null)
-                return d.toString();
+            if (d == null)
+                return "";
+            
+            if (DataType.findTypeName(d) != null) {
+                return d.toString();
+            }
         }
         System.out.println("Unrecognized data-type received!!!");
         return null;

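For reference, the tables emitted by DisplayTable/DisplayNestedTable look
roughly like the sketch below (illustrative only: real column widths come
from maxColSizes, and the dashed rules come from AddSpaces(..., false)):

    ------------------------
    | Store : out | x: int |
    ------------------------
    |             | 1      |
    ------------------------

Nested relations use the same layout, with the alias cell rendered as
"foreachAlias.innerAlias" in the header and left blank on data rows.
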
Modified: pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/ExampleTuple.java Mon Dec 13 19:11:00 2010
@@ -24,9 +24,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 
 //Example tuple adds 2 booleans to Tuple
 //synthetic say whether the tuple was generated synthetically
@@ -36,17 +34,21 @@ public class ExampleTuple implements Tup
 
     public boolean synthetic = false;
     public boolean omittable = true;
-    Tuple t;
+    Object expr = null;
+    Tuple t = null;
 
     public ExampleTuple() {
 
     }
+    
+    public ExampleTuple(Object expr) {
+        this.expr = expr;
+    }
 
     public ExampleTuple(Tuple t) {
         // Have to do it like this because Tuple is an interface, we don't
         // have access to its internal structures.
         this.t = t;
-
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/LineageTracer.java Mon Dec 13 19:11:00 2010
@@ -20,7 +20,6 @@ package org.apache.pig.pen.util;
 import java.util.*;
 
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 public class LineageTracer {
 

Modified: pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java Mon Dec 13 19:11:00 2010
@@ -19,13 +19,11 @@
 package org.apache.pig.pen.util;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOFilter;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.util.IdentityHashSet;
 
@@ -118,24 +116,13 @@ public class MetricEvaluation {
         int noClasses = 0;
         int noCoveredClasses = 0;
         int noOperators = 0;
-        Map<Integer, Boolean> coveredClasses;
         float completeness = 0;
         if (!overallCompleteness) {
             Collection<IdentityHashSet<Tuple>> eqClasses = OperatorToEqClasses
                     .get(op);
-            DataBag bag;
 
-            if (op instanceof LOFilter)
-                bag = exampleData.get(((LOFilter) op).getInput());
-            else
-                bag = exampleData.get(op);
-            coveredClasses = getCompletenessLogic(bag, eqClasses);
+            noCoveredClasses = getCompletenessLogic(eqClasses);
             noClasses = eqClasses.size();
-            for (Map.Entry<Integer, Boolean> e : coveredClasses.entrySet()) {
-                if (e.getValue()) {
-                    noCoveredClasses++;
-                }
-            }
 
             return 100 * ((float) noCoveredClasses) / (float) noClasses;
         } else {
@@ -150,20 +137,8 @@ public class MetricEvaluation {
                     continue; // we want to consider join a single operator
                 noOperators++;
                 Collection<IdentityHashSet<Tuple>> eqClasses = e.getValue();
-                LogicalOperator lop = e.getKey();
-                DataBag bag;
-                if (lop instanceof LOFilter)
-                    bag = exampleData.get(((LOFilter) lop).getInput());
-                else
-                    bag = exampleData.get(lop);
-                coveredClasses = getCompletenessLogic(bag, eqClasses);
+                noCoveredClasses = getCompletenessLogic(eqClasses);
                 noClasses += eqClasses.size();
-                for (Map.Entry<Integer, Boolean> e_result : coveredClasses
-                        .entrySet()) {
-                    if (e_result.getValue()) {
-                        noCoveredClasses++;
-                    }
-                }
                 completeness += 100 * ((float) noCoveredClasses / (float) noClasses);
             }
             completeness /= (float) noOperators;
@@ -173,23 +148,13 @@ public class MetricEvaluation {
 
     }
 
-    private static Map<Integer, Boolean> getCompletenessLogic(DataBag bag,
+    private static int getCompletenessLogic(
             Collection<IdentityHashSet<Tuple>> eqClasses) {
-        Map<Integer, Boolean> coveredClasses = new HashMap<Integer, Boolean>();
-
-        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-            int classId = 0;
-            for (IdentityHashSet<Tuple> eqClass : eqClasses) {
-
-                if (eqClass.contains(t) || eqClass.size() == 0) {
-                    coveredClasses.put(classId, true);
-                }
-                classId++;
-            }
+        int nCoveredClasses = 0;
+        for (IdentityHashSet<Tuple> eqClass : eqClasses) {
+            if (!eqClass.isEmpty())
+                nCoveredClasses++;
         }
-
-        return coveredClasses;
-
+        return nCoveredClasses;
     }
 }

Modified: pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java (original)
+++ pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java Mon Dec 13 19:11:00 2010
@@ -33,6 +33,11 @@ import org.apache.pig.impl.util.Utils;
 
 public class PreOrderDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
         extends PlanWalker<O, P> {
+  
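+    // scratch flag a visitor may set while walking one branch; depthFirst()
+    // saves and restores it so sibling branches start from the same value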
+    private boolean branchFlag = false;
+    
     /**
      * @param plan
      *            Plan for this walker to traverse.
@@ -41,6 +46,14 @@ public class PreOrderDepthFirstWalker<O 
         super(plan);
     }
 
+    public void setBranchFlag() {
+        branchFlag = true;
+    }
+    
+    public boolean getBranchFlag() {
+        return branchFlag;
+    }
+    
     /**
      * Begin traversing the graph.
      * 
@@ -66,8 +79,10 @@
         if (predecessors == null)
             return;
 
+        boolean thisBranchFlag = branchFlag;
         for (O pred : predecessors) {
             if (seen.add(pred)) {
+                branchFlag = thisBranchFlag;
                 pred.visit(visitor);
                 Collection<O> newPredecessors = Utils.mergeCollection(mPlan.getPredecessors(pred), mPlan.getSoftLinkPredecessors(pred));
                 depthFirst(pred, newPredecessors, seen, visitor);

Added: pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java (added)
+++ pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+@SuppressWarnings("unchecked")
+public class ReverseDepthFirstWalker<O extends Operator, P extends OperatorPlan<O>>
+        extends PlanWalker<O, P> {
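+    // physical operators compiled from the logical predecessors of the
+    // starting operator; the reverse walk does not continue past these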
+    private HashSet<O> stoppers = null;
+    private LogicalOperator lo;
+    private LogicalPlan lp;
+    Map<LogicalOperator, O> l2pMap;
+    Set<O> seen;
+    
+    /**
+     * @param plan
+     *            Plan for this walker to traverse.
+     */
+    public ReverseDepthFirstWalker(P plan, LogicalOperator op, LogicalPlan lp, Map<LogicalOperator, O> l2pMap, Set<O> seen) {
+        super(plan);
+        if (lp != null && lp.getPredecessors(op) != null)
+        {
+            stoppers = new HashSet<O>();
+            for (LogicalOperator pred : lp.getPredecessors(op))
+                stoppers.add(l2pMap.get(pred));
+        }
+        lo = op;
+        this.lp = lp;
+        this.l2pMap = l2pMap;
+        if (seen == null)
+            this.seen = new HashSet<O>();
+        else
+            this.seen = seen;
+    }
+
+    public ReverseDepthFirstWalker(P plan, LogicalOperator op, LogicalPlan lp, Map<LogicalOperator, O> l2pMap)
+    {
+        this(plan, op, lp, l2pMap, null);
+    }
+    
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new ReverseDepthFirstWalker<O, P>(plan, lo, lp, l2pMap, seen);
+    }
+        
+    /**
+     * Begin traversing the graph.
+     * 
+     * @param visitor
+     *            Visitor this walker is being used by.
+     * @throws VisitorException
+     *             if an error is encountered while walking.
+     */
+    public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
+        O po = l2pMap.get(lo);
+        // TODO: After use new LO, this should be removed. Found necessary in the testForeach test.
+        if (po == null)
+            return;
+        depthFirst(null, mPlan.getPredecessors(po), visitor);
+        if (seen.add(po))
+            po.visit(visitor);
+    }
+
+    private void depthFirst(O node, Collection<O> predecessors,
+            PlanVisitor<O, P> visitor) throws VisitorException {
+        if (predecessors == null)
+            return;
+
+        for (O pred : predecessors) {
+            if (stoppers != null && stoppers.contains(pred))
+                continue;
+            if (seen.add(pred)) {
+                Collection<O> newPredecessors = Utils.mergeCollection(mPlan.getPredecessors(pred), mPlan.getSoftLinkPredecessors(pred));
+                depthFirst(pred, newPredecessors, visitor);
+                pred.visit(visitor);
+            }
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Dec 13 19:11:00 2010
@@ -96,6 +96,7 @@ public class GruntParser extends PigScri
         mDone = false;
         mLoadOnly = false;
         mExplain = null;
+        mScriptIllustrate = false;
     }
 
     private void setBatchOn() {
@@ -202,6 +203,10 @@ public class GruntParser extends PigScri
         mJobConf = execEngine.getJobConf();
     }
 
+    public void setScriptIllustrate() {
+        mScriptIllustrate = true;
+    }
+    
     @Override
     public void prompt()
     {
@@ -271,7 +276,7 @@ public class GruntParser extends PigScri
                 }
                 setBatchOn();
                 try {
-                    loadScript(script, true, true, params, files);
+                    loadScript(script, true, true, false, params, files);
                 } catch(IOException e) {
                     discardBatch();
                     throw e;
@@ -416,20 +421,20 @@ public class GruntParser extends PigScri
                 setBatchOn();
                 mPigServer.setJobName(script);
                 try {
-                    loadScript(script, true, mLoadOnly, params, files);
+                    loadScript(script, true, false, mLoadOnly, params, files);
                     executeBatch();
                 } finally {
                     discardBatch();
                 }
             } else {
-                loadScript(script, false, mLoadOnly, params, files);
+                loadScript(script, false, false, mLoadOnly, params, files);
             }
         } else {
             log.warn("'run/exec' statement is ignored while processing 'explain -script' or '-check'");
         }
     }
 
-    private void loadScript(String script, boolean batch, boolean loadOnly,
+    private void loadScript(String script, boolean batch, boolean loadOnly, boolean illustrate,
                             List<String> params, List<String> files) 
         throws IOException, ParseException {
         
@@ -467,6 +472,8 @@ public class GruntParser extends PigScri
         parser.setConsoleReader(reader);
         parser.setInteractive(interactive);
         parser.setLoadOnly(loadOnly);
+        if (illustrate)
+            parser.setScriptIllustrate();
         parser.mExplain = mExplain;
         
         parser.prompt();
@@ -621,12 +628,43 @@ public class GruntParser extends PigScri
     }
     
     @Override
-    protected void processIllustrate(String alias) throws IOException
+    protected void processIllustrate(String alias, String script, String target, List<String> params, List<String> files) throws IOException, ParseException
     {
-        if(mExplain == null) { // process only if not in "explain" mode
-            mPigServer.getExamples(alias);
-        } else {
+        if (mScriptIllustrate)
+            throw new ParseException("'illustrate' statement can not appear in a script that is illustrated opon.");
+
+        if ((alias != null) && (script != null))
+            throw new ParseException("'illustrate' statement on an alias does not work when a script is in effect");
+        else if (mExplain != null)
             log.warn("'illustrate' statement is ignored while processing 'explain -script' or '-check'");
+        else {
+            try {
+                if (script != null) {
+                    if (!"true".equalsIgnoreCase(mPigServer.
+                                                 getPigContext()
+                                                 .getProperties().
+                                                 getProperty("opt.multiquery","true"))) {
+                        throw new ParseException("Cannot explain script if multiquery is disabled.");
+                    }
+                    setBatchOn();
+                    try {
+                        loadScript(script, true, true, true, params, files);
+                    } catch(IOException e) {
+                        discardBatch();
+                        throw e;
+                    } catch (ParseException e) {
+                        discardBatch();
+                        throw e;
+                    }
+                } else if (alias == null) {
+                    throw new ParseException("'illustrate' statement must be on an alias or on a script.");
+                }
+                mPigServer.getExamples(alias);
+            } finally {
+                if (script != null) {
+                    discardBatch();
+                }
+            }
         }
     }
 
@@ -998,4 +1036,5 @@ public class GruntParser extends PigScri
     private int mNumFailedJobs;
     private int mNumSucceededJobs;
     private FsShell shell;
+    private boolean mScriptIllustrate;
 }
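
For reference, the new code path is driven from grunt roughly as follows
(option spelling comes from the accompanying parser changes elsewhere in
this commit; treat the exact flags as illustrative):

    grunt> illustrate some_alias                -- existing per-alias form
    grunt> illustrate -script myscript.pig      -- new: illustrate a whole script

Illustrating a script loads it with batch on and mScriptIllustrate set, which
is why a nested 'illustrate' inside that script is rejected above.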