You are viewing a plain-text version of this content. The canonical link ("here") was not preserved in this copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/05/05 17:49:58 UTC
svn commit: r771844 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/
Author: gates
Date: Tue May 5 15:49:58 2009
New Revision: 771844
URL: http://svn.apache.org/viewvc?rev=771844&view=rev
Log:
PIG-741 Allow limit to be nested in foreach (gates).
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue May 5 15:49:58 2009
@@ -34,6 +34,8 @@
PIG-775: PORelationToExprProject should create a NonSpillableDataBag to create
empty bags (pradeepkth)
+PIG-741: Allow limit to be nested in a foreach (gates).
+
PIG-743: To implement clover (gkesavan)
PIG-701: Implement IVY for resolving pig dependencies (gkesavan)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue May 5 15:49:58 2009
@@ -185,7 +185,7 @@
* Starts batch execution mode.
*/
public void setBatchOn() {
- log.info("Create a new graph.");
+ log.debug("Create a new graph.");
if (currDAG != null) {
graphs.push(currDAG);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Tue May 5 15:49:58 2009
@@ -38,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
@@ -762,6 +763,10 @@
sawNonAlgebraic = true;
}
+ public void visitLimit(POLimit limit) throws VisitorException {
+ sawNonAlgebraic = true;
+ }
+
private boolean checkSuccessorIsLeaf(PhysicalOperator leaf, PhysicalOperator opToCheck) {
List<PhysicalOperator> succs = mPlan.getSuccessors(opToCheck);
if(succs.size() == 1) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Tue May 5 15:49:58 2009
@@ -94,9 +94,7 @@
res.result = it.next();
if (res.result == null){
res.returnStatus = POStatus.STATUS_EOP;
- inputsAccumulated = false;
- distinctBag = null;
- it = null;
+ reset();
} else {
res.returnStatus = POStatus.STATUS_OK;
}
@@ -119,6 +117,13 @@
}
@Override
+ public void reset() {
+ inputsAccumulated = false;
+ distinctBag = null;
+ it = null;
+ }
+
+ @Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitDistinct(this);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Tue May 5 15:49:58 2009
@@ -201,7 +201,6 @@
for (PhysicalOperator po : opsToBeReset) {
po.reset();
}
-
res = processPlan();
processingPlan = true;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Tue May 5 15:49:58 2009
@@ -29,6 +29,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -115,4 +116,18 @@
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitLimit(this);
}
+
+ @Override
+ public void reset() {
+ soFar = 0;
+ }
+
+ @Override
+ public POLimit clone() throws CloneNotSupportedException {
+ POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
+ this.requestedParallelism, this.inputs);
+ newLimit.mLimit = this.mLimit;
+ return newLimit;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Tue May 5 15:49:58 2009
@@ -282,9 +282,7 @@
res.returnStatus = POStatus.STATUS_OK;
} else {
res.returnStatus = POStatus.STATUS_EOP;
- inputsAccumulated = false;
- sortedBag = null;
- it = null;
+ reset();
}
return res;
}
@@ -307,6 +305,13 @@
v.visitSort(this);
}
+ @Override
+ public void reset() {
+ inputsAccumulated = false;
+ sortedBag = null;
+ it = null;
+ }
+
public List<PhysicalPlan> getSortPlans() {
return sortPlans;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue May 5 15:49:58 2009
@@ -1278,7 +1278,7 @@
}
{
(
- input = NestedExpr(lp) {log.debug("Filter input: " + input);}
+ input = NestedExpr(lp) {log.debug("Limit input: " + input);}
t = <INTEGER>
)
{
@@ -2184,6 +2184,7 @@
| item = NestedFilter(over,specs,lp, input)
| item = NestedSortOrArrange(over,specs,lp, input)
| item = NestedDistinct(over,specs,lp, input)
+| item = NestedLimit(over,specs,lp, input)
)
)
{
@@ -2411,7 +2412,40 @@
}
}
-
+LogicalOperator NestedLimit(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
+{
+ LogicalOperator eOp;
+ Schema subSchema = null;
+ Token t;
+ log.trace("Entering LimitClause");
+}
+{
+ (
+ <LIMIT>
+ (
+ LOOKAHEAD(NestedProject(over, specs, lp, input)) eOp = NestedProject(over, specs, lp, input)
+| LOOKAHEAD({ null != specs.get(getToken(1).image) }) t = <IDENTIFIER> {eOp = specs.get(t.image);}
+| eOp = BaseEvalSpec(over, specs, lp, input)
+ )
+ {subSchema = eOp.getSchema();}
+ t = <INTEGER>
+ )
+ {
+ lp.add(eOp);
+ log.debug("Added " + eOp.getAlias() + " to the logical plan");
+ long l = Integer.parseInt(t.image);
+ LogicalOperator limit = new LOLimit(lp, new OperatorKey(scope, getNextId()), l);
+ lp.add(limit);
+ log.debug("Added operator " + limit.getClass().getName() + " to the logical plan");
+
+ lp.connect(eOp, limit);
+ log.debug("Connected the limit input to the limit");
+
+ log.trace("Exiting NestedLimit");
+ return limit;
+ }
+}
+
LogicalOperator GenerateStatement(Schema over, Map<String, LogicalOperator> specs, LogicalPlan lp, LogicalOperator input):
{
LogicalOperator spec = null;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=771844&r1=771843&r2=771844&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Tue May 5 15:49:58 2009
@@ -27,8 +27,7 @@
import java.util.Iterator;
import java.util.Random;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.text.DecimalFormat;
public class TestForEachNestedPlanLocal extends TestCase {
@@ -65,6 +64,30 @@
}
}
+ @Test
+ public void testInnerLimit() throws Exception {
+ File tmpFile = genDataSetFileOneGroup();
+ pig.registerQuery("a = load '" + Util.generateURI(tmpFile.toString()) + "'; ");
+ pig.registerQuery("b = group a by $0; ");
+ pig.registerQuery("c = foreach b { " + " c1 = limit $1 5; "
+ + " generate COUNT(c1); " + "};");
+ Iterator<Tuple> it = pig.openIterator("c");
+ Tuple t = null;
+ long count[] = new long[3];
+ for (int i = 0; i < 3 && it.hasNext(); i++) {
+ t = it.next();
+ count[i] = (Long)t.get(0);
+ }
+
+ Assert.assertFalse(it.hasNext());
+
+ Assert.assertEquals(3L, count[0]);
+ Assert.assertEquals(5L, count[1]);
+ Assert.assertEquals(5L, count[2]);
+ }
+
+
+
/*
@Test
@@ -113,4 +136,34 @@
return TestHelper.createTempFile(data) ;
}
+
+ private File genDataSetFileOneGroup() throws IOException {
+
+ File fp1 = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+
+ ps.println("lost\tjack");
+ ps.println("lost\tkate");
+ ps.println("lost\tsawyer");
+ ps.println("lost\tdesmond");
+ ps.println("lost\thurley");
+ ps.println("lost\tlocke");
+ ps.println("lost\tsun");
+ ps.println("lost\tcharlie");
+ ps.println("lost\tjin");
+ ps.println("lost\tben");
+ ps.println("lotr\tfrodo");
+ ps.println("lotr\tsam");
+ ps.println("lotr\tmerry");
+ ps.println("lotr\tpippen");
+ ps.println("lotr\tbilbo");
+ ps.println("3stooges\tlarry");
+ ps.println("3stooges\tmoe");
+ ps.println("3stooges\tcurly");
+
+ ps.close();
+
+ return fp1;
+ }
+
}