You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pig.apache.org by ro...@apache.org on 2016/01/11 03:54:54 UTC
svn commit: r1723975 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Author: rohini
Date: Mon Jan 11 02:54:54 2016
New Revision: 1723975
URL: http://svn.apache.org/viewvc?rev=1723975&view=rev
Log:
PIG-4775: Better default values for shuffle bytes per reducer (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 02:54:54 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4775: Better default values for shuffle bytes per reducer (rohini)
+
PIG-4753: Pigmix should have option to delete outputs after completing the tests (mitdesai via rohini)
PIG-4744: Honor tez.staging-dir setting in tez-site.xml (rohini via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1723975&r1=1723974&r2=1723975&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jan 11 02:54:54 2016
@@ -165,6 +165,9 @@ import org.apache.tez.runtime.library.in
public class TezDagBuilder extends TezOpPlanVisitor {
private static final Log log = LogFactory.getLog(TezDagBuilder.class);
+ private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 * 1024L;
+ private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
+
private DAG dag;
private Map<String, LocalResource> localResources;
private PigContext pc;
@@ -705,17 +708,25 @@ public class TezDagBuilder extends TezOp
vmPluginName = ShuffleVertexManager.class.getName();
}
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- if (stores.size() <= 0) {
- // Intermediate reduce. Set the bytes per reducer to be block size.
- vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- intermediateTaskInputSize);
- } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
- InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
- InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
- vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
- vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
- InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+ // For Intermediate reduce, set the bytes per reducer to be block size.
+ long bytesPerReducer = intermediateTaskInputSize;
+ // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user.
+ // If not as default use 384MB for group bys and 256 MB for joins. Not using
+ // default 1G as that value was suited for mapreduce logic where numReducers=(map input size/bytesPerReducer).
+ // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce
+ // as map input sizes are mostly always high compared to map output.
+ if (stores.size() > 0) {
+ if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+ bytesPerReducer = vmPluginConf.getLong(
+ InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+ } else if (tezOp.isGroupBy()) {
+ bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+ } else {
+ bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+ }
}
+ vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
}
}