You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "vinothchandar (via GitHub)" <gi...@apache.org> on 2023/03/22 01:00:33 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #8107: [HUDI-5514] Adding auto generation of record keys support to Hudi/Spark

vinothchandar commented on code in PR #8107:
URL: https://github.com/apache/hudi/pull/8107#discussion_r1144114597


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java:
##########
@@ -36,8 +39,9 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
 
   public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
-        .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+    this.recordKeyFields = autoGenerateRecordKeys() ? Collections.emptyList() :

Review Comment:
   should this code be pushed to the super class?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java:
##########
@@ -32,7 +34,8 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+    this.recordKeyFields = autoGenerateRecordKeys() ? Collections.emptyList() :

Review Comment:
   Maybe a simple `if` instead of a ternary would be more readable? IMO, a ternary is best used for simple expressions.
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)),

Review Comment:
   won't it return `null` if the key is not found in `props` anyways?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
+    this.recordKeyFields = !recordKeyField.isPresent()
         ? Collections.emptyList()
-        : Collections.singletonList(recordKeyField);
+        : Collections.singletonList(recordKeyField.get());
     this.partitionPathFields = Collections.singletonList(partitionPathField);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to support

Review Comment:
   Can we avoid this comment everywhere?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) {

Review Comment:
   Does this not break any reflection-based instantiation?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()
+          var rowId = 0

Review Comment:
   nts: this var gets copied over to each executor, and we generate the `rowId` independently for each partition?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java:
##########
@@ -41,7 +42,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
   private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
 
   public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
-    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),

Review Comment:
   Same here: won't `getString()` return null if the key is not found?



##########
hudi-common/src/main/java/org/apache/hudi/exception/HoodieAutoRecordKeyException.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Used for exceptions related to Auto generation of record keys.
+ */
+public class HoodieAutoRecordKeyException extends HoodieException {

Review Comment:
   rename: HoodieKeyGenerationException



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieAutoRecordKeyException, HoodieException}
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =

Review Comment:
   Can we simplify this further for the user? Automatically turn on the config if the user does a `bulk_insert` from any of the Spark write paths. If they upsert the table later, we then generate keys for the inserted records.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)),

Review Comment:
   https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#getProperty-java.lang.String-



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =
+    Set(WriteOperationType.INSERT, WriteOperationType.BULK_INSERT, WriteOperationType.DELETE,
+      WriteOperationType.INSERT_OVERWRITE, WriteOperationType.INSERT_OVERWRITE_TABLE,
+      WriteOperationType.DELETE_PARTITION).map(_.name())

Review Comment:
   I don't see how we would auto-generate keys for inserted records within an upsert, for example, without implementing this at the HoodieMerge/Create handle layers. Could you sketch that implementation?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieAutoRecordKeyException, HoodieException}
+import org.apache.spark.TaskContext
+
+object AutoRecordKeyGenerationUtils {
+
+   // supported operation types when auto generation of record keys is enabled.
+   val supportedOperations: Set[String] =

Review Comment:
   I think making this a top-level choice introduces a new config (auto-generation of keys) that users need to care about, merely so that they can avoid configuring an existing config (the key generator).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java:
##########
@@ -29,24 +31,29 @@
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   public SimpleAvroKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
   SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, null, partitionPathField);
+    this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
+  SimpleAvroKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
+    this.recordKeyFields = !recordKeyField.isPresent()

Review Comment:
   Use `Option.ifPresent`, or `Option.map(...).orElse(...)`?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in cases when dataset bears "

Review Comment:
   +1, we should track this in the separate usability-improvement track.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -82,9 +85,19 @@ object HoodieDatasetBulkInsertHelper
           val keyGenerator =
             ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
               .asInstanceOf[SparkKeyGeneratorInterface]
+          val partitionId = TaskContext.getPartitionId()

Review Comment:
   Should we be resilient to driver failures as well, or will the entire Hudi commit abort if the driver is restarted? I am wondering whether RDD recomputation introduces any non-determinism here. In general, as long as we don't recompute the source RDD, it should be resilient.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java:
##########
@@ -64,12 +67,22 @@ public String getPartitionPath(GenericRecord record) {
 
   @Override
   public String getRecordKey(Row row) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to support
+      // returning partition path for the callers.
+      return StringUtils.EMPTY_STRING;
+    }
     tryInitRowAccessor(row.schema());
     return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    if (autoGenerateRecordKeys()) {
+      // To fetch partition path, caller will have to call getKey() on KeyGenerator and call .getPartitionPath. Hence we have to pass empty field to support

Review Comment:
   remove comment



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -608,14 +617,31 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
     SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
     SerializableSchema processedAvroSchema = new SerializableSchema(isDropPartitionColumns() ? HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
     if (recordType == HoodieRecordType.AVRO) {
-      records = avroRDD.map(record -> {
-        GenericRecord gr = isDropPartitionColumns() ? HoodieAvroUtils.removeFields(record, partitionColumns) : record;
+      // if auto generation of keys is enabled, lets generate one
+      JavaRDD<Tuple2<GenericRecord, Option<String>>> recordRecordKeyOverrideRdd = avroRDD.mapPartitions(
+          (FlatMapFunction<Iterator<GenericRecord>, Tuple2<GenericRecord, Option<String>>>) genericRecordIterator -> {
+            int rowId = 0;

Review Comment:
   We are repeating this in 3 places now? I see that it's hard to restructure, though. Hmm.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -260,6 +260,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("0.13.0")
       .withDocumentation("The metadata of secondary indexes");
 
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = ConfigProperty
+      .key("hoodie.table.auto.generate.record.keys")
+      .defaultValue("false")
+      .withDocumentation("Enables automatic generation of the record-keys in cases when dataset bears "
+          + "no natural record key satisfying requirements of being the primary-key in the Hudi table. "
+          + "Record key auto-gen is generally recommended for 'append-only' workloads, ie ones leveraging 'insert' or "

Review Comment:
   IMO, auto-generation should happen at the writer level. Do we need a table property? If there is no key generator, can't we infer that the keys were auto-generated?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1116,31 +1124,47 @@ object HoodieSparkSqlWriter {
           Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
+          val sparkPartitionId = TaskContext.getPartitionId()
+
           val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
           val consistentLogicalTimestampEnabled = parameters.getOrElse(
             DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
             DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean
 
-          it.map { avroRecord =>
+          // generate record keys is auto generation is enabled.
+          val recordsWithRecordKeyOverride = mayBeAutoGenerateRecordKeys(autoGenerateRecordKeys, it, instantTime, sparkPartitionId)

Review Comment:
   nts: need to dig deeper here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org