You are viewing a plain-text version of this content. (The canonical link to the original page was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/23 02:02:27 UTC
svn commit: r1304152 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: daijy
Date: Fri Mar 23 01:02:27 2012
New Revision: 1304152
URL: http://svn.apache.org/viewvc?rev=1304152&view=rev
Log:
PIG-1270: Push limit into loader
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 23 01:02:27 2012
@@ -153,6 +153,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1270: Push limit into loader (daijy)
+
PIG-2589: Additional e2e test for 0.10 new features (daijy)
PIG-2182: Add more append support to DataByteArray (gsingers via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Mar 23 01:02:27 2012
@@ -341,6 +341,7 @@ public class JobControlCompiler{
ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
ArrayList<String> inpSignatureLists = new ArrayList<String>();
+ ArrayList<Long> inpLimits = new ArrayList<Long>();
ArrayList<POStore> storeLocations = new ArrayList<POStore>();
Path tmpLocation = null;
@@ -401,6 +402,7 @@ public class JobControlCompiler{
}
inpTargets.add(ldSucKeys);
inpSignatureLists.add(ld.getSignature());
+ inpLimits.add(ld.getLimit());
//Remove the POLoad from the plan
if (!pigContext.inIllustrator)
mro.mapPlan.remove(ld);
@@ -430,6 +432,7 @@ public class JobControlCompiler{
conf.set("pig.inputs", ObjectSerializer.serialize(inp));
conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
+ conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
// this is for unit tests since some don't create PigServer
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Mar 23 01:02:27 2012
@@ -116,7 +116,11 @@ public class PigInputFormat extends Inpu
InputFormat inputFormat = loadFunc.getInputFormat();
- return new PigRecordReader(inputFormat, pigSplit, loadFunc, context);
+ List<Long> inpLimitLists =
+ (ArrayList<Long>)ObjectSerializer.deserialize(
+ conf.get("pig.inpLimits"));
+
+ return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Mar 23 01:02:27 2012
@@ -84,6 +84,10 @@ public class PigRecordReader extends Rec
private TaskAttemptContext context;
+ private final long limit;
+
+ private long recordCount = 0;
+
/**
* the Configuration object with data specific to the input the underlying
* RecordReader will process (this is obtained after a
@@ -97,7 +101,7 @@ public class PigRecordReader extends Rec
*
*/
public PigRecordReader(InputFormat inputformat, PigSplit pigSplit,
- LoadFunc loadFunc, TaskAttemptContext context) throws IOException, InterruptedException {
+ LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
this.inputformat = inputformat;
this.pigSplit = pigSplit;
this.loadfunc = loadFunc;
@@ -106,6 +110,7 @@ public class PigRecordReader extends Rec
curReader = null;
progress = 0;
idx = 0;
+ this.limit = limit;
initNextRecordReader();
}
@@ -184,11 +189,14 @@ public class PigRecordReader extends Rec
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (limit != -1 && recordCount >= limit) // limit == -1 means no limit was pushed into the load
+ return false; // this reader already returned 'limit' records: report end of input
while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
if (!initNextRecordReader()) {
return false;
}
}
+ recordCount++; // count only records that are actually handed to the caller
return true;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Fri Mar 23 01:02:27 2012
@@ -211,6 +211,7 @@ public class SampleOptimizer extends MRO
FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs));
POLoad newLoad = new POLoad(load.getOperatorKey(),load.getRequestedParallelism(), fs);
newLoad.setSignature(predLoad.getSignature());
+ newLoad.setLimit(predLoad.getLimit());
try {
mr.mapPlan.replace(load, newLoad);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Fri Mar 23 01:02:27 2012
@@ -62,6 +62,8 @@ public class POLoad extends PhysicalOper
// Alias for the POLoad
private String signature;
+ private long limit=-1;
+
public POLoad(OperatorKey k) {
this(k,-1, null);
}
@@ -229,4 +231,12 @@ public class POLoad extends PhysicalOper
} else
return (Tuple) out;
}
+
+ public long getLimit() {
+ return limit; // copied from LOLoad during plan translation; -1 means no limit
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit; // passed to PigRecordReader via "pig.inpLimits"; -1 disables it
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Fri Mar 23 01:02:27 2012
@@ -49,6 +49,7 @@ public class LOLoad extends LogicalRelat
private LogicalSchema uidOnlySchema;
private String schemaFile = null;
private String signature = null;
+ private long limit = -1;
/**
*
@@ -282,4 +283,12 @@ public class LOLoad extends LogicalRelat
public LogicalSchema getScriptSchema() {
return scriptSchema;
}
+
+ public long getLimit() {
+ return limit; // set by LimitOptimizer when a LIMIT is pushed up to this load; -1 if none
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit; // -1 (the field's initial value) means no limit
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Mar 23 01:02:27 2012
@@ -127,6 +127,7 @@ public class LogToPhyTranslationVisitor
load.setPc(pc);
load.setResultType(DataType.BAG);
load.setSignature(loLoad.getSignature());
+ load.setLimit(loLoad.getLimit());
currentPlan.add(load);
logToPhyMap.put(loLoad, load);
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Fri Mar 23 01:02:27 2012
@@ -75,8 +75,7 @@ public class LimitOptimizer extends Rule
// Limit cannot be pushed up
if (pred instanceof LOCogroup || pred instanceof LOFilter
- || pred instanceof LOLoad || pred instanceof LOSplit
- || pred instanceof LODistinct || pred instanceof LOJoin) {
+ || pred instanceof LOSplit || pred instanceof LODistinct || pred instanceof LOJoin) {
return false;
}
@@ -173,6 +172,14 @@ public class LimitOptimizer extends Rule
// remove the limit
currentPlan.removeAndReconnect(limit);
+ } else if (pred instanceof LOLoad) {
+ // Push limit to load
+ LOLoad load = (LOLoad) pred;
+ if (load.getLimit() == -1)
+ load.setLimit(limit.getLimit());
+ else
+ load.setLimit(load.getLimit() < limit.getLimit() ? load
+ .getLimit() : limit.getLimit());
} else if (pred instanceof LOLimit) {
// Limit is merged into another LOLimit
LOLimit beforeLimit = (LOLimit) pred;
Modified: pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Fri Mar 23 01:02:27 2012
@@ -25,6 +25,7 @@ import java.util.*;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOStore;
@@ -104,6 +105,7 @@ public class TestOptimizeLimit {
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
+ Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 10);
}
@Test
@@ -138,6 +140,7 @@ public class TestOptimizeLimit {
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
+ Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
}
@Test
@@ -150,6 +153,7 @@ public class TestOptimizeLimit {
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
+ Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 20);
}
@Test
@@ -194,6 +198,7 @@ public class TestOptimizeLimit {
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
optimizePlan(newLogicalPlan);
compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
+ Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
}
@Test