Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/17 12:34:26 UTC

[hudi] branch master updated: [HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d95caa9300 [HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)
d95caa9300 is described below

commit d95caa93007a490bd12c56a3c859e46c7ddcedd5
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Sep 17 20:34:17 2022 +0800

    [HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)
---
 .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java  | 12 ++++--------
 .../main/java/org/apache/hudi/common/model/HoodieRecord.java |  5 +++--
 .../test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java  |  6 +++---
 .../java/org/apache/hudi/table/catalog/HiveSchemaUtils.java  |  3 ++-
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala    |  4 ++--
 .../org/apache/hudi/utilities/deltastreamer/DeltaSync.java   | 10 +++++-----
 6 files changed, 19 insertions(+), 21 deletions(-)
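
The heart of the change is HoodieAvroUtils#isMetadataField: a chain of six String.equals checks is replaced by a single membership test against HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION, which in turn switches from an immutable List to an immutable Set, so contains() becomes an average-case O(1) hash lookup instead of a linear scan. A minimal standalone sketch of the lookup-cost difference (the column-name literals below are illustrative stand-ins for the constants defined in HoodieRecord, not copied from the patch):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MetaFieldLookupSketch {

      // Stand-ins for Hudi's metadata column constants.
      private static final List<String> META_COLUMNS_LIST = Arrays.asList(
          "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
          "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation");

      private static final Set<String> META_COLUMNS_SET = new HashSet<>(META_COLUMNS_LIST);

      // Before: effectively a linear scan, up to six String.equals calls per invocation.
      static boolean isMetadataFieldLinear(String fieldName) {
        return META_COLUMNS_LIST.contains(fieldName);
      }

      // After: one hash lookup, O(1) on average, regardless of how many metadata columns exist.
      static boolean isMetadataFieldHashed(String fieldName) {
        return META_COLUMNS_SET.contains(fieldName);
      }

      public static void main(String[] args) {
        System.out.println(isMetadataFieldHashed("_hoodie_record_key")); // true
        System.out.println(isMetadataFieldHashed("user_id"));            // false
      }
    }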

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index af6478e56e..a352e86b96 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -75,6 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Deque;
 import java.util.LinkedList;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -169,12 +170,7 @@ public class HoodieAvroUtils {
   }
 
   public static boolean isMetadataField(String fieldName) {
-    return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
-        || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
+    return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -245,7 +241,7 @@ public class HoodieAvroUtils {
     return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
   }
 
-  public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
+  public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
     List<Schema.Field> filteredFields = schema.getFields()
         .stream()
         .filter(field -> !fieldsToRemove.contains(field.name()))
@@ -422,7 +418,7 @@ public class HoodieAvroUtils {
    * <p>
    * To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
    */
-  public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+  public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
     Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
     return rewriteRecord(record, newSchema);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index c7ef08a162..c18d1f333b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -48,8 +49,8 @@ public abstract class HoodieRecord<T> implements Serializable {
 
   // Temporary to support the '_hoodie_operation' field, once we solve
   // the compatibility problem, it can be removed.
-  public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
-      CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+  public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+      CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
           RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
           OPERATION_METADATA_FIELD);
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index ad77e13b46..3371085d21 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -34,7 +34,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -249,7 +249,7 @@ public class TestHoodieAvroUtils {
     rec.put("non_pii_col", "val1");
     rec.put("pii_col", "val2");
     rec.put("timestamp", 3.5);
-    GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+    GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
     assertEquals("key1", rec1.get("_row_key"));
     assertEquals("val1", rec1.get("non_pii_col"));
     assertEquals(3.5, rec1.get("timestamp"));
@@ -262,7 +262,7 @@ public class TestHoodieAvroUtils {
         + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
         + "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
     expectedSchema = new Schema.Parser().parse(schemaStr);
-    rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+    rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
     assertEquals(expectedSchema, rec1.getSchema());
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 2d3bf4389c..a057c02f2c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public class HiveSchemaUtils {
   public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
     List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
         // filter out the metadata columns
-        .filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+        .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
         .collect(Collectors.toList());
     // need to refactor the partition key field positions: they are not always in the last
     allCols.addAll(hiveTable.getPartitionKeys());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99be676f14..b9ff4c0d1a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -344,9 +344,9 @@ object HoodieSparkSqlWriter {
   }
 
   def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
-    val fieldsToRemove = new java.util.ArrayList[String]()
+    val fieldsToRemove = new java.util.HashSet[String]()
     partitionParam.split(",").map(partitionField => partitionField.trim)
-      .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+      .filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
     HoodieAvroUtils.removeFields(schema, fieldsToRemove)
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 89716047db..3fbb8c0f8d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -487,7 +487,7 @@ public class DeltaSync implements Serializable, Closeable {
     }
 
     boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
-    List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+    Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
       GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
@@ -952,14 +952,14 @@ public class DeltaSync implements Serializable, Closeable {
   }
 
   /**
-   * Get the list of partition columns as a list of strings.
+   * Get the partition columns as a set of strings.
    *
    * @param keyGenerator KeyGenerator
    * @param props TypedProperties
-   * @return List of partition columns.
+   * @return Set of partition columns.
    */
-  private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+  private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
-    return Arrays.asList(partitionColumns.split(","));
+    return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
   }
 }
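
Because HOODIE_META_COLUMNS_WITH_OPERATION and HoodieAvroUtils#removeFields now work with Set<String>, the remaining hunks simply adapt callers: DeltaSync#getPartitionColumns collects the comma-separated partition columns into a Set, and HoodieSparkSqlWriter builds a java.util.HashSet instead of an ArrayList. A standalone sketch of that caller pattern (the configuration value below is made up for illustration; in DeltaSync the string comes from SparkKeyGenUtils.getPartitionColumns):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class PartitionColumnsSketch {

      // Mirrors the new getPartitionColumns return type: split the configured
      // partition columns and collect them into a Set for fast membership checks.
      static Set<String> getPartitionColumns(String partitionColumns) {
        return Arrays.stream(partitionColumns.split(","))
            .collect(Collectors.toSet());
      }

      public static void main(String[] args) {
        Set<String> cols = getPartitionColumns("region,dt"); // illustrative value
        System.out.println(cols.contains("dt"));     // true
        System.out.println(cols.contains("userId")); // false
      }
    }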