You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/09 09:52:36 UTC

[hudi] branch master updated: [HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b0615c52ba [HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)
2b0615c52ba is described below

commit 2b0615c52ba9e8310b2e62dc0fdc1efdeeebe1a8
Author: Paul Zhang <xz...@126.com>
AuthorDate: Thu Feb 9 17:52:26 2023 +0800

    [HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)
---
 .../org/apache/hudi/table/HoodieTableFactory.java  | 18 +++++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 30 ++++++++++++++++++++++
 2 files changed, 48 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index d57d971f476..fd067f23255 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -143,12 +144,29 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    * @param schema The table schema
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+    checkTableType(conf); // runs unconditionally, before the append-mode-dependent key checks
+
     if (!OptionsResolver.isAppendMode(conf)) {
       checkRecordKey(conf, schema);
       checkPreCombineKey(conf, schema);
     }
   }
 
+  /**
+   * Validates the table type: a non-empty {@code FlinkOptions.TABLE_TYPE} value must name a {@link HoodieTableType} constant, otherwise a {@code HoodieValidationException} is thrown.
+   */
+  private void checkTableType(Configuration conf) {
+    String tableType = conf.get(FlinkOptions.TABLE_TYPE);
+    if (StringUtils.nonEmpty(tableType)) { // validation is skipped when nonEmpty reports false
+      try {
+        HoodieTableType.valueOf(tableType); // result discarded; called only so an unknown name raises IllegalArgumentException
+      } catch (IllegalArgumentException e) {
+        throw new HoodieValidationException("Invalid table type: " + tableType + ". Table type should be either "
+                + HoodieTableType.MERGE_ON_READ + " or " + HoodieTableType.COPY_ON_WRITE + ".");
+      }
+    }
+  }
+
   /**
    * Validate the record key.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index b9964b70f79..636eaeffa4c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -177,6 +177,36 @@ public class TestHoodieTableFactory {
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @Test
+  void testTableTypeCheck() {
+    ResolvedSchema schema = SchemaBuilder.instance()
+            .field("f0", DataTypes.INT().notNull())
+            .field("f1", DataTypes.VARCHAR(20))
+            .field("f2", DataTypes.TIMESTAMP(3))
+            .field("ts", DataTypes.TIMESTAMP(3))
+            .primaryKey("f0")
+            .build();
+
+    // Table type unset: the option's default value must pass validation
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema, "f2");
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+    // Invalid table type will throw exception
+    this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema, "f2");
+    assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+    // Valid table type MERGE_ON_READ will be ok
+    this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+
+    // Valid table type COPY_ON_WRITE will be ok
+    this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+    final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema, "f2");
+    assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
+  }
+
   @Test
   void testSupplementTableConfig() throws Exception {
     String tablePath = new File(tempFile.getAbsolutePath(), "dummy").getAbsolutePath();