You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/04/22 02:34:22 UTC
svn commit: r767337 [1/2] - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/backend/hadoop/executionengine...
Author: pradeepkth
Date: Wed Apr 22 00:34:21 2009
New Revision: 767337
URL: http://svn.apache.org/viewvc?rev=767337&view=rev
Log:
PIG-514:COUNT returns no results as a result of two filter statements in FOREACH (pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java
hadoop/pig/trunk/test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
hadoop/pig/trunk/test/org/apache/pig/test/Util.java
hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Apr 22 00:34:21 2009
@@ -42,6 +42,8 @@
PIG-745: Add DataType.toString() to force basic types to chararray, useful
for UDFs that want to handle all simple types as strings (ciemo via gates).
+PIG-514: COUNT returns no results as a result of two filter statements in
+FOREACH (pradeepkth)
Release 0.2.0 - Unreleased
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Apr 22 00:34:21 2009
@@ -864,8 +864,14 @@
@Override
public void visit(LOProject op) throws VisitorException {
String scope = op.getOperatorKey().scope;
- POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
+ POProject exprOp;
+ if(op.isSendEmptyBagOnEOP()) {
+ exprOp = new PORelationToExprProject(new OperatorKey(scope, nodeGen
.getNextNodeId(scope)), op.getRequestedParallelism());
+ } else {
+ exprOp = new POProject(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), op.getRequestedParallelism());
+ }
exprOp.setResultType(op.getType());
exprOp.setColumns((ArrayList)op.getProjection());
exprOp.setStar(op.isStar());
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Apr 22 00:34:21 2009
@@ -287,6 +287,17 @@
return ret;
}
+ /**
+ * Reset internal state in an operator. For use in nested pipelines
+ * where operators like limit and sort may need to reset their state.
+ * Limit needs it because it needs to know it's seeing a fresh set of
+ * input. Blocking operators like sort and distinct need it because they
+ * may not have drained their previous input due to a limit and thus need
+ * to be told to drop their old input and start over.
+ */
+ public void reset() {
+ }
+
public static void setReporter(PigProgressable reporter) {
PhysicalOperator.reporter = reporter;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Wed Apr 22 00:34:21 2009
@@ -56,11 +56,13 @@
private static final long serialVersionUID = 1L;
private static TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ protected static BagFactory bagFactory = BagFactory.getInstance();
private boolean resultSingleTupleBag = false;
//The column to project
- ArrayList<Integer> columns;
+ protected ArrayList<Integer> columns;
//True if we are in the middle of streaming tuples
//in a bag
@@ -181,13 +183,21 @@
Result res = processInputBag();
if(res.returnStatus!=POStatus.STATUS_OK)
return res;
- DataBag inpBag = (DataBag) res.result;
+ return(consumeInputBag(res));
+ }
+ /**
+ * @param input
+ * @throws ExecException
+ */
+ protected Result consumeInputBag(Result input) throws ExecException {
+ DataBag inpBag = (DataBag) input.result;
+ Result retVal = new Result();
if(isInputAttached() || star){
- res.result = inpBag;
- res.returnStatus = POStatus.STATUS_OK;
+ retVal.result = inpBag;
+ retVal.returnStatus = POStatus.STATUS_OK;
detachInput();
- return res;
+ return retVal;
}
DataBag outBag;
@@ -201,7 +211,7 @@
tmpTuple.set(i, tuple.get(columns.get(i)));
outBag = new SingleTupleBag(tmpTuple);
} else {
- outBag = BagFactory.getInstance().newDefaultBag();
+ outBag = bagFactory.newDefaultBag();
for (Tuple tuple : inpBag) {
Tuple tmpTuple = tupleFactory.newTuple(columns.size());
for (int i = 0; i < columns.size(); i++)
@@ -209,9 +219,9 @@
outBag.add(tmpTuple);
}
}
- res.result = outBag;
- res.returnStatus = POStatus.STATUS_OK;
- return res;
+ retVal.result = outBag;
+ retVal.returnStatus = POStatus.STATUS_OK;
+ return retVal;
}
@Override
@@ -374,7 +384,7 @@
return clone;
}
- private Result processInputBag() throws ExecException {
+ protected Result processInputBag() throws ExecException {
Result res = new Result();
if (input==null && (inputs == null || inputs.size()==0)) {
Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=767337&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Wed Apr 22 00:34:21 2009
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+
+import java.util.ArrayList;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Implements a specialized form of POProject which is
+ * used *ONLY* in the following case:
+ * This project is Project(*) introduced after a relational operator
+ * to supply a bag as output (as an expression). This project is either
+ * providing the bag as input to a successor expression operator or is
+ * itself the leaf in an inner plan
+ * If the predecessor relational operator sends an EOP
+ * then send an empty bag first to signal "empty" output
+ * and then send an EOP
+
+ * NOTE: A Project(*) of return type BAG whose predecessor is
+ * from an outside plan (i.e. not in the same inner plan as the project)
+ * will NOT lead us here. So a query like:
+ * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+ * will go through a regular project (without the following flag)
+ */
+public class PORelationToExprProject extends POProject {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+
+ boolean sendEmptyBagOnEOP = false;
+
+ public PORelationToExprProject(OperatorKey k) {
+ this(k,-1,0);
+ }
+
+ public PORelationToExprProject(OperatorKey k, int rp) {
+ this(k, rp, 0);
+ }
+
+ public PORelationToExprProject(OperatorKey k, int rp, int col) {
+ super(k, rp, col);
+ }
+
+ public PORelationToExprProject(OperatorKey k, int rp, ArrayList<Integer> cols) {
+ super(k, rp, cols);
+ }
+
+ @Override
+ public String name() {
+
+ return "RelationToExpressionProject" + "[" + DataType.findTypeName(resultType) + "]" + ((star) ? "[*]" : columns) + " - " + mKey.toString();
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // for now the specialization in this class
+ // does not affect the way visitors visit it - so
+ // we can just use visitProject()
+ v.visitProject(this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#reset()
+ */
+ @Override
+ public void reset() {
+ // the foreach in which this operator is
+ // present is starting with a new set of inputs
+ // if we see an EOP from the predecessor *first*
+ // (i.e. we do not see any other input before an EOP
+ // and only see an EOP - this can happen if a Filter
+ // is the predecessor and it filters away all its input)
+ // we should send an empty bag. Set a flag which can be
+ // checked if an EOP is encountered.
+ sendEmptyBagOnEOP = true;
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ Result input = processInputBag();
+ if(input.returnStatus!=POStatus.STATUS_OK) {
+ if(input.returnStatus == POStatus.STATUS_EOP && sendEmptyBagOnEOP) {
+ // we received an EOP from the predecessor
+ // since the successor in the pipeline is
+ // expecting a bag, send an empty bag
+ input.result = bagFactory.newDefaultBag();
+ input.returnStatus = POStatus.STATUS_OK;
+ // we should send EOP the next time we are called
+ // if the foreach in which this operator is present
+ // calls this.getNext(bag) with new inputs then
+ // this flag will be reset in this.reset()
+ } else {
+ // since we are sending down some result (empty bag or otherwise)
+ // we should not be sending an empty bag on EOP any more UNLESS
+ // we are processing new inputs (see reset())
+ sendEmptyBagOnEOP = false;
+ return input;
+ }
+ }
+ Result r = consumeInputBag(input);
+ // since we are sending down some result (empty bag or otherwise)
+ // we should not be sending an empty bag on EOP any more UNLESS
+ // we are processing new inputs (see reset())
+ sendEmptyBagOnEOP = false;
+ return(r);
+ }
+
+ @Override
+ public PORelationToExprProject clone() throws CloneNotSupportedException {
+ ArrayList<Integer> cols = new ArrayList<Integer>(columns.size());
+ // Can reuse the same Integer objects, as they are immutable
+ for (Integer i : columns) {
+ cols.add(i);
+ }
+ PORelationToExprProject clone = new PORelationToExprProject(new OperatorKey(mKey.scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+ requestedParallelism, cols);
+ clone.cloneHelper(this);
+ clone.star = star;
+ clone.overloaded = overloaded;
+ return clone;
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Apr 22 00:34:21 2009
@@ -38,8 +38,10 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
@@ -53,6 +55,7 @@
private static final long serialVersionUID = 1L;
protected List<PhysicalPlan> inputPlans;
+ protected List<PhysicalOperator> opsToBeReset;
protected Log log = LogFactory.getLog(getClass());
protected static TupleFactory mTupleFactory = TupleFactory.getInstance();
//Since the plan has a generate, this needs to be maintained
@@ -104,6 +107,7 @@
super(k, rp);
setUpFlattens(isToBeFlattened);
this.inputPlans = inp;
+ opsToBeReset = new ArrayList<PhysicalOperator>();
getLeaves();
}
@@ -194,7 +198,10 @@
}
attachInputToPlans((Tuple) inp.result);
-
+ for (PhysicalOperator po : opsToBeReset) {
+ po.reset();
+ }
+
res = processPlan();
processingPlan = true;
@@ -427,6 +434,18 @@
noItems = 0;
resultTypes = null;
}
+
+ if(inputPlans != null) {
+ for (PhysicalPlan pp : inputPlans) {
+ try {
+ ResetFinder lf = new ResetFinder(pp, opsToBeReset);
+ lf.visit();
+ } catch (VisitorException ve) {
+ String errMsg = "Internal Error: Unexpected error looking for nested operators which need to be reset in FOREACH";
+ throw new RuntimeException(errMsg, ve);
+ }
+ }
+ }
}
public List<PhysicalPlan> getInputPlans() {
@@ -499,9 +518,15 @@
flattens.add(b);
}
}
+
+ List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(opsToBeReset.size());
+ for (PhysicalOperator op : opsToBeReset) {
+ ops.add(op);
+ }
POForEach clone = new POForEach(new OperatorKey(mKey.scope,
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
requestedParallelism, plans, flattens);
+ clone.setOpsToBeReset(ops);
clone.setResultType(getResultType());
return clone;
}
@@ -522,4 +547,58 @@
}
}
}
+
+ /**
+ * Visits a pipeline and collects the operators that must be reset before
+ * each new input: distinct, limit and sort nodes, plus any
+ * PORelationToExprProject. POForEach later calls reset() on each of them.
+ */
+ private class ResetFinder extends PhyPlanVisitor {
+
+ ResetFinder(PhysicalPlan plan, List<PhysicalOperator> toBeReset) {
+ super(plan,
+ new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
+ }
+
+ @Override
+ public void visitDistinct(PODistinct d) throws VisitorException {
+ // FIXME: add only if limit is present
+ opsToBeReset.add(d);
+ }
+
+ @Override
+ public void visitLimit(POLimit limit) throws VisitorException {
+ opsToBeReset.add(limit);
+ }
+
+ @Override
+ public void visitSort(POSort sort) throws VisitorException {
+ // FIXME: add only if limit is present
+ opsToBeReset.add(sort);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
+ */
+ @Override
+ public void visitProject(POProject proj) throws VisitorException {
+ if(proj instanceof PORelationToExprProject) {
+ opsToBeReset.add(proj);
+ }
+ }
+ }
+
+ /**
+ * @return the opsToBeReset
+ */
+ public List<PhysicalOperator> getOpsToBeReset() {
+ return opsToBeReset;
+ }
+
+ /**
+ * @param opsToBeReset the opsToBeReset to set
+ */
+ public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
+ this.opsToBeReset = opsToBeReset;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/AVG.java Wed Apr 22 00:34:21 2009
@@ -86,8 +86,11 @@
// input is a bag with one tuple containing
// the column we are trying to avg
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- DataByteArray dba = (DataByteArray)tp.get(0);
+ DataByteArray dba = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ dba = (DataByteArray)tp.get(0);
+ }
t.set(0, dba != null ? Double.valueOf(dba.toString()) : null);
t.set(1, 1L);
return t;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/COUNT.java Wed Apr 22 00:34:21 2009
@@ -72,8 +72,10 @@
// Since Initial is guaranteed to be called
// only in the map, it will be called with an
// input of a bag with a single tuple - the
- // count should always be 1.
- return mTupleFactory.newTuple(new Long(1));
+ // count should always be 1 if bag is non empty
+ DataBag bag = (DataBag)input.get(0);
+ return mTupleFactory.newTuple(bag.iterator().hasNext()?
+ new Long(1) : new Long(0));
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleAvg.java Wed Apr 22 00:34:21 2009
@@ -83,8 +83,12 @@
// input is a bag with one tuple containing
// the column we are trying to avg on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- t.set(0, (Double)(tp.get(0)));
+ Double d = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ d = (Double)(tp.get(0));
+ }
+ t.set(0, d);
t.set(1, 1L);
return t;
} catch (ExecException ee) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMax.java Wed Apr 22 00:34:21 2009
@@ -70,8 +70,12 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Double)(tp.get(0)));
+ Double d = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ d = (Double)(tp.get(0));
+ }
+ return tfact.newTuple(d);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Double)(tp.get(0)));
+ Double d = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ d = (Double)(tp.get(0));
+ }
+ return tfact.newTuple(d);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/DoubleSum.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,12 @@
// input is a bag with one tuple containing
// the column we are trying to sum
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Double)( tp.get(0)));
+ Double d = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ d = (Double)( tp.get(0));
+ }
+ return tfact.newTuple(d);
} catch (ExecException e) {
throw e;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatAvg.java Wed Apr 22 00:34:21 2009
@@ -80,8 +80,11 @@
// input is a bag with one tuple containing
// the column we are trying to avg on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- Float f = (Float)(tp.get(0));
+ Float f = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ f = (Float)(tp.get(0));
+ }
t.set(0, f != null ? new Double(f) : null);
t.set(1, 1L);
return t;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Float)(tp.get(0)));
+ Float f = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ f = (Float)(tp.get(0));
+ }
+ return tfact.newTuple(f);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Float)(tp.get(0)));
+ Float f = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ f = (Float)(tp.get(0));
+ }
+ return tfact.newTuple(f);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/FloatSum.java Wed Apr 22 00:34:21 2009
@@ -73,10 +73,13 @@
// input is a bag with one tuple containing
// the column we are trying to sum
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- // send down a double since intermediate
- // would be sending a double
- Float f = (Float)tp.get(0);
+ Float f = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ f = (Float)tp.get(0);
+ }
+ // send down a double since intermediate
+ // would be sending a double
return tfact.newTuple(f != null ?
new Double(f) : null);
} catch (ExecException e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntAvg.java Wed Apr 22 00:34:21 2009
@@ -81,8 +81,11 @@
// input is a bag with one tuple containing
// the column we are trying to avg on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- Integer i = (Integer)tp.get(0);
+ Integer i = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ i = (Integer)tp.get(0);
+ }
t.set(0, i != null ? new Long(i): null);
t.set(1, 1L);
return t;
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Integer)(tp.get(0)));
+ Integer i = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ i = (Integer)(tp.get(0));
+ }
+ return tfact.newTuple(i);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntMin.java Wed Apr 22 00:34:21 2009
@@ -71,8 +71,12 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Integer)(tp.get(0)));
+ Integer i = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ i = (Integer)(tp.get(0));
+ }
+ return tfact.newTuple(i);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/IntSum.java Wed Apr 22 00:34:21 2009
@@ -72,8 +72,11 @@
// input is a bag with one tuple containing
// the column we are trying to sum
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- Integer i = (Integer)tp.get(0);
+ Integer i = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ i = (Integer)tp.get(0);
+ }
return tfact.newTuple(i != null ?
new Long(i) : null);
}catch(NumberFormatException nfe){
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongAvg.java Wed Apr 22 00:34:21 2009
@@ -80,8 +80,12 @@
// input is a bag with one tuple containing
// the column we are trying to avg on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- t.set(0, (Long)(tp.get(0)));
+ Long l = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ l = (Long)(tp.get(0));
+ }
+ t.set(0, l);
t.set(1, 1L);
return t;
} catch (ExecException ee) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMax.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Long)(tp.get(0)));
+ Long l = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ l = (Long)(tp.get(0));
+ }
+ return tfact.newTuple(l);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongMin.java Wed Apr 22 00:34:21 2009
@@ -69,8 +69,12 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((Long)(tp.get(0)));
+ Long l = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ l = (Long)(tp.get(0));
+ }
+ return tfact.newTuple(l);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/LongSum.java Wed Apr 22 00:34:21 2009
@@ -73,8 +73,12 @@
// input is a bag with one tuple containing
// the column we are trying to sum
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple( (Long)tp.get(0));
+ Long l = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ l = (Long)tp.get(0);
+ }
+ return tfact.newTuple(l);
} catch (ExecException e) {
throw e;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MAX.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,11 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- DataByteArray dba = (DataByteArray)tp.get(0);
+ DataByteArray dba = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ dba = (DataByteArray)tp.get(0);
+ }
return tfact.newTuple(dba != null ?
Double.valueOf(dba.toString()): null);
} catch (NumberFormatException e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/MIN.java Wed Apr 22 00:34:21 2009
@@ -76,8 +76,11 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- DataByteArray dba = (DataByteArray)tp.get(0);
+ DataByteArray dba = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ dba = (DataByteArray)tp.get(0);
+ }
return tfact.newTuple(dba != null?
Double.valueOf(dba.toString()) : null);
} catch (NumberFormatException e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/SUM.java Wed Apr 22 00:34:21 2009
@@ -77,8 +77,11 @@
// input is a bag with one tuple containing
// the column we are trying to sum
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- DataByteArray dba = (DataByteArray)tp.get(0);
+ DataByteArray dba = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ dba = (DataByteArray)tp.get(0);
+ }
return tfact.newTuple(dba != null?
Double.valueOf(dba.toString()): null);
}catch(NumberFormatException nfe){
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMax.java Wed Apr 22 00:34:21 2009
@@ -65,8 +65,12 @@
// input is a bag with one tuple containing
// the column we are trying to max on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((String)(tp.get(0)));
+ String s = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ s = (String)(tp.get(0));
+ }
+ return tfact.newTuple(s);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/StringMin.java Wed Apr 22 00:34:21 2009
@@ -67,8 +67,12 @@
// input is a bag with one tuple containing
// the column we are trying to min on
DataBag bg = (DataBag) input.get(0);
- Tuple tp = bg.iterator().next();
- return tfact.newTuple((String)(tp.get(0)));
+ String s = null;
+ if(bg.iterator().hasNext()) {
+ Tuple tp = bg.iterator().next();
+ s = (String)(tp.get(0));
+ }
+ return tfact.newTuple(s);
} catch (ExecException ee) {
throw ee;
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Wed Apr 22 00:34:21 2009
@@ -36,7 +36,10 @@
* will be done on this bag to disk.
*/
public class NonSpillableDataBag implements DataBag {
-
+ // the reason this class does NOT extend DefaultAbstractBag
+ // is that we don't want to bloat this class with members it
+ // does not need (DefaultAbstractBag has many members related
+ // to spilling which are not needed here)
/**
*
*/
@@ -180,6 +183,14 @@
}
}
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#equals(java.lang.Object)
+ *
+ * Equality is delegated to compareTo: the argument is considered equal
+ * to this bag exactly when compareTo(obj) reports 0.
+ * NOTE(review): no matching hashCode() override is visible in this hunk -
+ * confirm one exists so that objects equal under this method hash alike.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ return compareTo(obj) == 0;
+ }
@Override
public int compareTo(Object other) {
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOProject.java Wed Apr 22 00:34:21 2009
@@ -68,6 +68,8 @@
private boolean mSentinel;
private boolean mOverloaded = false;
+ private boolean sendEmptyBagOnEOP = false;
+
/**
*
* @param plan
@@ -460,4 +462,18 @@
return clone;
}
+ /**
+ * Sets the flag that records whether this project should send an empty
+ * bag on EOP. The flag defaults to false.
+ * NOTE(review): the name suggests the flag is honoured when the
+ * predecessor operator signals end-of-processing - confirm against the
+ * code that reads {@link #isSendEmptyBagOnEOP()}.
+ * @param sendEmptyBagOnEOP the new value of the flag
+ */
+ public void setSendEmptyBagOnEOP(boolean sendEmptyBagOnEOP) {
+ this.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
+ }
+
+ /**
+ * Reports the current value of the sendEmptyBagOnEOP flag, i.e. whatever
+ * was last passed to {@link #setSendEmptyBagOnEOP(boolean)} (false if it
+ * was never called).
+ * @return the sendEmptyBagOnEOP flag
+ */
+ public boolean isSendEmptyBagOnEOP() {
+ return sendEmptyBagOnEOP;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Apr 22 00:34:21 2009
@@ -3405,6 +3405,18 @@
} else {
item = new LOProject(lp, new OperatorKey(scope, getNextId()), op, -1);
((LOProject)item).setStar(true);
+ // This project is Project(*) introduced after a relational operator
+ // to supply a bag as output (as an expression). This project is either
+ // providing the bag as input to a successor expression operator or is
+ // itself the leaf in an inner plan
+ // If the predecessor relational operator sends an EOP
+ // then send an empty bag first to signal "empty" output
+ // and then send an EOP
+
+ // A query like:
+ // a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+ // will go through a regular project (without the following flag)
+ ((LOProject)item).setSendEmptyBagOnEOP(true);
log.debug("Set star to true");
item.setAlias(t1.image);
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBuiltin.java Wed Apr 22 00:34:21 2009
@@ -510,6 +510,132 @@
}
}
+
+ /**
+ * Test the case where an empty bag is given as input to
+ * the Initial function and the output is fed to the Intermediate
+ * function whose output is fed to the Final function.
+ * At every stage the result is expected to be null, except for
+ * COUNT which is expected to yield 0 (see checkZeroOrNull).
+ * @throws Exception if there are issues executing the aggregate functions
+ */
+ @Test
+ public void testAggEmptyBagWithCombiner() throws Exception {
+
+ for (String[] aggGroup : aggs) {
+ String[] aggFinalTypes = null; // will contain AVGFinal, DoubleAvgFinal etc
+ String[] aggInitialTypes = null; // will contain AVGInitial, DoubleAvgInitial etc
+ String[] aggIntermediateTypes = null; // will contain AVGIntermediate, DoubleAvgIntermediate etc
+ for (String stage: stages) {
+ String[] aggTypesArray = null;
+ if(stage.equals("Initial")) {
+ aggInitialTypes = new String[aggGroup.length];
+ aggTypesArray = aggInitialTypes;
+ } else if (stage.equals("Intermediate")) {
+ aggIntermediateTypes = new String[aggGroup.length];
+ aggTypesArray = aggIntermediateTypes;
+ } else {// final
+ aggFinalTypes = new String[aggGroup.length];
+ aggTypesArray = aggFinalTypes;
+ }
+
+ for (int i = 0; i < aggTypesArray.length; i++) {
+ aggTypesArray[i] = aggGroup[i] + stage;
+ }
+ }
+ for(int k = 0; k < aggFinalTypes.length; k++) {
+ EvalFunc<?> aggInitial = evalFuncMap.get(aggInitialTypes[k]);
+ // To test this case, first <Agg>Initial is called with an empty bag
+ // as input. This is done in two iterations of 5 calls.
+ // The output from <Agg>Initial for the first half of inputs is
+ // put into one bag and the next half into another. Then these two
+ // bags are provided as inputs to two separate calls of <Agg>Intermediate.
+ // The outputs from the two calls to <Agg>Intermediate are put into a bag
+ // and sent as input to <Agg>Final
+
+ DataBag intermediateInputBg1 = bagFactory.newDefaultBag();
+ DataBag intermediateInputBg2 = bagFactory.newDefaultBag();
+ Tuple outputTuple = null;
+ for(int i = 0; i < 10; i++) {
+ // create empty bag input to be provided as input
+ // argument to the "Initial" function
+ DataBag initialInputBg = bagFactory.newDefaultBag();
+ Tuple initialInputTuple = tupleFactory.newTuple(initialInputBg);
+
+ if(i < 5) {
+ outputTuple = (Tuple)aggInitial.exec(initialInputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(aggInitial, outputTuple.get(0));
+ intermediateInputBg1.add(outputTuple);
+ } else {
+ outputTuple = (Tuple)aggInitial.exec(initialInputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(aggInitial, outputTuple.get(0));
+ intermediateInputBg2.add(outputTuple);
+ }
+ }
+
+ EvalFunc<?> aggIntermediate = evalFuncMap.get(aggIntermediateTypes[k]);
+ DataBag finalInputBg = bagFactory.newDefaultBag();
+ Tuple intermediateInputTuple = tupleFactory.newTuple(intermediateInputBg1);
+ outputTuple = (Tuple)aggIntermediate.exec(intermediateInputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(aggIntermediate, outputTuple.get(0));
+ finalInputBg.add(outputTuple);
+ intermediateInputTuple = tupleFactory.newTuple(intermediateInputBg2);
+ outputTuple = (Tuple)aggIntermediate.exec(intermediateInputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(aggIntermediate, outputTuple.get(0));
+ finalInputBg.add(outputTuple);
+
+ Tuple finalInputTuple = tupleFactory.newTuple(finalInputBg);
+
+ EvalFunc<?> aggFinal = evalFuncMap.get(aggFinalTypes[k]);
+ Object output = aggFinal.exec(finalInputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(aggFinal, output);
+ }
+ }
+
+ }
+
+ /**
+ * Test the case where an empty bag is given as input to the non
+ * combiner version of aggregate functions. Each function in every
+ * group of aggs is handed a tuple wrapping a fresh empty default bag;
+ * the result must be null, except for COUNT which must yield 0.
+ * @throws Exception if there are issues executing the aggregate function
+ */
+ @Test
+ public void testAggEmptyBag() throws Exception {
+
+ for (String[] aggGroup : aggs) {
+
+ for(int k = 0; k < aggGroup.length; k++) {
+ EvalFunc<?> agg = evalFuncMap.get(aggGroup[k]);
+
+ // call agg with empty bag as input
+ DataBag inputBag = bagFactory.newDefaultBag();
+ Tuple inputTuple = tupleFactory.newTuple(inputBag);
+
+ Object output = agg.exec(inputTuple);
+ // check that output is null for all aggs except COUNT
+ // COUNT will give an output of 0 for empty bag input
+ checkZeroOrNull(agg, output);
+ }
+ }
+
+ }
+
+ /**
+  * Asserts the result an aggregate produced for empty input: a Long 0
+  * when the function's class name contains "COUNT", null for every
+  * other function.
+  * @param func the aggregate function that produced the output
+  * @param output the value returned by the function
+  */
+ private void checkZeroOrNull(EvalFunc<?> func, Object output) {
+     // COUNT variants are recognised purely by their class name
+     boolean countVariant = func.getClass().getName().contains("COUNT");
+     Object expectedValue = countVariant ? new Long(0) : null;
+     assertEquals(expectedValue, output);
+ }
+
// Builtin MATH Functions
// =======================
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java?rev=767337&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestRelationToExprProject.java Wed Apr 22 00:34:21 2009
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.util.LogUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test PORelationToExprProject which is a special project
+ * introduced to handle the following case:
+ * This project is Project(*) introduced after a relational operator
+ * to supply a bag as output (as an expression). This project is either
+ * providing the bag as input to a successor expression operator or is
+ * itself the leaf in a inner plan
+ * If the predecessor relational operator sends an EOP
+ * then send an empty bag first to signal "empty" output
+ * and then send an EOP
+
+ * NOTE: A Project(*) of return type BAG whose predecessor is
+ * from an outside plan (i.e. not in the same inner plan as the project)
+ * will NOT lead us here. So a query like:
+ * a = load 'baginp.txt' as (b:bag{t:tuple()}); b = foreach a generate $0; dump b;
+ * will go through a regular project (without the following flag)
+ */
+public class TestRelationToExprProject extends TestCase {
+
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+ private static final String TEST_FILTER_COUNT3_INPUT="test/org/apache/pig/test/data/TestRelationToExprProjectInput.txt";
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Before
+ protected void setUp() throws Exception {
+ pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+ }
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#tearDown()
+ */
+ @After
+ protected void tearDown() throws Exception {
+ pigServer.shutdown();
+ }
+
+ // based on the script provided in the jira issue:PIG-514
+ // tests that when a filter inside a foreach filters away all tuples
+ // for a group, an empty bag is still provided to udfs whose
+ // input is the filter
+ @Test
+ public void testFilterCount1() throws IOException, ParseException {
+
+ String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3"};
+ Util.createInputFile(cluster, "test.txt", inputData);
+ String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+ "test2 = group test by col1;" +
+ "test3 = foreach test2 {" +
+ " filter_one = filter test by (col2==1);" +
+ " filter_notone = filter test by (col2!=1);" +
+ " generate group as col1, COUNT(filter_one) as cnt_one, COUNT(filter_notone) as cnt_notone;};";
+ Util.registerQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("test3");
+ Tuple[] expected = new DefaultTuple[2];
+ expected[0] = (Tuple) Util.getPigConstant("(1,1L,1L)");
+ expected[1] = (Tuple) Util.getPigConstant("(2,2L,0L)");
+ Object[] results = new Object[2];
+ int i = 0;
+ while(it.hasNext()) {
+ if(i == 2) {
+ fail("Got more tuples than expected!");
+ }
+ Tuple t = it.next();
+ if(t.get(0).equals(1)) {
+ // this is the first tuple
+ results[0] = t;
+ } else {
+ results[1] = t;
+ }
+ i++;
+ }
+ for (int j = 0; j < expected.length; j++) {
+ assertTrue(expected[j].equals(results[j]));
+ }
+ Util.deleteFile(cluster, "test.txt");
+ }
+
+ // based on jira PIG-710
+ // tests that when a filter inside a foreach filters away all tuples
+ // for a group, an empty bag is still provided to udfs whose
+ // input is the filter
+ @Test
+ public void testFilterCount2() throws IOException, ParseException {
+ Util.createInputFile(cluster, "filterbug.data", new String[] {
+ "a\thello" ,
+ "a\tgoodbye" ,
+ "b\tgoodbye" ,
+ "c\thello" ,
+ "c\thello" ,
+ "c\thello" ,
+ "d\twhat"
+ });
+ String query = "A = load 'filterbug.data' using PigStorage() as ( id:chararray, str:chararray );" +
+ "B = group A by ( id );" +
+ "Cfiltered = foreach B {" +
+ " D = filter A by (" +
+ " str matches 'hello'" +
+ " );" +
+ " matchedcount = COUNT(D);" +
+ " generate" +
+ " group," +
+ " matchedcount as matchedcount," +
+ " A.str;" +
+ " };";
+ Util.registerQuery(pigServer, query);
+ Iterator<Tuple> it = pigServer.openIterator("Cfiltered");
+ Map<String, Tuple> expected = new HashMap<String, Tuple>();
+ expected.put("a", (Tuple) Util.getPigConstant("('a',1L,{('hello'),('goodbye')})"));
+ expected.put("b", (Tuple) Util.getPigConstant("('b',0L,{('goodbye')})"));
+ expected.put("c", (Tuple) Util.getPigConstant("('c',3L,{('hello'),('hello'),('hello')})"));
+ expected.put("d", (Tuple) Util.getPigConstant("('d',0L,{('what')})"));
+ int i = 0;
+ while(it.hasNext()) {
+ Tuple actual = it.next();
+ assertEquals(expected.get(actual.get(0)), actual);
+ i++;
+ }
+ assertEquals(4, i);
+ Util.deleteFile(cluster, "filterbug.data");
+ }
+
+ // based on jira PIG-739
+ // tests that when a filter inside a foreach filters away all tuples
+ // for a group, an empty bag is still provided to udfs whose
+ // input is the filter
+ @Test
+ public void testFilterCount3() throws IOException, ParseException {
+ Util.copyFromLocalToCluster(cluster, TEST_FILTER_COUNT3_INPUT, "testdata");
+ String query = "TESTDATA = load 'testdata' using PigStorage() as (timestamp:chararray, testid:chararray, userid: chararray, sessionid:chararray, value:long, flag:int);" +
+ "TESTDATA_FILTERED = filter TESTDATA by (timestamp gte '1230800400000' and timestamp lt '1230804000000' and value != 0);" +
+ "TESTDATA_GROUP = group TESTDATA_FILTERED by testid;" +
+ "TESTDATA_AGG = foreach TESTDATA_GROUP {" +
+ " A = filter TESTDATA_FILTERED by (userid eq sessionid);" +
+ " C = distinct A.userid;" +
+ " generate group as testid, COUNT(TESTDATA_FILTERED) as counttestdata, COUNT(C) as distcount, SUM(TESTDATA_FILTERED.flag) as total_flags;" +
+ " }" +
+ "TESTDATA_AGG_1 = group TESTDATA_AGG ALL;" +
+ "TESTDATA_AGG_2 = foreach TESTDATA_AGG_1 generate COUNT(TESTDATA_AGG);" ;
+ Util.registerQuery(pigServer, query);
+ Iterator<Tuple> it = pigServer.openIterator("TESTDATA_AGG_2");
+
+ int i = 0;
+ while(it.hasNext()) {
+ Tuple actual = it.next();
+ assertEquals(20l, actual.get(0));
+ i++;
+ }
+ assertEquals(1, i);
+ Util.deleteFile(cluster, "testdata");
+ }
+
+ // test case where RelationToExprProject is present in the
+ // single inner plan of foreach - this will test that it does
+ // send an EOP eventually for each input of the foreach
+ @Test
+ public void testFilter1() throws IOException, ParseException {
+
+ String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
+ Util.createInputFile(cluster, "test.txt", inputData);
+ String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+ "test2 = group test by col1;" +
+ "test3 = foreach test2 {" +
+ " filter_one = filter test by (col2==1);" +
+ " generate filter_one;};";
+ Util.registerQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("test3");
+ Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
+ expected.put((Tuple) Util.getPigConstant("({(1,1,3)})"), 0);
+ expected.put((Tuple) Util.getPigConstant("({(2,1,3),(2,1,3)})"), 0);
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(BagFactory.getInstance().newDefaultBag());
+ expected.put(t, 0);
+ int i = 0;
+ while(it.hasNext()) {
+ if(i == 3) {
+ fail("Got more tuples than expected!");
+ }
+ t = it.next();
+ assertTrue(expected.containsKey(t));
+ int occurences = expected.get(t);
+ occurences++;
+ expected.put(t, occurences);
+ i++;
+ }
+ for (Integer occurences : expected.values()) {
+ assertEquals(new Integer(1), occurences);
+ }
+ Util.deleteFile(cluster, "test.txt");
+ }
+
+ // test case where RelationToExprProject is present in a
+ // different inner plan along with another plan to project the group
+ // in foreach - this will test that reset() correctly resets
+ // the state that empty bags need to be sent on EOP if no non-EOP
+ // input has been seen on a fresh input from foreach.
+ @Test
+ public void testFilter2() throws IOException, ParseException {
+
+ String[] inputData = new String[] {"1\t1\t3","1\t2\t3", "2\t1\t3", "2\t1\t3", "3\t4\t4"};
+ Util.createInputFile(cluster, "test.txt", inputData);
+ String script = "test = load 'test.txt' as (col1: int, col2: int, col3: int);" +
+ "test2 = group test by col1;" +
+ "test3 = foreach test2 {" +
+ " filter_one = filter test by (col2==1);" +
+ " generate group, filter_one;};";
+ Util.registerQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("test3");
+ Map<Tuple, Integer> expected = new HashMap<Tuple, Integer>();
+ expected.put((Tuple) Util.getPigConstant("(1,{(1,1,3)})"), 0);
+ expected.put((Tuple) Util.getPigConstant("(2,{(2,1,3),(2,1,3)})"), 0);
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(3));
+ t.append(BagFactory.getInstance().newDefaultBag());
+ expected.put(t, 0);
+ int i = 0;
+ while(it.hasNext()) {
+ if(i == 3) {
+ fail("Got more tuples than expected!");
+ }
+ t = it.next();
+ assertTrue(expected.containsKey(t));
+ int occurences = expected.get(t);
+ occurences++;
+ expected.put(t, occurences);
+ i++;
+ }
+ for (Integer occurences : expected.values()) {
+ assertEquals(new Integer(1), occurences);
+ }
+ Util.deleteFile(cluster, "test.txt");
+ }
+
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestUnion.java Wed Apr 22 00:34:21 2009
@@ -222,11 +222,12 @@
Util.createInputFile(cluster, "input1.txt", new String[] {"dummy"});
Util.createInputFile(cluster, "input2.txt", new String[] {"dummy"});
PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- Util.registerQuery(pig, "a = load 'input1.txt' ;" +
+ Util.registerQuery(pig, "a = load 'input1.txt';" +
"b = load 'input2.txt';" +
"c = foreach a generate 1, {(1, 'str1')};" +
"d = foreach b generate 2, {(2, 'str2')};" +
- "e = union c,d");
+ "e = union c,d;" +
+ "");
Iterator<Tuple> it = pig.openIterator("e");
Object[] expected = new Object[] { Util.getPigConstant("(1, {(1, 'str1')})"),
Util.getPigConstant("(2, {(2, 'str2')})")};
Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Wed Apr 22 00:34:21 2009
@@ -17,16 +17,22 @@
*/
package org.apache.pig.test;
+import static java.util.regex.Matcher.quoteReplacement;
+
+import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
+import java.io.FileReader;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import static java.util.regex.Matcher.quoteReplacement;
+
import junit.framework.Assert;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -38,13 +44,17 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.*;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
public class Util {
private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -254,6 +264,24 @@
Assert.assertEquals(expected, actual);
}
}
+
+ /**
+  * Utility method to copy a file from the local filesystem to the dfs on
+  * the minicluster for testing in mapreduce mode. The local file is read
+  * line by line and the lines are handed to
+  * {@link #createInputFile(MiniCluster, String, String[])}.
+  * @param cluster a reference to the minicluster
+  * @param localFileName the pathname of local file
+  * @param fileNameOnCluster the name with which the file should be created on the minicluster
+  * @throws IOException if the local file cannot be read or the file cannot
+  * be created on the cluster
+  */
+ static public void copyFromLocalToCluster(MiniCluster cluster, String localFileName, String fileNameOnCluster) throws IOException {
+     List<String> contents = new ArrayList<String>();
+     BufferedReader reader = new BufferedReader(new FileReader(localFileName));
+     try {
+         String line = null;
+         while((line = reader.readLine()) != null) {
+             contents.add(line);
+         }
+     } finally {
+         // release the local file handle even if reading fails part-way
+         reader.close();
+     }
+     Util.createInputFile(cluster, fileNameOnCluster, contents.toArray(new String[0]));
+ }
static public void printQueryOutput(Iterator<Tuple> actualResults,
Tuple[] expectedResults) {
@@ -342,9 +370,10 @@
}
public static void registerQuery(PigServer pigServer, String query) throws IOException {
- String[] queryLines = query.split(";");
- for (String line : queryLines) {
- pigServer.registerQuery(line + ";");
- }
+ // write the whole query to a temporary script file and register it as
+ // a script, instead of splitting the query text on ';'
+ File f = File.createTempFile("tmp", "");
+ // the script is only needed for this call; make sure the temp file
+ // does not outlive the test JVM
+ f.deleteOnExit();
+ PrintWriter pw = new PrintWriter(f);
+ pw.println(query);
+ pw.close();
+ pigServer.registerScript(f.getCanonicalPath());
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld?rev=767337&r1=767336&r2=767337&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/ComplexForeach.gld Wed Apr 22 00:34:21 2009
@@ -1,6 +1,6 @@
New For Each(false)[bag] - Test-Plan-Builder-22
| |
-| Project[bag][*] - Test-Plan-Builder-21
+| RelationToExpressionProject[bag][*] - Test-Plan-Builder-21
| |
| |---Filter[bag] - Test-Plan-Builder-17
| | |