You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this copy.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/02 09:23:43 UTC
[hudi] 01/08: [HUDI-5585][flink] Fix flink creates and writes the table, the spark alter table reports an error (#7706)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 22b634b081571f01d2d9de4e94ce240cecc46d7d
Author: chao chen <59...@users.noreply.github.com>
AuthorDate: Wed Feb 1 09:45:39 2023 +0800
[HUDI-5585][flink] Fix flink creates and writes the table, the spark alter table reports an error (#7706)
Co-authored-by: danny0405 <yu...@gmail.com>
---
.../apache/hudi/table/catalog/HiveSchemaUtils.java | 8 ++++---
.../hudi/table/catalog/HoodieHiveCatalog.java | 2 +-
.../hudi/table/catalog/TableOptionProperties.java | 25 ++++++++++++++++++++--
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 15 +++++++++++++
4 files changed, 44 insertions(+), 6 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 4383b42e9f8..fac507cb7db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -181,9 +181,11 @@ public class HiveSchemaUtils {
*/
public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
List<FieldSchema> columns = new ArrayList<>();
- Collection<String> metaFields = withOperationField
- ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
- : HoodieRecord.HOODIE_META_COLUMNS;
+ Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+ if (withOperationField) {
+ metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
+ }
+
for (String metaField : metaFields) {
columns.add(new FieldSchema(metaField, "string", null));
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 6dcdf118415..fd36a39d237 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -597,7 +597,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
serdeProperties.put("serialization.format", "1");
- serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
+ serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys, withOperationField));
sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index a0864bbf377..6e327bdc612 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.catalog;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
@@ -28,6 +29,8 @@ import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.avro.Schema;
import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,7 +42,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -49,6 +54,7 @@ import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
/**
@@ -168,8 +174,10 @@ public class TableOptionProperties {
CatalogTable catalogTable,
Configuration hadoopConf,
Map<String, String> properties,
- List<String> partitionKeys) {
- Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
+ List<String> partitionKeys,
+ boolean withOperationField) {
+ RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
+ Schema schema = AvroSchemaConverter.convertToSchema(rowType);
MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
@@ -184,6 +192,19 @@ public class TableOptionProperties {
e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
}
+ private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) {
+ Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+ if (withOperationField) {
+ metaFields.add(OPERATION_METADATA_FIELD);
+ }
+ ArrayList<RowType.RowField> rowFields = new ArrayList<>();
+ for (String metaField : metaFields) {
+ rowFields.add(new RowType.RowField(metaField, new VarCharType(10000)));
+ }
+ rowFields.addAll(rowType.getFields());
+ return new RowType(false, rowFields);
+ }
+
public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
if (options.containsKey(CONNECTOR.key())) {
return options;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 5d27cdadbbb..c697cb92509 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -139,6 +139,21 @@ public class TestHoodieHiveCatalog {
.collect(Collectors.joining(","));
assertEquals("par1:string", partitionSchema);
+ // validate spark schema properties
+ String avroSchemaStr = hiveTable.getParameters().get("spark.sql.sources.schema.part.0");
+ String expectedAvroSchemaStr = ""
+ + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
+ + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},"
+ + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
+ assertEquals(expectedAvroSchemaStr, avroSchemaStr);
+
// validate catalog table
CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));