You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/09/17 12:34:26 UTC
[hudi] branch master updated: [HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d95caa9300 [HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)
d95caa9300 is described below
commit d95caa93007a490bd12c56a3c859e46c7ddcedd5
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Sep 17 20:34:17 2022 +0800
[HUDI-4865] Optimize HoodieAvroUtils#isMetadataField to use O(1) complexity (#6702)
---
.../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java | 12 ++++--------
.../main/java/org/apache/hudi/common/model/HoodieRecord.java | 5 +++--
.../test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java | 6 +++---
.../java/org/apache/hudi/table/catalog/HiveSchemaUtils.java | 3 ++-
.../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 4 ++--
.../org/apache/hudi/utilities/deltastreamer/DeltaSync.java | 10 +++++-----
6 files changed, 19 insertions(+), 21 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index af6478e56e..a352e86b96 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -75,6 +75,7 @@ import java.util.List;
import java.util.Map;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
@@ -169,12 +170,7 @@ public class HoodieAvroUtils {
}
public static boolean isMetadataField(String fieldName) {
- return HoodieRecord.COMMIT_TIME_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.COMMIT_SEQNO_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.RECORD_KEY_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.PARTITION_PATH_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName)
- || HoodieRecord.OPERATION_METADATA_FIELD.equals(fieldName);
+ return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
}
public static Schema createHoodieWriteSchema(Schema originalSchema) {
@@ -245,7 +241,7 @@ public class HoodieAvroUtils {
return removeFields(schema, HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION);
}
- public static Schema removeFields(Schema schema, List<String> fieldsToRemove) {
+ public static Schema removeFields(Schema schema, Set<String> fieldsToRemove) {
List<Schema.Field> filteredFields = schema.getFields()
.stream()
.filter(field -> !fieldsToRemove.contains(field.name()))
@@ -422,7 +418,7 @@ public class HoodieAvroUtils {
* <p>
* To better understand how it removes please check {@link #rewriteRecord(GenericRecord, Schema)}
*/
- public static GenericRecord removeFields(GenericRecord record, List<String> fieldsToRemove) {
+ public static GenericRecord removeFields(GenericRecord record, Set<String> fieldsToRemove) {
Schema newSchema = removeFields(record.getSchema(), fieldsToRemove);
return rewriteRecord(record, newSchema);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index c7ef08a162..c18d1f333b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -48,8 +49,8 @@ public abstract class HoodieRecord<T> implements Serializable {
// Temporary to support the '_hoodie_operation' field, once we solve
// the compatibility problem, it can be removed.
- public static final List<String> HOODIE_META_COLUMNS_WITH_OPERATION =
- CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
+ public static final Set<String> HOODIE_META_COLUMNS_WITH_OPERATION =
+ CollectionUtils.createImmutableSet(COMMIT_TIME_METADATA_FIELD, COMMIT_SEQNO_METADATA_FIELD,
RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD,
OPERATION_METADATA_FIELD);
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index ad77e13b46..3371085d21 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -34,7 +34,7 @@ import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -249,7 +249,7 @@ public class TestHoodieAvroUtils {
rec.put("non_pii_col", "val1");
rec.put("pii_col", "val2");
rec.put("timestamp", 3.5);
- GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList("pii_col"));
+ GenericRecord rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton("pii_col"));
assertEquals("key1", rec1.get("_row_key"));
assertEquals("val1", rec1.get("non_pii_col"));
assertEquals(3.5, rec1.get("timestamp"));
@@ -262,7 +262,7 @@ public class TestHoodieAvroUtils {
+ "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ "{\"name\": \"pii_col\", \"type\": \"string\"}]},";
expectedSchema = new Schema.Parser().parse(schemaStr);
- rec1 = HoodieAvroUtils.removeFields(rec, Arrays.asList(""));
+ rec1 = HoodieAvroUtils.removeFields(rec, Collections.singleton(""));
assertEquals(expectedSchema, rec1.getSchema());
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 2d3bf4389c..a057c02f2c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.catalog;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -60,7 +61,7 @@ public class HiveSchemaUtils {
public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
List<FieldSchema> allCols = hiveTable.getSd().getCols().stream()
// filter out the metadata columns
- .filter(s -> !HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(s.getName()))
+ .filter(s -> !HoodieAvroUtils.isMetadataField(s.getName()))
.collect(Collectors.toList());
// need to refactor the partition key field positions: they are not always in the last
allCols.addAll(hiveTable.getPartitionKeys());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99be676f14..b9ff4c0d1a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -344,9 +344,9 @@ object HoodieSparkSqlWriter {
}
def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: Schema): Schema = {
- val fieldsToRemove = new java.util.ArrayList[String]()
+ val fieldsToRemove = new java.util.HashSet[String]()
partitionParam.split(",").map(partitionField => partitionField.trim)
- .filter(s => !s.isEmpty).map(field => fieldsToRemove.add(field))
+ .filter(s => s.nonEmpty).map(field => fieldsToRemove.add(field))
HoodieAvroUtils.removeFields(schema, fieldsToRemove)
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 89716047db..3fbb8c0f8d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -487,7 +487,7 @@ public class DeltaSync implements Serializable, Closeable {
}
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
- List<String> partitionColumns = getPartitionColumns(keyGenerator, props);
+ Set<String> partitionColumns = getPartitionColumns(keyGenerator, props);
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
JavaRDD<HoodieRecord> records = avroRDD.map(record -> {
GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
@@ -952,14 +952,14 @@ public class DeltaSync implements Serializable, Closeable {
}
/**
- * Get the list of partition columns as a list of strings.
+ * Get the partition columns as a set of strings.
*
* @param keyGenerator KeyGenerator
* @param props TypedProperties
- * @return List of partition columns.
+ * @return Set of partition columns.
*/
- private List<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
+ private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperties props) {
String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
- return Arrays.asList(partitionColumns.split(","));
+ return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
}
}