Posted to commits@hudi.apache.org by da...@apache.org on 2023/02/01 01:45:51 UTC

[hudi] branch master updated: [HUDI-5585][flink] Fix Spark alter table error on tables created and written by Flink (#7706)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9469882d80f [HUDI-5585][flink] Fix Spark alter table error on tables created and written by Flink (#7706)
9469882d80f is described below

commit 9469882d80f6abf0fa1b233430de7c2c0ab53a5d
Author: chao chen <59...@users.noreply.github.com>
AuthorDate: Wed Feb 1 09:45:39 2023 +0800

    [HUDI-5585][flink] Fix Spark alter table error on tables created and written by Flink (#7706)
    
    
    Co-authored-by: danny0405 <yu...@gmail.com>
---
 .../apache/hudi/table/catalog/HiveSchemaUtils.java |  8 ++++---
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  2 +-
 .../hudi/table/catalog/TableOptionProperties.java  | 25 ++++++++++++++++++++--
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 15 +++++++++++++
 4 files changed, 44 insertions(+), 6 deletions(-)
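
For context, the gist of the change: when the Flink Hive catalog publishes the Spark schema table properties (spark.sql.sources.schema.part.*), it now prepends the Hudi meta columns (and the operation field when changelog mode is enabled) to the physical row type, so Spark DDL such as ALTER TABLE sees the same schema that is actually written to the data files. Below is a minimal sketch of that idea, not part of the commit itself: the helper mirrors the supplementMetaFields() method added in TableOptionProperties.java, while the class name MetaFieldSupplementSketch and the main() demo are illustrative only. In the real patch the resulting RowType is further converted to Avro and then to a Parquet MessageType before the Spark properties are derived.

import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hudi.common.model.HoodieRecord;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetaFieldSupplementSketch {

  /** Prepends the Hudi meta columns (as string fields) to the given physical row type. */
  static RowType supplementMetaFields(RowType physicalType, boolean withOperationField) {
    List<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
    if (withOperationField) {
      metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
    }
    List<RowType.RowField> fields = new ArrayList<>();
    for (String metaField : metaFields) {
      // Meta columns are plain strings; VarCharType(10000) matches the patch.
      fields.add(new RowType.RowField(metaField, new VarCharType(10000)));
    }
    fields.addAll(physicalType.getFields());
    return new RowType(false, fields);
  }

  public static void main(String[] args) {
    // A toy physical schema: (uuid INT NOT NULL, name STRING).
    RowType physical = new RowType(false, Arrays.asList(
        new RowType.RowField("uuid", new IntType(false)),
        new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH))));
    RowType full = supplementMetaFields(physical, false);
    // Prints: [_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
    //          _hoodie_partition_path, _hoodie_file_name, uuid, name]
    System.out.println(full.getFieldNames());
  }
}
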

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 4383b42e9f8..fac507cb7db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -181,9 +181,11 @@ public class HiveSchemaUtils {
    */
   public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    Collection<String> metaFields = withOperationField
-        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
-        : HoodieRecord.HOODIE_META_COLUMNS;
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
+    }
+
     for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 6dcdf118415..fd36a39d237 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -597,7 +597,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
     serdeProperties.put("serialization.format", "1");
 
-    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
+    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys, withOperationField));
 
     sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index a0864bbf377..6e327bdc612 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -28,6 +29,8 @@ import org.apache.hudi.util.AvroSchemaConverter;
 
 import org.apache.avro.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,6 +54,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
 
 /**
@@ -168,8 +174,10 @@ public class TableOptionProperties {
       CatalogTable catalogTable,
       Configuration hadoopConf,
       Map<String, String> properties,
-      List<String> partitionKeys) {
-    Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
+      List<String> partitionKeys,
+      boolean withOperationField) {
+    RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
+    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
     MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
@@ -184,6 +192,19 @@ public class TableOptionProperties {
             e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
   }
 
+  private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) {
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(OPERATION_METADATA_FIELD);
+    }
+    ArrayList<RowType.RowField> rowFields = new ArrayList<>();
+    for (String metaField : metaFields) {
+      rowFields.add(new RowType.RowField(metaField, new VarCharType(10000)));
+    }
+    rowFields.addAll(rowType.getFields());
+    return new RowType(false, rowFields);
+  }
+
   public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
     if (options.containsKey(CONNECTOR.key())) {
       return options;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 5d27cdadbbb..c697cb92509 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -139,6 +139,21 @@ public class TestHoodieHiveCatalog {
         .collect(Collectors.joining(","));
     assertEquals("par1:string", partitionSchema);
 
+    // validate spark schema properties
+    String avroSchemaStr = hiveTable.getParameters().get("spark.sql.sources.schema.part.0");
+    String expectedAvroSchemaStr = ""
+        + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
+        + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
+    assertEquals(expectedAvroSchemaStr, avroSchemaStr);
+
     // validate catalog table
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
     assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));