You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/09 09:52:36 UTC
[hudi] branch master updated: [HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b0615c52ba [HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)
2b0615c52ba is described below
commit 2b0615c52ba9e8310b2e62dc0fdc1efdeeebe1a8
Author: Paul Zhang <xz...@126.com>
AuthorDate: Thu Feb 9 17:52:26 2023 +0800
[HUDI-5725] Creating Hudi table with misspelled table type in Flink leads to Flink cluster crash (#7888)
---
.../org/apache/hudi/table/HoodieTableFactory.java | 18 +++++++++++++
.../apache/hudi/table/TestHoodieTableFactory.java | 30 ++++++++++++++++++++++
2 files changed, 48 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index d57d971f476..fd067f23255 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -143,12 +144,29 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
* @param schema The table schema
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+ checkTableType(conf);
+
if (!OptionsResolver.isAppendMode(conf)) {
checkRecordKey(conf, schema);
checkPreCombineKey(conf, schema);
}
}
+ /**
+ * Validate the table type: a non-empty {@code FlinkOptions.TABLE_TYPE} must name a {@code HoodieTableType} constant, else a {@code HoodieValidationException} is thrown.
+ */
+ private void checkTableType(Configuration conf) {
+ String tableType = conf.get(FlinkOptions.TABLE_TYPE);
+ if (StringUtils.nonEmpty(tableType)) {
+ try {
+ HoodieTableType.valueOf(tableType);
+ } catch (IllegalArgumentException e) {
+ throw new HoodieValidationException("Invalid table type: " + tableType + ". Table type should be either "
+ + HoodieTableType.MERGE_ON_READ + " or " + HoodieTableType.COPY_ON_WRITE + ".");
+ }
+ }
+ }
+
/**
* Validate the record key.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index b9964b70f79..636eaeffa4c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -177,6 +177,36 @@ public class TestHoodieTableFactory {
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));
}
+ @Test
+ void testTableTypeCheck() {
+ ResolvedSchema schema = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+
+ // Table type unset. The default value will be ok
+ final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema, "f2");
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+ // Invalid table type will throw exception
+ this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
+ final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema, "f2");
+ assertThrows(HoodieValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+ // Valid table type will be ok
+ this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+ final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+
+ // Valid table type will be ok
+ this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+ final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema, "f2");
+ assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
+ }
+
@Test
void testSupplementTableConfig() throws Exception {
String tablePath = new File(tempFile.getAbsolutePath(), "dummy").getAbsolutePath();