Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/05/27 04:07:06 UTC

[GitHub] [hive] zabetak opened a new pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

zabetak opened a new pull request #1035:
URL: https://github.com/apache/hive/pull/1035


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] zabetak commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

Posted by GitBox <gi...@apache.org>.
zabetak commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r433508844



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -197,6 +186,27 @@ public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int
     return true;
   }
 
+  private static long estimateReducers(HiveConf conf, ReduceSinkOperator rs) {
+    // TODO: Check if we can somehow exploit the logic in SetReducerParallelism
+    if (rs.getConf().getNumReducers() > 0) {
+      return rs.getConf().getNumReducers();
+    }
+    int constantReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+    if (constantReducers > 0) {
+      return constantReducers;
+    }
+    long inputTotalBytes = 0;
+    List<Operator<?>> rsSiblings = rs.getChildOperators().get(0).getParentOperators();
+    for (Operator<?> sibling : rsSiblings) {
+      if (sibling.getStatistics() != null) {

Review comment:
       In the caller of this method, before checking whether the new parallelism is too low, we first check whether the parallelism decreases (`newParallelism < oldParallelism`). If it does not decrease, we proceed with the deduplication as usual. When stats are missing, the code that follows in this method returns 1. This happens for both the parent RS and the child RS, so `1 < 1` is false and the parallelism is treated as unchanged.
   
   If you prefer to have an explicit check for the presence/absence of stats I can try to add it. Let me know.  
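
To make the argument above concrete, here is a minimal standalone sketch of the guard. It is not the Hive code: `estimateReducers` is reduced to a hypothetical helper that takes a nullable byte count in place of operator statistics, the bytes-per-reducer value is an assumed parameter, and `shouldBlockMerge` is a made-up name for the caller's check.

```java
class MissingStatsSketch {
  // Hypothetical stand-in for estimateReducers: a null argument models
  // "no statistics available", in which case the estimate degrades to 1.
  static long estimateReducers(Long inputTotalBytes, long bytesPerReducer) {
    long bytes = inputTotalBytes == null ? 0L : inputTotalBytes;
    // Ceiling division, clamped to at least one reducer.
    return Math.max(1L, (bytes + bytesPerReducer - 1) / bytesPerReducer);
  }

  // Hypothetical caller-side guard: the merge is rejected only when the
  // parallelism drops AND the new value falls below minReducer.
  static boolean shouldBlockMerge(long oldParallelism, long newParallelism, int minReducer) {
    return newParallelism < oldParallelism && newParallelism < minReducer;
  }

  public static void main(String[] args) {
    long bytesPerReducer = 256L * 1024 * 1024; // assumed value
    long parent = estimateReducers(null, bytesPerReducer);
    long child = estimateReducers(null, bytesPerReducer);
    // Both estimates are 1, so 1 < 1 is false and the merge goes ahead.
    System.out.println(shouldBlockMerge(parent, child, 4)); // prints "false"
  }
}
```

Under these assumptions the missing-stats case never blocks the merge, because both sides collapse to the same estimate.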






[GitHub] [hive] asfgit closed pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #1035:
URL: https://github.com/apache/hive/pull/1035


   




[GitHub] [hive] jcamachor commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

Posted by GitBox <gi...@apache.org>.
jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431276400



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
       I think you are right; adding the check using the existing config seems to be the correct approach. We could still add an on/off config for the new behavior (default true, so that we can disable it if needed). Could you make those changes?
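
A rough sketch of how the two knobs discussed here could combine: an on/off switch (assumed to default to true) in front of a check against the existing `hive.optimize.reducededuplication.min.reducer` value. The `Settings` holder, the switch field and `allowsDeduplication` are hypothetical names for illustration, not Hive APIs.

```java
class ParallelismGateSketch {
  // Hypothetical settings holder; Hive would read these values from HiveConf.
  static final class Settings {
    final boolean parallelismCheckEnabled; // assumed new on/off switch
    final int minReducer;                  // existing min.reducer value

    Settings(boolean parallelismCheckEnabled, int minReducer) {
      this.parallelismCheckEnabled = parallelismCheckEnabled;
      this.minReducer = minReducer;
    }
  }

  // Returns true when deduplication may proceed.
  static boolean allowsDeduplication(Settings s, long newParallelism) {
    if (!s.parallelismCheckEnabled) {
      return true; // switch off: no parallelism check, as before the change
    }
    return newParallelism >= s.minReducer;
  }

  public static void main(String[] args) {
    System.out.println(allowsDeduplication(new Settings(true, 4), 2));  // false
    System.out.println(allowsDeduplication(new Settings(false, 4), 2)); // true
    System.out.println(allowsDeduplication(new Settings(true, 4), 16)); // true
  }
}
```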

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;

Review comment:
       Do you think it makes sense to move these checks into the `extractMergeDirections` method? It seems the rest of the checks are done within that method; once `extractMergeDirections` succeeds, this method only modifies the operators accordingly. I think keeping that separation would make the code clearer.
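
As an illustration of the separation suggested here, the sketch below keeps every accept/reject decision in one method and leaves the merge method to apply the result. The `Sink` type is a simplified stand-in, not the Hive operator, and `canMerge`, its empty-columns rule and the `newParallelism` argument are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CheckThenMergeSketch {
  // Simplified stand-in for a ReduceSink operator (not the Hive class).
  static final class Sink {
    List<String> partitionCols;

    Sink(List<String> partitionCols) {
      this.partitionCols = new ArrayList<>(partitionCols);
    }
  }

  // Decision step: every reason to reject the merge is checked here,
  // including the parallelism estimate supplied by the caller.
  static boolean canMerge(Sink child, long newParallelism, int minReducer) {
    if (child.partitionCols.isEmpty()) {
      return false; // illustrative rule: nothing to take over from the child
    }
    return newParallelism >= minReducer;
  }

  // Mutation step: touches the parent only after the decision step succeeds.
  static boolean merge(Sink child, Sink parent, long newParallelism, int minReducer) {
    if (!canMerge(child, newParallelism, minReducer)) {
      return false; // parent is left untouched
    }
    parent.partitionCols = new ArrayList<>(child.partitionCols);
    return true;
  }

  public static void main(String[] args) {
    Sink parent = new Sink(Arrays.asList("a", "b"));
    Sink child = new Sink(Arrays.asList("a"));
    System.out.println(merge(child, parent, 2, 4) + " " + parent.partitionCols); // false [a, b]
    System.out.println(merge(child, parent, 8, 4) + " " + parent.partitionCols); // true [a]
  }
}
```

With this split, a rejected merge can never leave the parent half-modified, since no state changes before all checks have passed.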






[GitHub] [hive] zabetak commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

Posted by GitBox <gi...@apache.org>.
zabetak commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431170080



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
       If we don't care about comparing parallelism before/after we could possibly use the existing `hive.optimize.reducededuplication.min.reducer` config parameter and not introduce a new one. 
   ```
   long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
   if (newParallelism < minReducer) {
     return false;
   }
   ```






[GitHub] [hive] jcamachor commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

Posted by GitBox <gi...@apache.org>.
jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r433349779



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -197,6 +186,27 @@ public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int
     return true;
   }
 
+  private static long estimateReducers(HiveConf conf, ReduceSinkOperator rs) {
+    // TODO: Check if we can somehow exploit the logic in SetReducerParallelism
+    if (rs.getConf().getNumReducers() > 0) {
+      return rs.getConf().getNumReducers();
+    }
+    int constantReducers = conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+    if (constantReducers > 0) {
+      return constantReducers;
+    }
+    long inputTotalBytes = 0;
+    List<Operator<?>> rsSiblings = rs.getChildOperators().get(0).getParentOperators();
+    for (Operator<?> sibling : rsSiblings) {
+      if (sibling.getStatistics() != null) {

Review comment:
       I am wondering whether, if no stats are available for any of the siblings, we should skip the parallelism check and thus fall back to the previous behavior.
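
One possible shape for such an explicit check, as a hedged sketch: the estimate becomes an `OptionalLong` that is empty when no sibling carries statistics, and the caller skips the parallelism check in that case. The helper names, the nullable `Long` entries standing in for sibling statistics, and the reading of the comment as "none of the siblings has stats" are assumptions, not the Hive implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

class StatsAwareEstimateSketch {
  // Sums the byte sizes of the siblings; a null entry models a sibling
  // without statistics. Returns empty when no sibling has statistics.
  static OptionalLong totalInputBytes(List<Long> siblingBytes) {
    long total = 0;
    boolean anyStats = false;
    for (Long bytes : siblingBytes) {
      if (bytes != null) {
        total += bytes;
        anyStats = true;
      }
    }
    return anyStats ? OptionalLong.of(total) : OptionalLong.empty();
  }

  // Runs the parallelism check only when an estimate exists.
  static boolean allowsMerge(OptionalLong inputBytes, long bytesPerReducer, int minReducer) {
    if (!inputBytes.isPresent()) {
      return true; // no stats: skip the check and keep the previous behavior
    }
    long reducers = Math.max(1L, (inputBytes.getAsLong() + bytesPerReducer - 1) / bytesPerReducer);
    return reducers >= minReducer;
  }

  public static void main(String[] args) {
    OptionalLong none = totalInputBytes(Arrays.asList((Long) null, (Long) null));
    OptionalLong some = totalInputBytes(Arrays.asList(100L, null, 50L));
    System.out.println(allowsMerge(none, 100L, 4)); // true
    System.out.println(allowsMerge(some, 100L, 4)); // false
  }
}
```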



