You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by wa...@apache.org on 2021/05/25 03:37:00 UTC

[hudi] branch master updated: [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)

This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aba1ead  [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)
aba1ead is described below

commit aba1eadbfc015095f31271b2648faa7023126b99
Author: Town <62...@users.noreply.github.com>
AuthorDate: Tue May 25 11:36:43 2021 +0800

    [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)
    
    * [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink #2976
    
    * Update ParquetSplitReaderUtil.java
---
 .../table/format/cow/ParquetSplitReaderUtil.java   | 11 ++-
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 84 +++++++++++++++++++++-
 .../org/apache/hudi/utils/TestConfigurations.java  | 22 ++++++
 3 files changed, 108 insertions(+), 9 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index b626e88..778598f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -95,16 +95,13 @@ public class ParquetSplitReaderUtil {
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    List<String> nonPartNames = Arrays.stream(fullFieldNames)
-        .filter(n -> !partitionSpec.containsKey(n))
-        .collect(Collectors.toList());
-
     List<String> selNonPartNames = Arrays.stream(selectedFields)
         .mapToObj(i -> fullFieldNames[i])
-        .filter(nonPartNames::contains).collect(Collectors.toList());
+        .filter(n -> !partitionSpec.containsKey(n))
+        .collect(Collectors.toList());
 
-    int[] selParquetFields = selNonPartNames.stream()
-        .mapToInt(nonPartNames::indexOf)
+    int[] selParquetFields = Arrays.stream(selectedFields)
+        .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
         .toArray();
 
     ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index bb80cf1..f475d56 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -33,8 +33,11 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.util.Collection;
@@ -282,6 +286,55 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   }
 
   @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testWriteAndReadParMiddle(boolean streaming) throws Exception {
+    String hoodieTableDDL = "create table t1(\n"
+            + "  uuid varchar(20),\n"
+            + "  name varchar(10),\n"
+            + "  age int,\n"
+            + "  `partition` varchar(20),\n" // test streaming read with partition field in the middle
+            + "  ts timestamp(3),\n"
+            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
+            + ")\n"
+            + "PARTITIONED BY (`partition`)\n"
+            + "with (\n"
+            + "  'connector' = 'hudi',\n"
+            + "  'path' = '" + tempFile.getAbsolutePath() + "',\n"
+            + "  'read.streaming.enabled' = '" + streaming + "'\n"
+            + ")";
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+            + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
+            + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
+            + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
+            + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
+            + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
+            + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
+            + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
+            + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";
+    execInsertSql(streamTableEnv, insertInto);
+
+    final String expected = "["
+            + "id1,Danny,23,par1,1970-01-01T00:00:01, "
+            + "id2,Stephen,33,par1,1970-01-01T00:00:02, "
+            + "id3,Julian,53,par2,1970-01-01T00:00:03, "
+            + "id4,Fabian,31,par2,1970-01-01T00:00:04, "
+            + "id5,Sophia,18,par3,1970-01-01T00:00:05, "
+            + "id6,Emma,20,par3,1970-01-01T00:00:06, "
+            + "id7,Bob,44,par4,1970-01-01T00:00:07, "
+            + "id8,Han,56,par4,1970-01-01T00:00:08]";
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", streaming);
+
+    assertRowsEquals(result, expected);
+
+    // insert another batch of data
+    execInsertSql(streamTableEnv, insertInto);
+    List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", streaming);
+    assertRowsEquals(result2, expected);
+  }
+
+  @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testInsertOverwrite(ExecMode execMode) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
@@ -480,8 +533,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     }
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
-    tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
+          throws TableNotExistException, InterruptedException {
+    if (streaming) {
+      final String[] splits = select.split(" ");
+      final String tableName = splits[splits.length - 1];
+      return execSelectSql(tEnv, select, 10, tableName);
+    } else {
+      return CollectionUtil.iterableToList(
+          () -> tEnv.sqlQuery("select * from t1").execute().collect());
+    }
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
+          throws InterruptedException, TableNotExistException {
+    return execSelectSql(tEnv, select, timeout, null);
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
+          throws InterruptedException, TableNotExistException {
+    final String sinkDDL;
+    if (sourceTable != null) {
+      // use the source table schema as the sink schema if the source table was specified, .
+      ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
+      TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+    } else {
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+    }
+    tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
     // wait for the timeout then cancels the job
     TimeUnit.SECONDS.sleep(timeout);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index f5b9fb4..c0dbfce 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -127,6 +127,28 @@ public class TestConfigurations {
         + ")";
   }
 
+  public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+    final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
+    String[] fieldNames = tableSchema.getFieldNames();
+    DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+    for (int i = 0; i < fieldNames.length; i++) {
+      builder.append("  `")
+              .append(fieldNames[i])
+              .append("` ")
+              .append(fieldTypes[i].toString());
+      if (i != fieldNames.length - 1) {
+        builder.append(",");
+      }
+      builder.append("\n");
+    }
+    final String withProps = ""
+            + ") with (\n"
+            + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+            + ")";
+    builder.append(withProps);
+    return builder.toString();
+  }
+
   public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
 
   public static Configuration getDefaultConf(String tablePath) {