You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/23 19:15:49 UTC
svn commit: r1052348 - in /pig/trunk:
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOpe...
Author: yanz
Date: Thu Dec 23 18:15:48 2010
New Revision: 1052348
URL: http://svn.apache.org/viewvc?rev=1052348&view=rev
Log:
ILLUSTRATE rework: _9.patch
Added:
pig/trunk/test/org/apache/pig/test/PORead.java
pig/trunk/test/org/apache/pig/test/data/illustrate7.pig
Removed:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
pig/trunk/src/org/apache/pig/pen/util/DependencyOrderLimitedWalker.java
pig/trunk/src/org/apache/pig/pen/util/FunctionalLogicalOptimizer.java
pig/trunk/src/org/apache/pig/pen/util/ReverseDepthFirstWalker.java
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
pig/trunk/test/org/apache/pig/test/TestFilter.java
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestPODistinct.java
pig/trunk/test/org/apache/pig/test/TestPOSort.java
pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java
pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Dec 23 18:15:48 2010
@@ -116,11 +116,6 @@ public class PhyPlanSetter extends PhyPl
}
@Override
- public void visitRead(PORead read) throws VisitorException {
- read.setParentPlan(parent);
- }
-
- @Override
public void visitSort(POSort sort) throws VisitorException {
super.visitSort(sort);
sort.setParentPlan(parent);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Dec 23 18:15:48 2010
@@ -131,10 +131,6 @@ public class PhyPlanVisitor extends Plan
//do nothing
}
- public void visitRead(PORead read) throws VisitorException {
- //do nothing
- }
-
public void visitSort(POSort sort) throws VisitorException {
List<PhysicalPlan> inpPlans = sort.getSortPlans();
for (PhysicalPlan plan : inpPlans) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Dec 23 18:15:48 2010
@@ -219,7 +219,6 @@ public class POCollectedGroup extends Ph
outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
: new InternalCachedBag(1);
outputBag.add((Tuple)tup.get(1));
- illustratorMarkup(null, tup2, 0);
return res;
}
@@ -279,24 +278,6 @@ public class POCollectedGroup extends Ph
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if (illustrator != null) {
- ExampleTuple tOut = new ExampleTuple(out);
- LineageTracer lineage = illustrator.getLineage();
- lineage.insert(tOut);
- DataBag bag;
- try {
- bag = (DataBag) tOut.get(1);
- } catch (ExecException e) {
- throw new RuntimeException("Illustrator markup exception" + e.getMessage());
- }
- boolean synthetic = false;
- while (!synthetic && bag.iterator().hasNext()) {
- synthetic |= ((ExampleTuple) bag.iterator().next()).synthetic;
- }
- tOut.synthetic = synthetic;
- illustrator.addData((Tuple) out);
- return tOut;
- } else
- return (Tuple) out;
+ return null;
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Thu Dec 23 18:15:48 2010
@@ -284,7 +284,7 @@ public class POMergeCogroup extends Phys
for(int i=0; i < relationCnt; i++)
out.set(i+1,(outBags[i]));
- return new Result(POStatus.STATUS_OK, illustratorMarkup(null, out, -1));
+ return new Result(POStatus.STATUS_OK, out);
}
@@ -600,35 +600,6 @@ public class POMergeCogroup extends Phys
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert((Tuple) out);
- Tuple tmp;
- boolean synthetic = false;
- try {
- for (int i = 1; i < relationCnt; i++)
- {
- DataBag dbs = (DataBag) ((Tuple) out).get(i);
- Iterator<Tuple> iter = dbs.iterator();
- while (iter.hasNext()) {
- tmp = iter.next();
- // any of synthetic data in bags causes the output tuple to be synthetic
- if (!synthetic && ((ExampleTuple)tmp).synthetic)
- synthetic = true;
- lineageTracer.union(tOut, tmp);
- // TODO constraint of >=2 tuples per eq. class
- illustrator.getEquivalenceClasses().get(i-1).add(tmp);
- }
- }
- } catch (ExecException e) {
- // TODO better exception handling
- throw new RuntimeException("Illustrator exception :"+e.getMessage());
- }
- tOut.synthetic = synthetic;
- illustrator.addData((Tuple) tOut);
- return tOut;
- } else
- return (Tuple) out;
+ return null;
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Thu Dec 23 18:15:48 2010
@@ -210,7 +210,6 @@ public class POMergeJoin extends Physica
for(int i=0; i < rightTupSize; i++)
joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
- joinedTup = illustratorMarkup(null, joinedTup, 0);
return new Result(POStatus.STATUS_OK, joinedTup);
}
// Join with current right input has ended. But bag of left tuples
@@ -565,25 +564,6 @@ public class POMergeJoin extends Physica
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
- if(illustrator != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) out);
- tOut.synthetic = ((ExampleTuple) out).synthetic;
- LineageTracer lineageTracer = illustrator.getLineage();
- lineageTracer.insert(tOut);
- try {
- for (int i = 0; i < leftTupSize+rightTupSize; i++)
- {
- lineageTracer.union(tOut, (Tuple) tOut.get(i));
- illustrator.getEquivalenceClasses().get(i).add((Tuple)tOut);
- // TODO constraint of >=2 tuples per eq. class
- }
- } catch (ExecException e) {
- // TODO better exception handling
- throw new RuntimeException("Illustrator exception :"+e.getMessage());
- }
- illustrator.addData((Tuple) tOut);
- return tOut;
- }
- return (Tuple) out;
+ return null;
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Thu Dec 23 18:15:48 2010
@@ -201,4 +201,8 @@ public class LOJoin extends LogicalRelat
return false;
}
}
+
+ /**
+  * Looks up the operators feeding this join in the given plan.
+  *
+  * @param plan the logical plan that is asked for this operator's predecessors
+  * @return the list produced by {@code plan.getPredecessors(this)}
+  */
+ public List<Operator> getInputs(LogicalPlan plan) {
+     final List<Operator> predecessors = plan.getPredecessors(this);
+     return predecessors;
+ }
}
Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Thu Dec 23 18:15:48 2010
@@ -45,6 +45,7 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
@@ -285,6 +286,112 @@ public class AugmentBaseDataVisitor exte
}
@Override
+ /**
+  * Back-propagates example-data constraints from a join to its inputs.
+  * <p>
+  * The join key columns of every input are collected first; only keys that
+  * are plain column projections are supported. Each pending output
+  * constraint of the join is then turned into one input constraint per
+  * input. Finally, if the join currently produces no derived data, a
+  * tuple from the first input is used as a seed and a tuple carrying the
+  * same key values is requested from every other input.
+  *
+  * @param join the join operator being visited
+  * @throws FrontendException if constraint synthesis fails; the original
+  *         exception is attached as the cause
+  */
+ public void visit(LOJoin join) throws FrontendException {
+     if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
+         return;
+     // take (and clear) the output constraints recorded for this join
+     DataBag outputConstraints = outputConstraintsMap.get(join);
+     outputConstraintsMap.remove(join);
+
+     List<Operator> inputs = join.getInputs((LogicalPlan) plan);
+     int numInputs = inputs.size();
+
+     // collect, per input, the column numbers the join keys project
+     boolean ableToHandle = true;
+     List<List<Integer>> groupSpecs = new ArrayList<List<Integer>>(numInputs);
+     int numCols = -1;
+
+     for (int index = 0; index < numInputs; ++index) {
+         Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) join
+                 .getExpressionPlans().get(index);
+         List<Integer> groupCols = new ArrayList<Integer>();
+         for (LogicalExpressionPlan keyPlan : groupByPlans) {
+             Operator leaf = keyPlan.getSinks().get(0);
+             if (leaf instanceof ProjectExpression) {
+                 groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
+             } else {
+                 ableToHandle = false;
+                 break;
+             }
+         }
+         if (numCols == -1) {
+             numCols = groupCols.size();
+         }
+         if (groupCols.size() != groupByPlans.size()
+                 || groupCols.size() != numCols) {
+             // unworkable join plan: without clearing the flag the code
+             // below would index past the specs collected so far
+             ableToHandle = false;
+             break;
+         }
+         groupSpecs.add(groupCols);
+     }
+
+     if (!ableToHandle)
+         return;
+
+     // we should now have some workable data at this point to synthesize
+     // tuples
+     try {
+         // we need to go through the output constraints first
+         if (outputConstraints != null) {
+             for (Iterator<Tuple> it = outputConstraints.iterator(); it
+                     .hasNext();) {
+                 Tuple outputConstraint = it.next();
+
+                 for (int input = 0; input < numInputs; input++) {
+                     Operator inputOp = inputs.get(input);
+                     int numInputFields = ((LogicalRelationalOperator) inputOp)
+                             .getSchema().size();
+                     List<Integer> groupCols = groupSpecs.get(input);
+
+                     DataBag output = outputConstraintsMap.get(inputOp);
+                     if (output == null) {
+                         output = BagFactory.getInstance().newDefaultBag();
+                         outputConstraintsMap.put(inputOp, output);
+                     }
+
+                     Tuple inputConstraint = GetJoinInput(
+                             outputConstraint, groupCols, numInputFields);
+                     if (inputConstraint != null)
+                         output.add(inputConstraint);
+                 }
+             }
+         }
+
+         // then, if the join yields nothing, ask the other inputs for a
+         // tuple whose keys match a tuple of the first input
+         DataBag outputData = derivedData.get(join);
+
+         if (outputData.size() == 0) {
+             DataBag output0 = outputConstraintsMap.get(inputs.get(0));
+             if (output0 == null || output0.size() == 0) {
+                 output0 = derivedData.get(inputs.get(0));
+             }
+             // nothing to seed from: skip instead of failing on next()
+             if (output0 != null && output0.size() > 0) {
+                 Tuple inputConstraint0 = output0.iterator().next();
+                 for (int input = 1; input < numInputs; input++) {
+                     Operator inputOp = inputs.get(input);
+                     DataBag output = outputConstraintsMap.get(inputOp);
+                     if (output == null) {
+                         output = BagFactory.getInstance().newDefaultBag();
+                         outputConstraintsMap.put(inputOp, output);
+                     }
+                     int numInputFields = ((LogicalRelationalOperator) inputOp)
+                             .getSchema().size();
+                     Tuple inputConstraint = GetJoinInput(inputConstraint0,
+                             groupSpecs.get(0), groupSpecs.get(input), numInputFields);
+                     if (inputConstraint != null)
+                         output.add(inputConstraint);
+                 }
+             }
+         }
+     } catch (Exception e) {
+         String msg = "Error visiting Join during Augmentation phase of Example Generator! "
+                 + e.getMessage();
+         log.error(msg);
+         // keep the original exception so the stack trace is not lost
+         throw new FrontendException(msg, e);
+     }
+ }
+
+ @Override
public void visit(LOCross cs) throws FrontendException {
}
@@ -728,6 +835,42 @@ public class AugmentBaseDataVisitor exte
return t;
}
+ /**
+  * Builds a tuple for one join input whose key columns carry the key values
+  * found in a tuple of another input.
+  *
+  * @param group      tuple the key values are read from
+  * @param groupCols0 key column positions inside {@code group}
+  * @param groupCols  key column positions inside the tuple being built,
+  *                   paired by index with {@code groupCols0}
+  * @param numFields  size of the tuple being built
+  * @return a new tuple of {@code numFields} fields with only the key columns set
+  * @throws ExecException if a field cannot be read or written
+  */
+ Tuple GetJoinInput(Tuple group, List<Integer> groupCols0, List<Integer> groupCols,
+         int numFields) throws ExecException {
+     final Tuple result = TupleFactory.getInstance().newTuple(numFields);
+     final int keyCount = groupCols.size();
+
+     // the single-key case is not subject to this check
+     if (keyCount != 1 && !(group instanceof Tuple)) {
+         throw new RuntimeException("Unrecognized group label!");
+     }
+     for (int k = 0; k < keyCount; k++) {
+         result.set(groupCols.get(k), group.get(groupCols0.get(k)));
+     }
+     return result;
+ }
+
+ /**
+  * Builds a tuple for one join input from an output constraint.
+  * <p>
+  * With exactly one key column, {@code group} itself is stored in that
+  * column; otherwise field {@code i} of {@code group} is stored in the
+  * i-th key column.
+  * NOTE(review): storing the whole tuple in the single-key case mirrors
+  * group-label handling where the label is an atom - confirm this is
+  * intended for joins.
+  *
+  * @param group     the constraint the key values are taken from
+  * @param groupCols key column positions inside the tuple being built
+  * @param numFields size of the tuple being built
+  * @return a new tuple of {@code numFields} fields with only the key columns set
+  * @throws ExecException if a field cannot be read or written
+  */
+ Tuple GetJoinInput(Tuple group, List<Integer> groupCols,
+         int numFields) throws ExecException {
+     final Tuple result = TupleFactory.getInstance().newTuple(numFields);
+
+     if (groupCols.size() == 1) {
+         result.set(groupCols.get(0), group);
+         return result;
+     }
+
+     if (!(group instanceof Tuple)) {
+         throw new RuntimeException("Unrecognized group label!");
+     }
+     int pos = 0;
+     while (pos < groupCols.size()) {
+         result.set(groupCols.get(pos), group.get(pos));
+         pos++;
+     }
+     return result;
+ }
+
Tuple BackPropConstraint(Tuple outputConstraint, List<Integer> cols,
LogicalSchema inputSchema, boolean cast) throws ExecException {
Tuple inputConst = TupleFactory.getInstance().newTuple(
Modified: pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java (original)
+++ pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java Thu Dec 23 18:15:48 2010
@@ -195,11 +195,6 @@ public class IllustratorAttacher extends
innerPlanAttach(fl, fl.getPlan());
subExpResults = null;
}
-
- @Override
- public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
- setIllustrator(mg, 1);
- }
@Override
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
@@ -403,39 +398,10 @@ public class IllustratorAttacher extends
}
@Override
- public void visitFRJoin(POFRJoin join) throws VisitorException {
- // one eq. class per input
- setIllustrator(join, join.getInputs().size());
- }
-
- @Override
- public void visitMergeJoin(POMergeJoin join) throws VisitorException {
- // one eq. class per input
- setIllustrator(join, join.getInputs().size());
- }
-
- @Override
- public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
- // one eq. class per input
- setIllustrator(mergeCoGrp, mergeCoGrp.getInputs().size());
- }
-
- @Override
public void visitStream(POStream stream) throws VisitorException {
setIllustrator(stream, 1);
}
- @Override
- public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
- // one eq. class per input
- // do not go to inner plans as they are not booleans so no use of eq. classes
- setIllustrator(sk, sk.getInputs().size());
- }
-
- @Override
- public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
- }
-
/**
* @param optimizedForEach
*/
@@ -443,14 +409,6 @@ public class IllustratorAttacher extends
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
visitPOForEach(optimizedForEach);
}
-
- /**
- * @param preCombinerLocalRearrange
- */
- @Override
- public void visitPreCombinerLocalRearrange(
- POPreCombinerLocalRearrange preCombinerLocalRearrange) {
- }
private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException {
PlanWalker<PhysicalOperator, PhysicalPlan> childWalker =
Modified: pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java (original)
+++ pig/trunk/src/org/apache/pig/pen/POOptimizeDisabler.java Thu Dec 23 18:15:48 2010
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.pig.pen;
import org.apache.pig.impl.logicalLayer.FrontendException;
Added: pig/trunk/test/org/apache/pig/test/PORead.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/PORead.java?rev=1052348&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/PORead.java (added)
+++ pig/trunk/test/org/apache/pig/test/PORead.java Thu Dec 23 18:15:48 2010
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Physical operator that hands out the tuples of an in-memory
+ * {@link DataBag}, one per {@link #getNext(Tuple)} call. It is used mostly
+ * by tests.
+ */
+public class PORead extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    // Tuple source; only the (OperatorKey, DataBag) constructor assigns it.
+    DataBag bag;
+    // Cursor over bag, opened by the first getNext call; not serialized.
+    transient Iterator<Tuple> it;
+
+    public PORead(OperatorKey k) {
+        super(k);
+    }
+
+    public PORead(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public PORead(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public PORead(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    /**
+     * @param k   operator key
+     * @param bag the bag whose tuples this operator returns
+     */
+    public PORead(OperatorKey k, DataBag bag) {
+        super(k);
+        this.bag = bag;
+    }
+
+    /**
+     * Returns the next tuple of the bag.
+     *
+     * @param t unused
+     * @return a result with STATUS_OK and the tuple, or STATUS_EOP with no
+     *         result once the bag is exhausted
+     */
+    @Override
+    public Result getNext(Tuple t) {
+        if (it == null) {
+            it = bag.iterator();
+        }
+        final Result next = new Result();
+        if (!it.hasNext()) {
+            next.returnStatus = POStatus.STATUS_EOP;
+            return next;
+        }
+        next.returnStatus = POStatus.STATUS_OK;
+        next.result = it.next();
+        return next;
+    }
+
+    @Override
+    public String name() {
+        final String key = mKey.toString();
+        return "PORead - " + key;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /** Always rejects the visitor. */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        throw new VisitorException("Vistor not accepted !");
+    }
+
+    /** No illustrator support: always returns null. */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
+}
Modified: pig/trunk/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFilter.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestFilter.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestFilter.java Thu Dec 23 18:15:48 2010
@@ -17,9 +17,6 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -37,7 +34,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Thu Dec 23 18:15:48 2010
@@ -558,6 +558,23 @@ public class TestGrunt extends TestCase
grunt.exec();
}
+ @Test
+ public void testIllustrateScript7() throws Throwable {
+     // Feeds an "illustrate -script" command for illustrate7.pig to Grunt in
+     // local mode; the test passes when exec() completes without throwing.
+     final PigServer server = new PigServer(ExecType.LOCAL, cluster.getProperties());
+     final String strCmd = "illustrate -script " + basedir + "/illustrate7.pig;";
+
+     final BufferedReader input = new BufferedReader(
+             new InputStreamReader(new ByteArrayInputStream(strCmd.getBytes())));
+
+     new Grunt(input, server.getPigContext()).exec();
+ }
+
/**
* verify that grunt commands are ignored in explain -script mode
*/
Modified: pig/trunk/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPODistinct.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPODistinct.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPODistinct.java Thu Dec 23 18:15:48 2010
@@ -36,7 +36,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
import org.junit.Before;
import org.junit.Test;
Modified: pig/trunk/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOSort.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOSort.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOSort.java Thu Dec 23 18:15:48 2010
@@ -33,11 +33,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.test.utils.GenRandomData;
import org.junit.Test;
Modified: pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOUserFunc.java Thu Dec 23 18:15:48 2010
@@ -41,7 +41,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.test.PORead;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.test.utils.GenRandomData;
Modified: pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt (original)
+++ pig/trunk/test/org/apache/pig/test/data/TestIllustrateInput2.txt Thu Dec 23 18:15:48 2010
@@ -2,6 +2,5 @@
13 11
15 17
11 14
-10 19
17 18
20 19
Added: pig/trunk/test/org/apache/pig/test/data/illustrate7.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/illustrate7.pig?rev=1052348&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/illustrate7.pig (added)
+++ pig/trunk/test/org/apache/pig/test/data/illustrate7.pig Thu Dec 23 18:15:48 2010
@@ -0,0 +1,4 @@
+a = load 'test/org/apache/pig/test/data/TestIllustrateInput.txt' as (x:int, y:int);
+b = load 'test/org/apache/pig/test/data/TestIllustrateInput2.txt' as (x:int, y:int);
+c = join a by x, b by x;
+store c into 'test.out';
Modified: pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=1052348&r1=1052347&r2=1052348&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Thu Dec 23 18:15:48 2010
@@ -25,7 +25,7 @@ import java.util.Random;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.test.PORead;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;