You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/23 19:15:49 UTC

svn commit: r1052348 - in /pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOpe...

Author: yanz
Date: Thu Dec 23 18:15:48 2010
New Revision: 1052348

URL: http://svn.apache.org/viewvc?rev=1052348&view=rev
Log:
ILLUSTRATE rework: _9.patch

Added:
    pig/trunk/test/org/apache/pig/test/PORead.java
    pig/trunk/test/org/apache/pig/test/data/illustrate7.pig
Removed:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
    pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java
    pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
    pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
    pig/trunk/test/org/apache/pig/test/TestFilter.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestPODistinct.java
    pig/trunk/test/org/apache/pig/test/TestPOSort.java
    pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java
    pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
    pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Dec 23 18:15:48 2010
@@ -116,11 +116,6 @@ public class PhyPlanSetter extends PhyPl
     }
     
     @Override
-    public void visitRead(PORead read) throws VisitorException {
-        read.setParentPlan(parent);		
-    }
-    
-    @Override
     public void visitSort(POSort sort) throws VisitorException {
         super.visitSort(sort);
         sort.setParentPlan(parent);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Dec 23 18:15:48 2010
@@ -131,10 +131,6 @@ public class PhyPlanVisitor extends Plan
         //do nothing		
 	}
 
-	public void visitRead(PORead read) throws VisitorException {
-        //do nothing		
-	}
-
 	public void visitSort(POSort sort) throws VisitorException {
         List<PhysicalPlan> inpPlans = sort.getSortPlans();
         for (PhysicalPlan plan : inpPlans) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Dec 23 18:15:48 2010
@@ -219,7 +219,6 @@ public class POCollectedGroup extends Ph
             outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
                     : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
-            illustratorMarkup(null, tup2, 0);
             return res;
         }
 
@@ -279,24 +278,6 @@ public class POCollectedGroup extends Ph
     
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-        if (illustrator != null) {
-            ExampleTuple tOut = new ExampleTuple(out);
-            LineageTracer lineage = illustrator.getLineage();
-            lineage.insert(tOut);
-            DataBag bag;
-            try {
-                bag = (DataBag) tOut.get(1);
-            } catch (ExecException e) {
-                throw new RuntimeException("Illustrator markup exception" + e.getMessage());
-            }
-            boolean synthetic = false;
-            while (!synthetic && bag.iterator().hasNext()) {
-                synthetic |= ((ExampleTuple) bag.iterator().next()).synthetic;
-            }
-            tOut.synthetic = synthetic;
-            illustrator.addData((Tuple) out);
-            return tOut;
-        } else
-          return (Tuple) out;
+        return null;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Thu Dec 23 18:15:48 2010
@@ -284,7 +284,7 @@ public class POMergeCogroup extends Phys
         for(int i=0; i < relationCnt; i++)
             out.set(i+1,(outBags[i]));
 
-        return new Result(POStatus.STATUS_OK, illustratorMarkup(null, out, -1));        
+        return new Result(POStatus.STATUS_OK, out);        
     }
 
 
@@ -600,35 +600,6 @@ public class POMergeCogroup extends Phys
     
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-        if(illustrator != null) {
-            ExampleTuple tOut = new ExampleTuple((Tuple) out);
-            LineageTracer lineageTracer = illustrator.getLineage();
-            lineageTracer.insert((Tuple) out);
-            Tuple tmp;
-            boolean synthetic = false;
-            try {
-                for (int i = 1; i < relationCnt; i++)
-                {
-                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
-                    Iterator<Tuple> iter = dbs.iterator();
-                    while (iter.hasNext()) {
-                        tmp = iter.next();
-                        // any of synthetic data in bags causes the output tuple to be synthetic
-                        if (!synthetic && ((ExampleTuple)tmp).synthetic)
-                            synthetic = true;
-                        lineageTracer.union(tOut, tmp);
-                        // TODO constraint of >=2 tuples per eq. class
-                        illustrator.getEquivalenceClasses().get(i-1).add(tmp);
-                    }
-                }
-            } catch (ExecException e) {
-              // TODO better exception handling
-              throw new RuntimeException("Illustrator exception :"+e.getMessage());
-            }
-            tOut.synthetic = synthetic;
-            illustrator.addData((Tuple) tOut);
-            return tOut;
-        } else
-            return (Tuple) out;
+        return null;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Thu Dec 23 18:15:48 2010
@@ -210,7 +210,6 @@ public class POMergeJoin extends Physica
                 for(int i=0; i < rightTupSize; i++)
                     joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
 
-                joinedTup = illustratorMarkup(null, joinedTup, 0);
                 return new Result(POStatus.STATUS_OK, joinedTup);
             }
             // Join with current right input has ended. But bag of left tuples
@@ -565,25 +564,6 @@ public class POMergeJoin extends Physica
     
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-        if(illustrator != null) {
-            ExampleTuple tOut = new ExampleTuple((Tuple) out);
-            tOut.synthetic = ((ExampleTuple) out).synthetic;
-            LineageTracer lineageTracer = illustrator.getLineage();
-            lineageTracer.insert(tOut);
-            try {
-                for (int i = 0; i < leftTupSize+rightTupSize; i++)
-                {
-                    lineageTracer.union(tOut,  (Tuple) tOut.get(i));
-                    illustrator.getEquivalenceClasses().get(i).add((Tuple)tOut);
-                    // TODO constraint of >=2 tuples per eq. class
-                }
-            } catch (ExecException e) {
-              // TODO better exception handling
-              throw new RuntimeException("Illustrator exception :"+e.getMessage());
-            }
-            illustrator.addData((Tuple) tOut);
-            return tOut;
-        }
-        return (Tuple) out;
+        return null;
     }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Thu Dec 23 18:15:48 2010
@@ -201,4 +201,8 @@ public class LOJoin extends LogicalRelat
             return false;
         }
     }
+    
+    public List<Operator> getInputs(LogicalPlan plan) {
+        return plan.getPredecessors(this);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Thu Dec 23 18:15:48 2010
@@ -45,6 +45,7 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.CastExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -285,6 +286,112 @@ public class AugmentBaseDataVisitor exte
     }
 
     @Override
+    public void visit(LOJoin join) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
+            return;
+        // we first get the output constraints for the current join
+        DataBag outputConstraints = outputConstraintsMap.get(join);
+        outputConstraintsMap.remove(join);
+        boolean ableToHandle = true;
+        // we then check if we can handle this join and try to collect some
+        // information about the join key columns of each input
+        List<List<Integer>> groupSpecs = new LinkedList<List<Integer>>();
+        int numCols = -1;
+
+        for (int index = 0; index < join.getInputs((LogicalPlan)plan).size(); ++index) {
+            Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) join
+                    .getExpressionPlans().get(index);
+            List<Integer> groupCols = new ArrayList<Integer>();
+            for (LogicalExpressionPlan plan : groupByPlans) {
+                Operator leaf = plan.getSinks().get(0);
+                if (leaf instanceof ProjectExpression) {
+                    groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
+                } else {
+                    ableToHandle = false;
+                    break;
+                }
+            }
+            if (numCols == -1) {
+                numCols = groupCols.size();
+            }
+            if (groupCols.size() != groupByPlans.size()
+                    || groupCols.size() != numCols) {
+                // we came across an unworkable join plan
+                break;
+            } else {
+                groupSpecs.add(groupCols);
+            }
+        }
+
+        // we should now have some workable data at this point to synthesize
+        // tuples
+        try {
+            if (ableToHandle) {
+                // we need to go through the output constraints first
+                int numInputs = join.getInputs((LogicalPlan) plan).size();
+                if (outputConstraints != null) {
+                    for (Iterator<Tuple> it = outputConstraints.iterator(); it
+                            .hasNext();) {
+                        Tuple outputConstraint = it.next();
+
+                        for (int input = 0; input < numInputs; input++) {
+
+                            int numInputFields = ((LogicalRelationalOperator) join.getInputs((LogicalPlan) plan).get(input))
+                                    .getSchema().size();
+                            List<Integer> groupCols = groupSpecs.get(input);
+
+                            DataBag output = outputConstraintsMap.get(join
+                                    .getInputs((LogicalPlan) plan).get(input));
+                            if (output == null) {
+                                output = BagFactory.getInstance()
+                                        .newDefaultBag();
+                                outputConstraintsMap.put(join.getInputs((LogicalPlan) plan).get(
+                                        input), output);
+                            }
+
+                            Tuple inputConstraint = GetJoinInput(
+                                    outputConstraint, groupCols, numInputFields);
+                            if (inputConstraint != null)
+                                output.add(inputConstraint);
+                        }
+                    }
+                }
+                // then, go through all organic data groups and add input
+                // constraints to make each group big enough
+                DataBag outputData = derivedData.get(join);
+
+                if (outputData.size() == 0) {
+                    DataBag output0 = outputConstraintsMap.get(join.getInputs((LogicalPlan) plan).get(0));
+                    if (output0 == null || output0.size() == 0) {
+                        output0 = derivedData.get(join.getInputs((LogicalPlan) plan).get(0));
+                    }
+                    Tuple inputConstraint0 = output0.iterator().next();
+                    for (int input = 1; input < numInputs; input++) {
+                        DataBag output = outputConstraintsMap.get(join.getInputs((LogicalPlan) plan).get(input));
+                        if (output == null)
+                        {
+                            output = BagFactory.getInstance().newDefaultBag();
+                            outputConstraintsMap.put(join.getInputs((LogicalPlan) plan).get(input),
+                                    output);
+                        }
+                        int numInputFields = ((LogicalRelationalOperator)join.getInputs((LogicalPlan) plan).get(input)).getSchema().size();
+                        Tuple inputConstraint = GetJoinInput(inputConstraint0, groupSpecs.get(0), groupSpecs.get(input), numInputFields);
+                        if (inputConstraint != null)
+                            output.add(inputConstraint);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log
+                    .error("Error visiting Cogroup during Augmentation phase of Example Generator! "
+                            + e.getMessage());
+            throw new FrontendException(
+                    "Error visiting Cogroup during Augmentation phase of Example Generator! "
+                            + e.getMessage());
+        }
+    }
+
+    @Override
     public void visit(LOCross cs) throws FrontendException {
 
     }
@@ -728,6 +835,42 @@ public class AugmentBaseDataVisitor exte
         return t;
     }
 
+    Tuple GetJoinInput(Tuple group, List<Integer> groupCols0, List<Integer> groupCols,
+        int numFields) throws ExecException {
+        Tuple t = TupleFactory.getInstance().newTuple(numFields);
+
+        if (groupCols.size() == 1) {
+            // GroupLabel would be a data atom
+            t.set(groupCols.get(0), group.get(groupCols0.get(0)));
+        } else {
+            if (!(group instanceof Tuple))
+                throw new RuntimeException("Unrecognized group label!");
+            for (int i = 0; i < groupCols.size(); i++) {
+                t.set(groupCols.get(i), group.get(groupCols0.get(i)));
+            }
+        }
+
+        return t;
+    }
+    
+    Tuple GetJoinInput(Tuple group, List<Integer> groupCols,
+        int numFields) throws ExecException {
+        Tuple t = TupleFactory.getInstance().newTuple(numFields);
+
+        if (groupCols.size() == 1) {
+            // GroupLabel would be a data atom
+            t.set(groupCols.get(0), group);
+        } else {
+            if (!(group instanceof Tuple))
+                throw new RuntimeException("Unrecognized group label!");
+            for (int i = 0; i < groupCols.size(); i++) {
+                t.set(groupCols.get(i), group.get(i));
+            }
+        }
+
+        return t;
+    }
+    
     Tuple BackPropConstraint(Tuple outputConstraint, List<Integer> cols,
             LogicalSchema inputSchema, boolean cast) throws ExecException {
         Tuple inputConst = TupleFactory.getInstance().newTuple(

Modified: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Thu Dec 23 18:15:48 2010
@@ -195,11 +195,6 @@ public class IllustratorAttacher extends
         innerPlanAttach(fl, fl.getPlan());
         subExpResults = null;
     }
- 
-    @Override
-    public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
-        setIllustrator(mg, 1);
-    }
     
     @Override
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
@@ -403,39 +398,10 @@ public class IllustratorAttacher extends
     }
     
     @Override
-    public void visitFRJoin(POFRJoin join) throws VisitorException {
-        // one eq. class per input
-        setIllustrator(join, join.getInputs().size());
-    }
-    
-    @Override
-    public void visitMergeJoin(POMergeJoin join) throws VisitorException {
-        // one eq. class per input
-        setIllustrator(join, join.getInputs().size());
-    }
-    
-    @Override
-    public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
-        // one eq. class per input
-        setIllustrator(mergeCoGrp, mergeCoGrp.getInputs().size());
-    }
-
-    @Override
     public void visitStream(POStream stream) throws VisitorException {
         setIllustrator(stream, 1);
     }
 
-    @Override
-	public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
-        // one eq. class per input
-        // do not go to inner plans as they are not booleans so no use of eq. classes
-        setIllustrator(sk, sk.getInputs().size());
-	}
-
-    @Override
-	public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
-	}
-
     /**
      * @param optimizedForEach
      */
@@ -443,14 +409,6 @@ public class IllustratorAttacher extends
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         visitPOForEach(optimizedForEach);
     }
-
-    /**
-     * @param preCombinerLocalRearrange
-     */
-    @Override
-    public void visitPreCombinerLocalRearrange(
-            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
-    }
     
     private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
         PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =

Modified: pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java (original)
+++ pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java Thu Dec 23 18:15:48 2010
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.pig.pen;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;

Added: pig/trunk/test/org/apache/pig/test/PORead.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/PORead.java?rev=1052348&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/PORead.java (added)
+++ pig/trunk/test/org/apache/pig/test/PORead.java Thu Dec 23 18:15:48 2010
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This operator is used to read tuples from a databag in memory. Used mostly
+ * for testing. It'd also be useful for the example generator
+ * 
+ */
+public class PORead extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+    DataBag bag;
+    transient Iterator<Tuple> it;
+
+    public PORead(OperatorKey k) {
+        super(k);
+        // TODO Auto-generated constructor stub
+    }
+
+    public PORead(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public PORead(OperatorKey k, int rp) {
+        super(k, rp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public PORead(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public PORead(OperatorKey k, DataBag bag) {
+        super(k);
+        this.bag = bag;
+    }
+
+    @Override
+    public Result getNext(Tuple t) {
+        if (it == null) {
+            it = bag.iterator();
+        }
+        Result res = new Result();
+        if (it.hasNext()) {
+            res.returnStatus = POStatus.STATUS_OK;
+            res.result = it.next();
+        } else {
+            res.returnStatus = POStatus.STATUS_EOP;
+        }
+        return res;
+    }
+
+    @Override
+    public String name() {
+        // TODO Auto-generated method stub
+        return "PORead - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // TODO Auto-generated method stub
+        throw new VisitorException("Vistor not accepted !");
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFilter.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFilter.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFilter.java Thu Dec 23 18:15:48 2010
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -37,7 +34,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;

Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Thu Dec 23 18:15:48 2010
@@ -558,6 +558,23 @@ public class TestGrunt extends TestCase 
         grunt.exec();
     }
     
+    @Test
+    public void testIllustrateScript7() throws Throwable {
+        // empty line/field test
+        PigServer server = new PigServer(ExecType.LOCAL, cluster.getProperties());
+        PigContext context = server.getPigContext();
+        
+        String strCmd = "illustrate -script "
+                + basedir + "/illustrate7.pig;";
+        
+        ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
+        InputStreamReader reader = new InputStreamReader(cmd);
+        
+        Grunt grunt = new Grunt(new BufferedReader(reader), context);
+    
+        grunt.exec();
+    }
+    
     /**
      * verify that grunt commands are ignored in explain -script mode
      */

Modified: pig/trunk/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPODistinct.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPODistinct.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPODistinct.java Thu Dec 23 18:15:48 2010
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
 import org.junit.Before;
 import org.junit.Test;
 

Modified: pig/trunk/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOSort.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOSort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOSort.java Thu Dec 23 18:15:48 2010
@@ -33,11 +33,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.test.utils.GenRandomData;
 import org.junit.Test;
 

Modified: pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java Thu Dec 23 18:15:48 2010
@@ -41,7 +41,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.test.utils.GenRandomData;

Modified: pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt (original)
+++ pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt Thu Dec 23 18:15:48 2010
@@ -2,6 +2,5 @@
 13	11
 15	17
 11	14
-10	19
 17	18
 20	19

Added: pig/trunk/test/org/apache/pig/test/data/illustrate7.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/illustrate7.pig?rev=1052348&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/illustrate7.pig (added)
+++ pig/trunk/test/org/apache/pig/test/data/illustrate7.pig Thu Dec 23 18:15:48 2010
@@ -0,0 +1,4 @@
+a = load 'test/org/apache/pig/test/data/TestIllustrateInput.txt' as (x:int, y:int);
+b = load 'test/org/apache/pig/test/data/TestIllustrateInput2.txt' as (x:int, y:int);
+c = join a by x, b by x;
+store c into 'test.out';

Modified: pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Thu Dec 23 18:15:48 2010
@@ -25,7 +25,7 @@ import java.util.Random;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.test.PORead;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;