You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/23 02:02:27 UTC

svn commit: r1304152 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: daijy
Date: Fri Mar 23 01:02:27 2012
New Revision: 1304152

URL: http://svn.apache.org/viewvc?rev=1304152&view=rev
Log:
PIG-1270: Push limit into loader

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 23 01:02:27 2012
@@ -153,6 +153,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1270: Push limit into loader (daijy)
+
 PIG-2589: Additional e2e test for 0.10 new features (daijy)
 
 PIG-2182: Add more append support to DataByteArray (gsingers via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Mar 23 01:02:27 2012
@@ -341,6 +341,7 @@ public class JobControlCompiler{
         ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
+        ArrayList<Long> inpLimits = new ArrayList<Long>();
         ArrayList<POStore> storeLocations = new ArrayList<POStore>();
         Path tmpLocation = null;
         
@@ -401,6 +402,7 @@ public class JobControlCompiler{
                     }
                     inpTargets.add(ldSucKeys);
                     inpSignatureLists.add(ld.getSignature());
+                    inpLimits.add(ld.getLimit());
                     //Remove the POLoad from the plan
                     if (!pigContext.inIllustrator)
                         mro.mapPlan.remove(ld);
@@ -430,6 +432,7 @@ public class JobControlCompiler{
             conf.set("pig.inputs", ObjectSerializer.serialize(inp));
             conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
             conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
+            conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
             conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
             conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Mar 23 01:02:27 2012
@@ -116,7 +116,11 @@ public class PigInputFormat extends Inpu
         
         InputFormat inputFormat = loadFunc.getInputFormat();
         
-        return new PigRecordReader(inputFormat, pigSplit, loadFunc, context);
+        List<Long> inpLimitLists = 
+                (ArrayList<Long>)ObjectSerializer.deserialize(
+                        conf.get("pig.inpLimits"));
+        
+        return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
     }
     
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Mar 23 01:02:27 2012
@@ -84,6 +84,10 @@ public class PigRecordReader extends Rec
     
     private TaskAttemptContext context;
     
+    private final long limit;
+
+    private long recordCount = 0;
+    
     /**
      * the Configuration object with data specific to the input the underlying
      * RecordReader will process (this is obtained after a 
@@ -97,7 +101,7 @@ public class PigRecordReader extends Rec
      * 
      */
     public PigRecordReader(InputFormat inputformat, PigSplit pigSplit, 
-            LoadFunc loadFunc, TaskAttemptContext context) throws IOException, InterruptedException {
+            LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
         this.inputformat = inputformat;
         this.pigSplit = pigSplit; 
         this.loadfunc = loadFunc;
@@ -106,6 +110,7 @@ public class PigRecordReader extends Rec
         curReader = null;
         progress = 0;
         idx = 0;
+        this.limit = limit;
         initNextRecordReader();
     }
     
@@ -184,11 +189,14 @@ public class PigRecordReader extends Rec
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (limit != -1 && recordCount >= limit)
+            return false;
         while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
             if (!initNextRecordReader()) {
               return false;
             }
         }
+        recordCount++;
         return true;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SampleOptimizer.java Fri Mar 23 01:02:27 2012
@@ -211,6 +211,7 @@ public class SampleOptimizer extends MRO
         FileSpec fs = new FileSpec(predFs.getFileName(),new FuncSpec(loadFunc, rslargs));
         POLoad newLoad = new POLoad(load.getOperatorKey(),load.getRequestedParallelism(), fs);
         newLoad.setSignature(predLoad.getSignature());
+        newLoad.setLimit(predLoad.getLimit());
         try {
             mr.mapPlan.replace(load, newLoad);
             

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Fri Mar 23 01:02:27 2012
@@ -62,6 +62,8 @@ public class POLoad extends PhysicalOper
     // Alias for the POLoad
     private String signature;
     
+    private long limit=-1;
+    
     public POLoad(OperatorKey k) {
         this(k,-1, null);
     }
@@ -229,4 +231,12 @@ public class POLoad extends PhysicalOper
         } else
           return (Tuple) out;
     }
+
+    public long getLimit() {
+        return limit;
+    }
+
+    public void setLimit(long limit) {
+        this.limit = limit;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Fri Mar 23 01:02:27 2012
@@ -49,6 +49,7 @@ public class LOLoad extends LogicalRelat
     private LogicalSchema uidOnlySchema;
     private String schemaFile = null;
     private String signature = null;
+    private long limit = -1;
 
     /**
      * 
@@ -282,4 +283,12 @@ public class LOLoad extends LogicalRelat
     public LogicalSchema getScriptSchema() {
         return scriptSchema;
     }
+
+    public long getLimit() {
+        return limit;
+    }
+
+    public void setLimit(long limit) {
+        this.limit = limit;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Mar 23 01:02:27 2012
@@ -127,6 +127,7 @@ public class LogToPhyTranslationVisitor 
         load.setPc(pc);
         load.setResultType(DataType.BAG);
         load.setSignature(loLoad.getSignature());
+        load.setLimit(loLoad.getLimit());
         currentPlan.add(load);
         logToPhyMap.put(loLoad, load);
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Fri Mar 23 01:02:27 2012
@@ -75,8 +75,7 @@ public class LimitOptimizer extends Rule
 
             // Limit cannot be pushed up
             if (pred instanceof LOCogroup || pred instanceof LOFilter
-                    || pred instanceof LOLoad || pred instanceof LOSplit
-                    || pred instanceof LODistinct || pred instanceof LOJoin) {
+                    || pred instanceof LOSplit || pred instanceof LODistinct || pred instanceof LOJoin) {
                 return false;
             }
 
@@ -173,6 +172,14 @@ public class LimitOptimizer extends Rule
 
                 // remove the limit
                 currentPlan.removeAndReconnect(limit);
+            } else if (pred instanceof LOLoad) {
+                // Push limit to load
+                LOLoad load = (LOLoad) pred;
+                if (load.getLimit() == -1)
+                    load.setLimit(limit.getLimit());
+                else
+                    load.setLimit(load.getLimit() < limit.getLimit() ? load
+                            .getLimit() : limit.getLimit());
             } else if (pred instanceof LOLimit) {
                 // Limit is merged into another LOLimit
                 LOLimit beforeLimit = (LOLimit) pred;

Modified: pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1304152&r1=1304151&r2=1304152&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestOptimizeLimit.java Fri Mar 23 01:02:27 2012
@@ -25,6 +25,7 @@ import java.util.*;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOStore;
@@ -104,6 +105,7 @@ public class TestOptimizeLimit {
 	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
 	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
+        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 10);
 	}
 
 	@Test
@@ -138,6 +140,7 @@ public class TestOptimizeLimit {
 	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
 	    optimizePlan(newLogicalPlan);
         compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
+        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
     }
 	
     @Test
@@ -150,6 +153,7 @@ public class TestOptimizeLimit {
 	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
 	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
+        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 20);
 	}
     
     @Test
@@ -194,6 +198,7 @@ public class TestOptimizeLimit {
 	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
 	    optimizePlan(newLogicalPlan);
         compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
+        Assert.assertTrue(((LOLoad) newLogicalPlan.getSources().get(0)).getLimit() == 100);
     }
 
     @Test