Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/05/27 14:21:20 UTC

[GitHub] [hive] zabetak commented on a change in pull request #1035: HIVE-23365: Put RS deduplication optimization under cost based decision

zabetak commented on a change in pull request #1035:
URL: https://github.com/apache/hive/pull/1035#discussion_r431170080



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplicationUtils.java
##########
@@ -113,13 +116,36 @@ public static boolean merge(ReduceSinkOperator cRS, JoinOperator pJoin, int minR
    * If parent RS has not been assigned any partitioning column, we will use
    * partitioning columns (if exist) of child RS.
    */
-  public static boolean merge(ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
+  public static boolean merge(HiveConf hiveConf, ReduceSinkOperator cRS, ReduceSinkOperator pRS, int minReducer)
       throws SemanticException {
     int[] result = extractMergeDirections(cRS, pRS, minReducer);
     if (result == null) {
       return false;
     }
 
+    // The partitioning columns of the child RS will replace the columns of the
+    // parent RS in two cases:
+    // - Parent RS columns are more specific than those of the child RS,
+    // and child columns are assigned;
+    // - Child RS columns are more specific than those of the parent RS,
+    // and parent columns are not assigned.
+    List<ExprNodeDesc> childPCs = cRS.getConf().getPartitionCols();
+    List<ExprNodeDesc> parentPCs = pRS.getConf().getPartitionCols();
+    boolean useChildsPartitionColumns =
+        result[1] < 0 && (childPCs != null && !childPCs.isEmpty()) ||
+        result[1] > 0 && (parentPCs == null || parentPCs.isEmpty());
+
+    if (useChildsPartitionColumns) {
+      List<ExprNodeDesc> newPartitionCols = ExprNodeDescUtils.backtrack(childPCs, cRS, pRS);
+      long oldParallelism = estimateMaxPartitions(hiveConf, pRS, parentPCs);
+      long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
+      long threshold = hiveConf.getLongVar(HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATIONPARALLELISMDECTHRESHOLD);
+      if (oldParallelism / newParallelism > threshold) {
+        return false;
+      }

Review comment:
       If we don't care about comparing parallelism before and after the merge, we could possibly reuse the existing `hive.optimize.reducededuplication.min.reducer` config parameter instead of introducing a new one.
   ```
   long newParallelism = estimateMaxPartitions(hiveConf, pRS, newPartitionCols);
   if (newParallelism < minReducer) {
     return false;
   }
   ```
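
   To make the difference between the two checks concrete, here is a minimal standalone sketch. The class and method names are hypothetical and exist only for illustration; nothing here calls Hive. `rejectByRatio` mirrors the condition in the diff and `rejectByMinReducer` mirrors the condition suggested above. The guard against a non-positive `newParallelism` is an assumption added so the sketch cannot divide by zero; it is not in the patch.

   ```java
   // Hypothetical sketch, not Hive code: compares the two merge-rejection rules.
   final class MergeDecisionSketch {
     private MergeDecisionSketch() {}

     // Rule from the diff: reject the merge when parallelism drops by more
     // than `threshold` times (integer division, as in the patch).
     static boolean rejectByRatio(long oldParallelism, long newParallelism, long threshold) {
       if (newParallelism <= 0) {
         // Assumption added for the sketch: treat "no partitions" as a rejection
         // rather than dividing by zero.
         return true;
       }
       return oldParallelism / newParallelism > threshold;
     }

     // Rule from the review comment: reject the merge when the estimated
     // parallelism after the merge falls below the existing minReducer setting.
     static boolean rejectByMinReducer(long newParallelism, int minReducer) {
       return newParallelism < minReducer;
     }

     public static void main(String[] args) {
       // Large relative drop, but still above the floor: the rules disagree.
       System.out.println(rejectByRatio(1000, 10, 5));   // true
       System.out.println(rejectByMinReducer(10, 4));    // false

       // Small relative drop, but below the floor: the rules disagree the other way.
       System.out.println(rejectByRatio(8, 2, 5));       // false
       System.out.println(rejectByMinReducer(2, 4));     // true
     }
   }
   ```

   The ratio rule needs both estimates and a new threshold; the floor rule needs only the estimate after the merge and the `minReducer` value that `merge` already receives.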



