Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/28 01:38:04 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5771: [HUDI-4071] Relax record key requirement and write with minimal options

alexeykudinkin commented on code in PR #5771:
URL: https://github.com/apache/hudi/pull/5771#discussion_r931694390


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -115,7 +115,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
-      .defaultValue("ts")
+      .noDefaultValue()

Review Comment:
   Need to call this out as a non-backward-compatible (non-BWC) change
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java:
##########
@@ -64,6 +63,9 @@ public String getPartitionPath(GenericRecord record) {
 
   @Override
   public String getRecordKey(Row row) {

Review Comment:
   You also need to handle this in the `InternalRow` variant



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -393,18 +396,28 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
 
   /**
    * Validate table properties.
-   * @param properties Properties from writeConfig.
+   * @param writeConfigProps Properties from writeConfig.
    */
-  public void validateTableProperties(Properties properties) {
+  public void validateTableProperties(Properties writeConfigProps) {
+    // Once table is configured to be append-only, it cannot be mutable or allow setting record key or precombine fields for updates
+    if (getTableConfig().isAppendOnlyTable()) {
+      boolean appendOnlyTable = Boolean.parseBoolean(
+          writeConfigProps.getProperty(HoodieTableConfig.APPEND_ONLY_TABLE.key(), String.valueOf(HoodieTableConfig.APPEND_ONLY_TABLE.defaultValue())));

Review Comment:
   Why do we need to specify this in the write config as well?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -235,6 +235,13 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Comma-separated list of metadata partitions that have been completely built and in-sync with data table. "
           + "These partitions are ready for use by the readers");
 
+  public static final ConfigProperty<Boolean> APPEND_ONLY_TABLE = ConfigProperty
+      .key("hoodie.table.append.only")

Review Comment:
   Boolean flags are really hard to evolve and are better reserved for toggle-like configs (switching on/off).
   
   In that case, if we ever consider adding a "table-type" config, we'd be exposed to a can of worms of banning various permutations of these configs.
   
   I'd suggest generalizing this to a "table-type" config that can be declared as either "mutable" or "immutable"
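
   A minimal sketch of the enum-valued alternative, for illustration only. The names `TableMutability` and `TableMutabilityConfig` and the key `example.table.mutability` are hypothetical placeholders, not existing Hudi APIs or config keys:

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical enum-valued setting replacing a boolean "append only" flag.
// New values can be added later without banning flag permutations.
enum TableMutability {
  MUTABLE,
  IMMUTABLE;

  // Case-insensitive parsing; throws IllegalArgumentException on unknown values.
  static TableMutability fromString(String value) {
    return valueOf(value.trim().toUpperCase(Locale.ROOT));
  }
}

// Hypothetical holder for the config key and its default.
final class TableMutabilityConfig {
  static final String KEY = "example.table.mutability"; // placeholder key
  static final TableMutability DEFAULT = TableMutability.MUTABLE;

  static TableMutability read(Properties props) {
    String raw = props.getProperty(KEY);
    return raw == null ? DEFAULT : TableMutability.fromString(raw);
  }
}
```

   With a single enum-valued key there is only one setting to validate, so a later third value would not require cross-checks against a separate boolean.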



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -791,6 +809,13 @@ object HoodieSparkSqlWriter {
     if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
       mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
     }
+    // for the first ingest batch, if applicable, disable populate meta fields and enable append-only table config
+    if (tableConfig == null
+      && !mergedParams.contains(RECORDKEY_FIELD.key)
+      && !mergedParams.contains(PRECOMBINE_FIELD.key)) {
+      mergedParams.put(HoodieTableConfig.POPULATE_META_FIELDS.key, (!HoodieTableConfig.POPULATE_META_FIELDS.defaultValue).toString)

Review Comment:
   We should just go with `false` here (inverting the default makes it cryptic)



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java:
##########
@@ -64,6 +63,9 @@ public String getPartitionPath(GenericRecord record) {
 
   @Override
   public String getRecordKey(Row row) {
+    if (recordKeyFields.isEmpty()) {
+      return EMPTY_STRING;

Review Comment:
   I'd suggest creating a method `emptyKey` and redirecting there, so that if we decide to change the sentinel value we can switch it easily
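
   A rough sketch of that indirection, assuming the sentinel stays the empty string as in the diff. `SketchKeyGenerator` is a hypothetical stand-in with a simplified key format, not the actual `ComplexKeyGenerator`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for a key generator; NOT the actual ComplexKeyGenerator.
final class SketchKeyGenerator {
  // Assumption: the sentinel for "no record key" is the empty string, as in the diff.
  private static final String EMPTY_KEY_SENTINEL = "";

  private final List<String> recordKeyFields;

  SketchKeyGenerator(List<String> recordKeyFields) {
    this.recordKeyFields = recordKeyFields;
  }

  // Single place that decides what an "empty" key looks like.
  static String emptyKey() {
    return EMPTY_KEY_SENTINEL;
  }

  // Simplified key format ("field:value" joined by commas), for illustration only.
  String getRecordKey(Map<String, String> row) {
    if (recordKeyFields.isEmpty()) {
      return emptyKey();
    }
    return recordKeyFields.stream()
        .map(field -> field + ":" + row.get(field))
        .collect(Collectors.joining(","));
  }
}
```

   Every code path that needs the sentinel (the `Row` and `InternalRow` variants alike) would call `emptyKey()`, so changing the value later touches one line.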



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -788,9 +790,10 @@ object DataSourceOptionsHelper {
     val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
     if (partitionFields != null) {
       val numPartFields = partitionFields.split(",").length
-      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
-      val numRecordKeyFields = recordsKeyFields.split(",").length
-      if (numPartFields == 1 && numRecordKeyFields == 1) {
+      val numRecordKeyFields =
+        if(props.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key)) props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key).split(",").length

Review Comment:
   Spacing (missing space after `if`)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -791,6 +809,13 @@ object HoodieSparkSqlWriter {
     if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
       mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
     }
+    // for the first ingest batch, if applicable, disable populate meta fields and enable append-only table config
+    if (tableConfig == null
+      && !mergedParams.contains(RECORDKEY_FIELD.key)
+      && !mergedParams.contains(PRECOMBINE_FIELD.key)) {
+      mergedParams.put(HoodieTableConfig.POPULATE_META_FIELDS.key, (!HoodieTableConfig.POPULATE_META_FIELDS.defaultValue).toString)
+      mergedParams.put(HoodieTableConfig.APPEND_ONLY_TABLE.key, (!HoodieTableConfig.APPEND_ONLY_TABLE.defaultValue).toString)

Review Comment:
   Same as above



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -118,6 +118,19 @@ object HoodieSparkSqlWriter {
 
       operation = WriteOperationType.INSERT
     }
+    // If no record key field and precombine field is provided, assume it's an append-only workload and do bulk insert
+    if (!hoodieConfig.contains(RECORDKEY_FIELD)
+      && !hoodieConfig.contains(PRECOMBINE_FIELD)) {
+      if (!optParams.contains(OPERATION.key)
+        || optParams(OPERATION.key).equalsIgnoreCase(BULK_INSERT_OPERATION_OPT_VAL)) {
+        log.warn(s"Neither $RECORDKEY_FIELD nor $PRECOMBINE_FIELD is specified; " +

Review Comment:
   We shouldn't print this when the operation is explicitly set to bulk-insert; it'd be confusing
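
   One way to express that condition, as a sketch only. `BulkInsertFallback.shouldWarn` is a hypothetical helper; it assumes the warning is meant to fire only when the writer silently falls back to bulk insert because no operation was given:

```java
import java.util.Optional;

// Hypothetical helper; not part of HoodieSparkSqlWriter.
final class BulkInsertFallback {
  // Warn only when neither key field is set AND the user did not choose an operation.
  // If the user explicitly asked for bulk insert, there is nothing surprising to report.
  static boolean shouldWarn(boolean hasRecordKeyField,
                            boolean hasPrecombineField,
                            Optional<String> explicitOperation) {
    return !hasRecordKeyField && !hasPrecombineField && !explicitOperation.isPresent();
  }
}
```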



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -179,6 +194,9 @@ object HoodieSparkSqlWriter {
           basePath, path, instantTime, partitionColumns)
         return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
       }
+      if (tableExists && tableConfig.isAppendOnlyTable && operation != WriteOperationType.BULK_INSERT) {

Review Comment:
   The conditional above short-circuits for bulk-insert with row-writing.
   
   We should do this check as early as possible. We can also extract it into a `validateConfig` method
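
   A minimal sketch of such an extracted check. `WriteOp` and `WriteConfigValidator` are hypothetical names, and throwing `IllegalArgumentException` is an assumption, since the body of the original `if` is not shown in the diff:

```java
// Hypothetical stand-in for the write operation type.
enum WriteOp { INSERT, UPSERT, BULK_INSERT }

// Hypothetical validator, meant to be called before any write path is chosen,
// so that early returns (such as the row-writing bulk-insert branch) cannot skip it.
final class WriteConfigValidator {
  static void validateConfig(boolean tableExists, boolean appendOnlyTable, WriteOp operation) {
    if (tableExists && appendOnlyTable && operation != WriteOp.BULK_INSERT) {
      // Assumption: rejecting the write is the intended behavior.
      throw new IllegalArgumentException(
          "Append-only table only supports BULK_INSERT, got " + operation);
    }
  }
}
```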



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -393,18 +396,28 @@ public HoodieArchivedTimeline getArchivedTimeline(String startTs) {
 
   /**
    * Validate table properties.
-   * @param properties Properties from writeConfig.
+   * @param writeConfigProps Properties from writeConfig.
    */
-  public void validateTableProperties(Properties properties) {
+  public void validateTableProperties(Properties writeConfigProps) {
+    // Once table is configured to be append-only, it cannot be mutable or allow setting record key or precombine fields for updates
+    if (getTableConfig().isAppendOnlyTable()) {
+      boolean appendOnlyTable = Boolean.parseBoolean(
+          writeConfigProps.getProperty(HoodieTableConfig.APPEND_ONLY_TABLE.key(), String.valueOf(HoodieTableConfig.APPEND_ONLY_TABLE.defaultValue())));
+      checkArgument(appendOnlyTable, String.format("%s is enabled. Please recreate the table with record key to support mutable tables.", HoodieTableConfig.APPEND_ONLY_TABLE.key()));
+      checkState(isNullOrEmpty(writeConfigProps.getProperty(HoodieTableConfig.RECORDKEY_FIELDS.key())), String.format("%s set for an append-only table", HoodieTableConfig.RECORDKEY_FIELDS.key()));
+      checkState(isNullOrEmpty(writeConfigProps.getProperty("hoodie.datasource.write.recordkey.field")), "hoodie.datasource.write.recordkey.field set for an append-only table");

Review Comment:
   These leak Spark implementation details into this class. We should vet these at a different level



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java:
##########
@@ -307,38 +307,48 @@ public void testNoPropsSet() {
     HoodieWriteConfig config = getConfigBuilder(schemaStr).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
-    try {
-      Dataset<Row> preparedDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
-          new NonSortPartitionerWithRows(), false);
-      preparedDF.count();
-      fail("Should have thrown exception");
-    } catch (Exception e) {
-      // ignore
+
+    Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config,
+            new NonSortPartitionerWithRows(), false);
+    StructType resultSchema = result.schema();
+    assertEquals(10, result.count());
+    assertEquals(resultSchema.fieldNames().length, structType.fieldNames().length + HoodieRecord.HOODIE_META_COLUMNS.size());
+
+    for (Map.Entry<String, Integer> entry : HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.entrySet()) {

Review Comment:
   Instead, let's just assert full schemas



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -280,7 +298,7 @@ object HoodieSparkSqlWriter {
                 HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
             val hoodieAllIncomingRecords = genericRecords.map(gr => {
               val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
+              val hoodieRecord = if (shouldCombine && hoodieConfig.contains(PRECOMBINE_FIELD)) {

Review Comment:
   We should actually throw if drop-dups is configured but pre-combine is not
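
   Sketched as a fail-fast check instead of the silent fallback. `PrecombineValidation` is a hypothetical helper, and the exception type and message are assumptions:

```java
// Hypothetical helper; not an existing Hudi method.
final class PrecombineValidation {
  // shouldCombine: true when drop-dups or combine-before-insert is enabled.
  // precombineField: the configured precombine field, or null when unset.
  static void requirePrecombineWhenCombining(boolean shouldCombine, String precombineField) {
    boolean missing = precombineField == null || precombineField.trim().isEmpty();
    if (shouldCombine && missing) {
      throw new IllegalArgumentException(
          "Combining or dropping duplicates on insert requires a precombine field, but none is configured");
    }
  }
}
```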



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -55,6 +55,7 @@ object HoodieDatasetBulkInsertHelper extends Logging {
     val populateMetaFields = config.populateMetaFields()
     val schema = df.schema
 
+    if (!config.contains(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)) config.setDefaultValue(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME)

Review Comment:
   Why do we need this?


