Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/21 09:46:36 UTC

[GitHub] [hudi] boneanxs commented on a diff in pull request #6725: [HUDI-4881] Push down filters if possible when syncing partitions to Hive

boneanxs commented on code in PR #6725:
URL: https://github.com/apache/hudi/pull/6725#discussion_r1001578617


##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If the filterBuilder is empty, simply append the new filter.
+   * 2. If the filterBuilder is not empty and the new filter is non-empty,
+   *    combine the existing filters and the new filter with the operator,
+   *    wrapping them in parentheses. If the new filter is empty, clear the
+   *    builder for the OR operator and keep the left side for the AND operator.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, Option<String> filter) {

Review Comment:
   Makes sense, will change this in the coming days.
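
   For illustration, a minimal standalone sketch of the composition semantics
   described in the Javadoc above. This is a hypothetical re-implementation
   (class name invented, java.util.Optional standing in for Hudi's Option),
   not the PR code itself:

       import java.util.Optional;

       public class CombineFilterSketch {
         // Append `filter` to `builder` with `operator`, wrapping both sides
         // in parentheses when the builder already holds a filter.
         static void combine(StringBuilder builder, String operator, Optional<String> filter) {
           if (filter.isPresent() && !filter.get().isEmpty()) {
             if (builder.length() != 0) {
               builder.insert(0, "(")
                      .append(" ").append(operator).append(" ")
                      .append(filter.get()).append(")");
             } else {
               builder.append(filter.get());
             }
           } else if (operator.equals("OR")) {
             // An OR branch that cannot be expressed invalidates the whole
             // disjunction, so drop everything accumulated so far.
             builder.setLength(0);
           }
           // For AND with an empty filter, the left side is kept unchanged.
         }

         public static void main(String[] args) {
           StringBuilder b = new StringBuilder();
           combine(b, "AND", Optional.of("date = 2022-09-20"));
           combine(b, "AND", Optional.of("hour = 9"));
           combine(b, "AND", Optional.of("min = 30"));
           // Prints: ((date = 2022-09-20 AND hour = 9) AND min = 30)
           System.out.println(b);
         }
       }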



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If the filterBuilder is empty, simply append the new filter.
+   * 2. If the filterBuilder is not empty and the new filter is non-empty,
+   *    combine the existing filters and the new filter with the operator,
+   *    wrapping them in parentheses. If the new filter is empty, clear the
+   *    builder for the OR operator and keep the left side for the AND operator.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, Option<String> filter) {
+    ValidationUtils.checkArgument(operator.equals("OR") || operator.equals("AND"),
+        "Filters can only accept `OR` or `AND` operator");
+    boolean shouldEnclosed = false;
+    if (filter.isPresent() && !filter.get().isEmpty()) {
+      if (filterBuilder.length() != 0) {
+        // If filterBuilder is not empty, it already contains some filters,
+        // so we need to use the operator to combine the existing filters
+        // with the new one.
+        filterBuilder.insert(0, "(");
+        filterBuilder.append(" " + operator + " ");
+        shouldEnclosed = true;
+      }
+      filterBuilder.append(filter.get());
+    } else {
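+      // The new filter is empty or absent. An OR branch that cannot be
+      // expressed would make the disjunction miss partitions, so clear the
+      // whole builder; for AND, the existing left side is simply kept.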
+      if (operator.equals("OR")) {
+        filterBuilder.setLength(0);
+        return;
+      }
+    }
+
+    if (shouldEnclosed) {
+      filterBuilder.append(")");
+    }
+  }
+
+  private String quoteStringLiteral(String value) {
+    if (!value.contains("\"")) {
+      return "\"" + value + "\"";
+    } else if (!value.contains("'")) {
+      return "'" + value + "'";
+    } else {
+      throw new UnsupportedOperationException("Cannot push down filters if both \" and ' exist");
+    }
+  }
+
+  private Option<String> extractLiteralValue(String type, String value) {
+    switch (type.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return Option.of(quoteStringLiteral(value));
+      case HiveSchemaUtil.INT_TYPE_NAME:
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return Option.of(value);
+      default:
+        return Option.empty();
+    }
+  }
+
+  private Option<String> generateEqualFilter(String key, String type, String value) {
+    Option<String> extracted = extractLiteralValue(type, value);
+    if (extracted.isPresent()) {
+      return Option.of(key + " = " + extracted.get());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Visit the partition and generate filters.
+   *
+   * Examples:
+   * 1. date=2022-09-20 => date = 2022-09-20
+   * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+   * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND min = 30)
+   */
+  private String visitPartition(List<String> partitionKeys,
+                                Map<String, String> keyWithTypes,
+                                List<String> partitionValues) {
+    if (partitionKeys.size() != partitionValues.size()) {
+      throw new HoodieHiveSyncException("Partition key and values should be same length"
+          + ", but got partitionKey: " + partitionKeys + " with values: " + partitionValues);
+    }
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionValues.size(); i++) {
+      String key = partitionKeys.get(i);
+      combineFilter(filterBuilder, "AND",
+          generateEqualFilter(key, keyWithTypes.get(key), partitionValues.get(i)));
+    }
+
+    return filterBuilder.toString();
+  }
+
+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> partitionVals) {
+    // Hive stores columns in lowercase, so we need to map partition keys to lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());
+
+    if (partitionTypes.isEmpty()) {
+      throw new HoodieHiveSyncException("Cannot get partition types from SyncClient, maybe "
+          + "the table schema is not synced");
+    }
+
+    Map<String, String> keyWithTypes = CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionVals.size(); i++) {
+      combineFilter(filterBuilder, "OR",
+          Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes, partitionVals.get(i))));
+    }
+    return filterBuilder.toString();
+  }
+
+  /**
+   * Fetch partitions from the metastore service, trying to push down filters to avoid
+   * fetching too many unnecessary partitions.
+   */
+  private List<Partition> getTablePartitions(String tableName, List<String> writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        .loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+        .filter(values -> !values.isEmpty())
+        .collect(Collectors.toList());
+
+    if (partitionVals.isEmpty()) {
+      // No partitions were written
+      return Collections.emptyList();
+    }
+
+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {

Review Comment:
   If too many filters are pushed down to the backend Hive metastore server, the HMS could throw a StackOverflowError, so we need to limit the number of filters here. The limit depends on the HMS stack-size setting (-Xss).
   
   Here I follow the Spark community's default value for `spark.sql.hive.metastorePartitionPruningInSetThreshold`, which is 1000.
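
   As a hypothetical worked example of this guard (numbers invented for
   illustration):

       // 3 partition keys (e.g. date / hour / min), 400 written partitions
       int partitionKeyCount = 3;
       int writtenPartitionCount = 400;
       int estimateSize = partitionKeyCount * writtenPartitionCount; // 1200
       // 1200 > 1000 (the default threshold), so the sync skips pushdown
       // and falls back to syncClient.getAllPartitions(tableName)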



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine filter with existing filters.
+   *
+   * 1. If the filterBuilder is empty, simply append the new filter.
+   * 2. If the filterBuilder is not empty and the new filter is non-empty,
+   *    combine the existing filters and the new filter with the operator,
+   *    wrapping them in parentheses. If the new filter is empty, clear the
+   *    builder for the OR operator and keep the left side for the AND operator.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, Option<String> filter) {
+    ValidationUtils.checkArgument(operator.equals("OR") || operator.equals("AND"),
+        "Filters can only accept `OR` or `AND` operator");
+    boolean shouldEnclosed = false;
+    if (filter.isPresent() && !filter.get().isEmpty()) {
+      if (filterBuilder.length() != 0) {
+        // If filterBuilder is not empty, it already contains some filters,
+        // so we need to use the operator to combine the existing filters
+        // with the new one.
+        filterBuilder.insert(0, "(");
+        filterBuilder.append(" " + operator + " ");
+        shouldEnclosed = true;
+      }
+      filterBuilder.append(filter.get());
+    } else {
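+      // The new filter is empty or absent. An OR branch that cannot be
+      // expressed would make the disjunction miss partitions, so clear the
+      // whole builder; for AND, the existing left side is simply kept.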
+      if (operator.equals("OR")) {
+        filterBuilder.setLength(0);
+        return;
+      }
+    }
+
+    if (shouldEnclosed) {
+      filterBuilder.append(")");
+    }
+  }
+
+  private String quoteStringLiteral(String value) {
+    if (!value.contains("\"")) {
+      return "\"" + value + "\"";
+    } else if (!value.contains("'")) {
+      return "'" + value + "'";
+    } else {
+      throw new UnsupportedOperationException("Cannot push down filters if both \" and ' exist");
+    }
+  }
+
+  private Option<String> extractLiteralValue(String type, String value) {
+    switch (type.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return Option.of(quoteStringLiteral(value));
+      case HiveSchemaUtil.INT_TYPE_NAME:
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return Option.of(value);
+      default:
+        return Option.empty();
+    }
+  }
+
+  private Option<String> generateEqualFilter(String key, String type, String value) {
+    Option<String> extracted = extractLiteralValue(type, value);
+    if (extracted.isPresent()) {
+      return Option.of(key + " = " + extracted.get());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Visit the partition and generate filters.
+   *
+   * Examples:
+   * 1. date=2022-09-20 => date = 2022-09-20
+   * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+   * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND min = 30)
+   */
+  private String visitPartition(List<String> partitionKeys,
+                                Map<String, String> keyWithTypes,
+                                List<String> partitionValues) {
+    if (partitionKeys.size() != partitionValues.size()) {
+      throw new HoodieHiveSyncException("Partition key and values should be same length"
+          + ", but got partitionKey: " + partitionKeys + " with values: " + partitionValues);
+    }
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionValues.size(); i++) {
+      String key = partitionKeys.get(i);
+      combineFilter(filterBuilder, "AND",
+          generateEqualFilter(key, keyWithTypes.get(key), partitionValues.get(i)));
+    }
+
+    return filterBuilder.toString();
+  }
+
+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> partitionVals) {
+    // Hive stores columns in lowercase, so we need to map partition keys to lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());
+
+    if (partitionTypes.isEmpty()) {
+      throw new HoodieHiveSyncException("Cannot get partition types from SyncClient, maybe "
+          + "the table schema is not synced");
+    }
+
+    Map<String, String> keyWithTypes = CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionVals.size(); i++) {
+      combineFilter(filterBuilder, "OR",
+          Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes, partitionVals.get(i))));
+    }
+    return filterBuilder.toString();
+  }
+
+  /**
+   * Fetch partitions from the metastore service, trying to push down filters to avoid
+   * fetching too many unnecessary partitions.
+   */
+  private List<Partition> getTablePartitions(String tableName, List<String> writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        .loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+        .filter(values -> !values.isEmpty())
+        .collect(Collectors.toList());
+
+    if (partitionVals.isEmpty()) {
+      // No partitions were written
+      return Collections.emptyList();
+    }
+
+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
+      return syncClient.getAllPartitions(tableName);

Review Comment:
   Sure.
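
   To make the end-to-end behavior concrete, a hypothetical trace of
   generateWrittenPartitionsFilter for two written partitions, assuming keys
   `date` (Hive type date) and `hour` (type int):

       // writtenPartitionsSince:  ["2022-09-20/9", "2022-09-21/10"]
       // extracted partitionVals: [["2022-09-20", "9"], ["2022-09-21", "10"]]
       //
       // visitPartition per partition:
       //   (date = 2022-09-20 AND hour = 9)
       //   (date = 2022-09-21 AND hour = 10)
       //
       // combined with OR:
       //   ((date = 2022-09-20 AND hour = 9) OR (date = 2022-09-21 AND hour = 10))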


