You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/11 03:54:54 UTC

svn commit: r1723975 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Author: rohini
Date: Mon Jan 11 02:54:54 2016
New Revision: 1723975

URL: http://svn.apache.org/viewvc?rev=1723975&view=rev
Log:
PIG-4775: Better default values for shuffle bytes per reducer (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 02:54:54 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4775: Better default values for shuffle bytes per reducer (rohini)
+
 PIG-4753: Pigmix should have option to delete outputs after completing the tests (mitdesai via rohini)
 
 PIG-4744: Honor tez.staging-dir setting in tez-site.xml (rohini via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 02:54:54 2016
@@ -165,6 +165,9 @@ import org.apache.tez.runtime.library.in
 public class TezDagBuilder extends TezOpPlanVisitor {
     private static final Log log = LogFactory.getLog(TezDagBuilder.class);
 
+    private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 * 1024L;
+    private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
+
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
@@ -705,17 +708,25 @@ public class TezDagBuilder extends TezOp
                         vmPluginName = ShuffleVertexManager.class.getName();
                     }
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-                    if (stores.size() <= 0) {
-                        // Intermediate reduce. Set the bytes per reducer to be block size.
-                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                        intermediateTaskInputSize);
-                    } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                        InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+                    // For Intermediate reduce, set the bytes per reducer to be block size.
+                    long bytesPerReducer = intermediateTaskInputSize;
+                    // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
+                    // If not as default use 384MB for group bys and 256 MB for joins. Not using
+                    // default 1G as that value was suited for mapreduce logic where numReducers=(map input size/bytesPerReducer).
+                    // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
+                    // as map input sizes are mostly always high compared to map output.
+                    if (stores.size() > 0) {
+                        if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = vmPluginConf.getLong(
+                                            InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+                        } else if (tezOp.isGroupBy()) {
+                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+                        } else {
+                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+                        }
                     }
+                    vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
                     log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                 }
             }