Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/17 11:05:48 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6725: [HUDI-4881] Push down filters if possible when syncing partitions to Hive

xushiyan commented on code in PR #6725:
URL: https://github.com/apache/hudi/pull/6725#discussion_r996889670


##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java:
##########
@@ -64,6 +64,12 @@ public class HiveSyncConfig extends HoodieSyncConfig {
   public static final ConfigProperty<String> HIVE_SYNC_BUCKET_SYNC_SPEC = HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
   public static final ConfigProperty<String> HIVE_SYNC_COMMENT = HiveSyncConfigHolder.HIVE_SYNC_COMMENT;
 
+  public static final ConfigProperty<Integer> META_SYNC_FILTER_PUSHDOWN_MAX_SIZE = ConfigProperty

Review Comment:
   better call it `HIVE_SYNC_` as it's specific for hive sync (residing in HiveSyncConfig)
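
   e.g., a minimal sketch of the renamed property (key name, default value, and docs below are illustrative, not the actual PR values):

   ```java
   public static final ConfigProperty<Integer> HIVE_SYNC_FILTER_PUSHDOWN_MAX_SIZE = ConfigProperty
       .key("hoodie.datasource.hive_sync.filter_pushdown_max_size")
       .defaultValue(1000)
       .withDocumentation("Max number of partition predicates allowed in the filter pushed down"
           + " to the metastore when syncing partitions; beyond this, fall back to fetching all partitions.");
   ```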



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
+  /**
+   * Combine a filter with the existing filters.
+   *
+   * 1. If the filterBuilder is empty, simply append the new filter.
+   * 2. If the filterBuilder is not empty and the new filter is non-empty,
+   *    combine the existing filters and the new filter with the operator
+   *    and wrap them in parentheses. If the new filter is empty, clear
+   *    the builder for the OR operator, or keep the left side as-is for
+   *    the AND operator.
+   */
+  private void combineFilter(StringBuilder filterBuilder, String operator, Option<String> filter) {
+    ValidationUtils.checkArgument(operator.equals("OR") || operator.equals("AND"),
+        "Filters can only accept `OR` or `AND` operator");
+    boolean shouldEnclosed = false;
+    if (filter.isPresent() && !filter.get().isEmpty()) {
+      if (filterBuilder.length() != 0) {
+        // If filterBuilder is not empty, it already holds some filters,
+        // so we use the operator to combine the existing filters with
+        // the new filter.
+        filterBuilder.insert(0, "(");
+        filterBuilder.append(" " + operator + " ");
+        shouldEnclosed = true;
+      }
+      filterBuilder.append(filter.get());
+    } else {
+      if (operator.equals("OR")) {
+        filterBuilder.setLength(0);
+        return;
+      }
+    }
+
+    if (shouldEnclosed) {
+      filterBuilder.append(")");
+    }
+  }
+
+  private String quoteStringLiteral(String value) {

Review Comment:
   can be a static helper in some other utils
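
   a minimal sketch of such a helper, assuming a hypothetical utils class (name and package placement are illustrative):

   ```java
   // Hypothetical home for the method; only the placement is the point here.
   public final class FilterGenUtils {

     private FilterGenUtils() {
     }

     // Quote a string literal for use in a metastore partition filter expression.
     public static String quoteStringLiteral(String value) {
       if (!value.contains("\"")) {
         return "\"" + value + "\"";
       } else if (!value.contains("'")) {
         return "'" + value + "'";
       }
       throw new UnsupportedOperationException("Cannot push down filters if \" and ' both exist");
     }
   }
   ```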



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]

+  private String quoteStringLiteral(String value) {
+    if (!value.contains("\"")) {
+      return "\"" + value + "\"";
+    } else if (!value.contains("'")) {
+      return "'" + value + "'";
+    } else {
+      throw new UnsupportedOperationException("Cannot push down filters if \" and ' both exist");
+    }
+  }
+
+  private Option<String> extractLiteralValue(String type, String value) {
+    switch (type.toLowerCase(Locale.ROOT)) {
+      case HiveSchemaUtil.STRING_TYPE_NAME:
+        return Option.of(quoteStringLiteral(value));
+      case HiveSchemaUtil.INT_TYPE_NAME:
+      case HiveSchemaUtil.BIGINT_TYPE_NAME:
+      case HiveSchemaUtil.DATE_TYPE_NAME:
+        return Option.of(value);
+      default:
+        return Option.empty();
+    }
+  }
+
+  private Option<String> generateEqualFilter(String key, String type, String value) {
+    Option<String> extracted = extractLiteralValue(type, value);
+    if (extracted.isPresent()) {
+      return Option.of(key + " = " + extracted.get());
+    }
+    return Option.empty();
+  }
+
+  /**
+   * Visit a partition's key/value pairs and generate its combined filter.
+   *
+   * Examples:
+   * 1. date=2022-09-20 => date = 2022-09-20
+   * 2. date=2022-09-20/hour=9 => (date = 2022-09-20 AND hour = 9)
+   * 3. date=2022-09-20/hour=9/min=30 => ((date = 2022-09-20 AND hour = 9) AND min = 30)
+   */
+  private String visitPartition(List<String> partitionKeys,
+                                Map<String, String> keyWithTypes,
+                                List<String> partitionValues) {
+    if (partitionKeys.size() != partitionValues.size()) {
+      throw new HoodieHiveSyncException("Partition keys and values should have the same length"
+          + ", but got partitionKeys: " + partitionKeys + " with values: " + partitionValues);
+    }
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionValues.size(); i++) {
+      String key = partitionKeys.get(i);
+      combineFilter(filterBuilder, "AND",
+          generateEqualFilter(key, keyWithTypes.get(key), partitionValues.get(i)));
+    }
+
+    return filterBuilder.toString();
+  }
+
+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> partitionVals) {
+    // Hive stores column names in lowercase, so we normalize the partition keys
+    // to lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());
+
+    if (partitionTypes.isEmpty()) {
+      throw new HoodieHiveSyncException("Cannot get partition types from the sync client; "
+          + "the table schema may not be synced yet");
+    }
+
+    Map<String, String> keyWithTypes = CollectionUtils.zipToMap(normalizedPartitionKeys, partitionTypes);
+
+    StringBuilder filterBuilder = new StringBuilder();
+    for (int i = 0; i < partitionVals.size(); i++) {
+      combineFilter(filterBuilder, "OR",
+          Option.of(visitPartition(normalizedPartitionKeys, keyWithTypes, partitionVals.get(i))));
+    }
+    return filterBuilder.toString();
+  }
+
+  /**
+   * Fetch partitions from the metastore, pushing down filters where possible
+   * to avoid fetching too many unnecessary partitions.
+   */
+  private List<Partition> getTablePartitions(String tableName, List<String> writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        .loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)

Review Comment:
   instead of nested lists, can we make use of org.apache.hudi.sync.common.model.Partition model?
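
   e.g., a sketch of what that could look like, assuming the Partition model carries a (values, storageLocation) pair and exposes getValues() (the relative path stands in for the storage location here):

   ```java
   // Build Partition objects instead of nested List<List<String>>.
   List<Partition> writtenPartitions = writtenPartitionsSince.stream()
       .map(path -> new Partition(
           partitionValueExtractor.extractPartitionValuesInPath(path), path))
       .filter(p -> !p.getValues().isEmpty())
       .collect(Collectors.toList());
   ```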



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]

+  private List<Partition> getTablePartitions(String tableName, List<String> writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        .loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);

Review Comment:
   let's align the naming: this should be called `partitionFields`, and the same applies to the other involved API arg names.



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]
+  private void combineFilter(StringBuilder filterBuilder, String operator, Option<String> filter) {

Review Comment:
   this and the other new methods added below should be placed in a better class. they are very specific to filter generation and don't belong at the sync tool level. there should be a specific helper, used by the hive sync client, to encapsulate all of this logic.
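
   a rough sketch of the kind of helper meant here (class name and shape are illustrative; per-type literal quoting, as in extractLiteralValue above, is omitted for brevity):

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   // Illustrative only: filter generation lives in one helper that the hive sync
   // client calls with the partition fields and the written partition values.
   public final class PartitionFilterGenerator {

     private PartitionFilterGenerator() {
     }

     // OR together one equality conjunction per written partition.
     public static String generateFilter(List<String> partitionFields,
                                         List<List<String>> partitionValues) {
       return partitionValues.stream()
           .map(values -> conjunction(partitionFields, values))
           .collect(Collectors.joining(" OR "));
     }

     private static String conjunction(List<String> fields, List<String> values) {
       StringBuilder sb = new StringBuilder("(");
       for (int i = 0; i < fields.size(); i++) {
         if (i > 0) {
           sb.append(" AND ");
         }
         sb.append(fields.get(i)).append(" = ").append(values.get(i));
       }
       return sb.append(")").toString();
     }
   }
   ```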



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java:
##########
@@ -1168,6 +1168,74 @@ public void testSyncWithoutDiffs(String syncMode) throws Exception {
     assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get());
   }
 
+  @Test
+  public void testSyncWithPushDownFilters() {

Review Comment:
   high-level feedback about testing: we need many more variations and corner cases covered in UT before landing this feature: different partition value extractors, hive-style partitioning or not, multi/single/non-partitioned fields, etc.
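
   a sketch of the parameterized shape this could take (fixtures below are illustrative, with a toy extractor standing in for the real PartitionValueExtractor implementations):

   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.CsvSource;

   class TestPartitionValueExtraction {

     @ParameterizedTest
     @CsvSource({
         "date=2022-09-20,        2022-09-20",   // hive-style, single field
         "date=2022-09-20/hour=9, 2022-09-20;9", // hive-style, multi field
         "2022-09-20/9,           2022-09-20;9"  // non-hive-style, multi level
     })
     void extractsValues(String partitionPath, String expected) {
       // Toy extractor: strip "key=" prefixes and split on '/'.
       List<String> values = Arrays.stream(partitionPath.split("/"))
           .map(part -> part.contains("=") ? part.substring(part.indexOf('=') + 1) : part)
           .collect(Collectors.toList());
       assertEquals(Arrays.asList(expected.split(";")), values);
     }
   }
   ```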



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]

+  private List<Partition> getTablePartitions(String tableName, List<String> writtenPartitionsSince) {
+    PartitionValueExtractor partitionValueExtractor = ReflectionUtils
+        .loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    List<String> partitionKeys = config.getSplitStrings(META_SYNC_PARTITION_FIELDS);
+    List<List<String>> partitionVals = writtenPartitionsSince
+        .stream().map(partitionValueExtractor::extractPartitionValuesInPath)
+        .filter(values -> !values.isEmpty())
+        .collect(Collectors.toList());
+
+    if (partitionVals.isEmpty()) {
+      // No partitions were written
+      return Collections.emptyList();
+    }
+
+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {
+      return syncClient.getAllPartitions(tableName);

Review Comment:
   here we changed the default behavior for the many users whose filter size < 1000. We have to ensure the correctness of filter generation by having enough UT coverage. So I'd suggest keeping the existing sync-all-partitions behavior as the default, and using the filter only when people set a threshold, e.g., default = -1 (filter disabled). People can then set it according to their tables' partition counts.
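
   e.g., a sketch of that opt-in shape (key name is illustrative):

   ```java
   public static final ConfigProperty<Integer> HIVE_SYNC_FILTER_PUSHDOWN_MAX_SIZE = ConfigProperty
       .key("hoodie.datasource.hive_sync.filter_pushdown_max_size")
       .defaultValue(-1) // disabled by default: keep syncing all partitions
       .withDocumentation("Max number of partition predicates to push down when syncing"
           + " partitions; -1 (default) disables filter pushdown entirely.");

   // and the guard at the call site:
   int maxSize = config.getIntOrDefault(HIVE_SYNC_FILTER_PUSHDOWN_MAX_SIZE);
   if (maxSize < 0 || estimateSize > maxSize) {
     return syncClient.getAllPartitions(tableName);
   }
   ```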



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]

+  protected String generateWrittenPartitionsFilter(String tableName,
+                                                   List<String> partitionKeys,
+                                                   List<List<String>> partitionVals) {
+    // Hive stores column names in lowercase, so we normalize the partition keys
+    // to lowercase to avoid any mismatch.
+    List<String> normalizedPartitionKeys = partitionKeys.stream()
+        .map(String::toLowerCase)
+        .collect(Collectors.toList());
+    List<String> partitionTypes = syncClient.getMetastoreFieldSchemas(tableName)
+        .stream()
+        .filter(f -> normalizedPartitionKeys.contains(f.getName()))
+        .map(FieldSchema::getType)
+        .collect(Collectors.toList());

Review Comment:
   quite a lot of data wrangling here... can we make use of, and evolve, the models from sync-common to encapsulate it? the code will not be easy to maintain if this logic is spread across methods.
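
   e.g., one small step in that direction (getMetastorePartitionFieldSchemas is a hypothetical API):

   ```java
   // Let the sync client return just the partition columns as FieldSchema objects,
   // so callers stop re-deriving (name, type) pairs by filtering the full schema.
   List<FieldSchema> partitionSchema = syncClient.getMetastorePartitionFieldSchemas(tableName);
   Map<String, String> keyWithTypes = partitionSchema.stream()
       .collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
   ```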



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java:
##########
@@ -307,14 +315,161 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
     return schemaChanged;
   }
 
  [...]

+    int estimateSize = partitionKeys.size() * partitionVals.size();
+    if (estimateSize > config.getIntOrDefault(META_SYNC_FILTER_PUSHDOWN_MAX_SIZE)) {

Review Comment:
   any empirical evidence to share showing that more than 1000 filters is usually less performant than fetching all partitions? how should we guide users on setting this config?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org