Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/05/27 17:18:41 UTC

[GitHub] [hive] jcamachor commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

jcamachor commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431276400



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
       I think you are right: adding the check based on the existing config seems to be the correct approach. We could still add an on/off config for the new optimization behavior (default true, but available in case we need to disable it). Could you make those changes?
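       The suggestion above (keep the threshold check and add an on/off switch that defaults to enabled) could look roughly like the minimal Java sketch below. It is an illustration under stated assumptions, not the patch: the class `PartitionColumnMergeGuard` and its fields are hypothetical, the switch is assumed to disable only the new parallelism check (falling back to the previous unconditional merge), and the operators are reduced to plain column lists. It also spells out the `&&`/`||` grouping from the diff with explicit parentheses and guards the division in case an estimate of zero is possible.

```java
import java.util.List;

// Hypothetical sketch: PartitionColumnMergeGuard, its fields and the meaning of
// the on/off flag are illustrative only, not Hive's API or config keys.
final class PartitionColumnMergeGuard {

    private final boolean enabled;  // assumed on/off switch for the new check (default true)
    private final long threshold;   // maximum tolerated old/new parallelism ratio

    PartitionColumnMergeGuard(boolean enabled, long threshold) {
        this.enabled = enabled;
        this.threshold = threshold;
    }

    /** The two replacement cases from the diff comment, with the grouping made explicit. */
    static boolean useChildsPartitionColumns(int direction, List<?> childPCs, List<?> parentPCs) {
        boolean childAssigned = childPCs != null && !childPCs.isEmpty();
        boolean parentAssigned = parentPCs != null && !parentPCs.isEmpty();
        // direction < 0: parent RS more specific; direction > 0: child RS more specific
        return (direction < 0 && childAssigned) || (direction > 0 && !parentAssigned);
    }

    /** Returns true if replacing the parent's partition columns is acceptable. */
    boolean allowMerge(long oldParallelism, long newParallelism) {
        if (!enabled) {
            return true; // check switched off: keep the old, unconditional merge
        }
        if (newParallelism <= 0) {
            return false; // assumption: treat a non-positive estimate as unsafe (avoids / by zero)
        }
        return oldParallelism / newParallelism <= threshold; // integer division, as in the diff
    }
}
```

       For example, with the check enabled and a threshold of 4, going from an estimated 100 partitions to 50 is allowed (ratio 2), while going from 100 to 10 is rejected (ratio 10).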

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;

Review comment:
       Do you think it makes sense to move these checks into the `extractMergeDirections` method? The rest of the checks seem to be done within that method; once `extractMergeDirections` succeeds, this method only modifies the operators accordingly. I think keeping that separation would make the code clearer.
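       A rough sketch of the separation proposed above, under heavy simplification: `extractMergeDirections` makes every decision, including the new parallelism check, and returns `null` to veto the merge, while `merge` only rewrites the operators. `RsDedupSketch`, `Rs`, the `String` column names, the caller-supplied direction and the `estimateMaxPartitions` function parameter are hypothetical stand-ins; the real method derives the directions from the operators and uses `ExprNodeDescUtils.backtrack` rather than copying column names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical, simplified model; not Hive code. Rs stands in for
// ReduceSinkOperator, column names for ExprNodeDesc, and the estimator
// function for estimateMaxPartitions.
final class RsDedupSketch {

    /** Stand-in for a ReduceSinkOperator: only its partitioning columns. */
    static final class Rs {
        List<String> partitionCols;

        Rs(List<String> partitionCols) {
            this.partitionCols = partitionCols;
        }
    }

    private static boolean assigned(List<String> cols) {
        return cols != null && !cols.isEmpty();
    }

    // Same two cases as the diff comment: < 0 parent more specific, > 0 child more specific.
    private static boolean useChildsPartitionColumns(int direction, Rs cRS, Rs pRS) {
        return (direction < 0 && assigned(cRS.partitionCols))
            || (direction > 0 && !assigned(pRS.partitionCols));
    }

    /**
     * All decisions live here. Returns null to veto the merge, otherwise the
     * partition-column direction (supplied by the caller in this sketch).
     */
    static int[] extractMergeDirections(Rs cRS, Rs pRS, int partitionDirection, long threshold,
                                        ToLongFunction<List<String>> estimateMaxPartitions) {
        // ... the existing checks would come first ...
        if (useChildsPartitionColumns(partitionDirection, cRS, pRS)) {
            long oldParallelism = estimateMaxPartitions.applyAsLong(pRS.partitionCols);
            long newParallelism = estimateMaxPartitions.applyAsLong(cRS.partitionCols);
            if (newParallelism <= 0 || oldParallelism / newParallelism > threshold) {
                return null; // merging would reduce parallelism too much
            }
        }
        return new int[] {partitionDirection};
    }

    /** Only applies a decision that extractMergeDirections already approved. */
    static boolean merge(Rs cRS, Rs pRS, int partitionDirection, long threshold,
                         ToLongFunction<List<String>> estimateMaxPartitions) {
        int[] result = extractMergeDirections(cRS, pRS, partitionDirection, threshold, estimateMaxPartitions);
        if (result == null) {
            return false;
        }
        if (useChildsPartitionColumns(result[0], cRS, pRS)) {
            // The real code backtracks the child's expressions onto the parent; here we copy names.
            pRS.partitionCols = new ArrayList<>(cRS.partitionCols);
        }
        return true;
    }
}
```

       With this split, a veto from the new check looks to the caller exactly like any other failed precondition: `extractMergeDirections` returns `null` and `merge` returns `false` without touching either operator.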




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


