You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/04/13 01:38:28 UTC

svn commit: r1738879 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ test/org/apache/pig/tez/

Author: rohini
Date: Tue Apr 12 23:38:28 2016
New Revision: 1738879

URL: http://svn.apache.org/viewvc?rev=1738879&view=rev
Log:
PIG-4868: Low values for bytes.per.reducer configured by user not honored in Tez for inputs (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr 12 23:38:28 2016
@@ -109,6 +109,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4868: Low values for bytes.per.reducer configured by user not honored in Tez for inputs (rohini)
+
 PIG-4869: Removing unwanted configuration in Tez broke ConfiguredFailoverProxyProvider (rohini)
 
 PIG-4867: -stop_on_failure does not work with Tez (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Tue Apr 12 23:38:28 2016
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
-    static long getTotalInputFileSize(Configuration conf,
+    public static long getTotalInputFileSize(Configuration conf,
             List<POLoad> lds, Job job) throws IOException {
         return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
     }
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size nor can pig look that up itself are excluded from this size.
-     * 
+     *
      * @param conf Configuration
      * @param lds List of POLoads
      * @param job Job

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Apr 12 23:38:28 2016
@@ -235,6 +235,7 @@ public class TezOperator extends Operato
     }
 
     private LoaderInfo loaderInfo = new LoaderInfo();
+    private long totalInputFilesSize = -1;
 
     public TezOperator(OperatorKey k) {
         super(k);
@@ -651,6 +652,14 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
+    public long getTotalInputFilesSize() {
+        return totalInputFilesSize;
+    }
+
+    public void setTotalInputFilesSize(long totalInputFilesSize) {
+        this.totalInputFilesSize = totalInputFilesSize;
+    }
+
     public void setUseGraceParallelism(boolean useGraceParallelism) {
         this.useGraceParallelism = useGraceParallelism;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Tue Apr 12 23:38:28 2016
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -175,6 +176,7 @@ public class LoaderProcessor extends Tez
             // splits can be moved to if(loads) block below
             int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
+            tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
         return lds;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Tue Apr 12 23:38:28 2016
@@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper
  */
 public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
 
-    static private int maxTaskCount;
     static final double DEFAULT_FLATTEN_FACTOR = 10;
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -76,6 +75,8 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
 
     private PigContext pc;
+    private int maxTaskCount;
+    private long bytesPerReducer;
 
     @Override
     public void setPigContext(PigContext pc) {
@@ -94,6 +95,8 @@ public class TezOperDependencyParallelis
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
+        bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+
         // If parallelism is set explicitly, respect it
         if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
@@ -130,6 +133,12 @@ public class TezOperDependencyParallelis
                 boolean applyFactor = !tezOper.isUnion();
                 if (!pred.isVertexGroup() && applyFactor) {
                     predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
+                    if (pred.getTotalInputFilesSize() > 0) {
+                        // Estimate similar to mapreduce and use the maximum of two
+                        int parallelismBySize = (int) Math.ceil((double) pred
+                                .getTotalInputFilesSize() / bytesPerReducer);
+                        predParallelism = Math.max(predParallelism, parallelismBySize);
+                    }
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -156,12 +165,6 @@ public class TezOperDependencyParallelis
             roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
         }
 
-        if (roundedEstimatedParallelism == 0) {
-            throw new IOException("Estimated parallelism for "
-                    + tezOper.getOperatorKey().toString()
-                    + " is 0 which is unexpected");
-        }
-
         return roundedEstimatedParallelism;
     }
 

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1738879&r1=1738878&r2=1738879&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Tue Apr 12 23:38:28 2016
@@ -149,6 +149,43 @@ public class TestTezAutoParallelism {
             }
         });
         assertEquals(files.length, 1);
+        fs.delete(new Path("output1"), true);
+    }
+
+    @Test
+    public void testBytesPerReducer() throws IOException{
+
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
+        try {
+            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = group A by name;");
+            pigServer.store("B", "output1");
+            FileSystem fs = cluster.getFileSystem();
+            FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+                @Override
+                public boolean accept(Path path) {
+                    if (path.getName().startsWith("part")) {
+                        return true;
+                    }
+                    return false;
+                }
+            });
+            assertEquals(files.length, 10);
+            String log = writer.toString();
+            assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
+            assertTrue(log.contains("For vertex - scope-14: parallelism=10"));
+        } finally {
+            Util.removeLogAppender("testAutoParallelism", TezDagBuilder.class);
+            Util.deleteFile(cluster, "output1");
+        }
     }
 
     @Test