Posted to commits@hudi.apache.org by wa...@apache.org on 2021/05/25 03:37:00 UTC
[hudi] branch master updated: [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)
This is an automated email from the ASF dual-hosted git repository.
wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aba1ead [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)
aba1ead is described below
commit aba1eadbfc015095f31271b2648faa7023126b99
Author: Town <62...@users.noreply.github.com>
AuthorDate: Tue May 25 11:36:43 2021 +0800
[HUDI-1919] Type mismatch when streaming read copy_on_write table using flink (#2986)
* [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink #2976
* Update ParquetSplitReaderUtil.java
---
.../table/format/cow/ParquetSplitReaderUtil.java | 11 ++-
.../apache/hudi/table/HoodieDataSourceITCase.java | 84 +++++++++++++++++++++-
.../org/apache/hudi/utils/TestConfigurations.java | 22 ++++++
3 files changed, 108 insertions(+), 9 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index b626e88..778598f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -95,16 +95,13 @@ public class ParquetSplitReaderUtil {
Path path,
long splitStart,
long splitLength) throws IOException {
- List<String> nonPartNames = Arrays.stream(fullFieldNames)
- .filter(n -> !partitionSpec.containsKey(n))
- .collect(Collectors.toList());
-
List<String> selNonPartNames = Arrays.stream(selectedFields)
.mapToObj(i -> fullFieldNames[i])
- .filter(nonPartNames::contains).collect(Collectors.toList());
+ .filter(n -> !partitionSpec.containsKey(n))
+ .collect(Collectors.toList());
- int[] selParquetFields = selNonPartNames.stream()
- .mapToInt(nonPartNames::indexOf)
+ int[] selParquetFields = Arrays.stream(selectedFields)
+ .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
.toArray();
ParquetColumnarRowSplitReader.ColumnBatchGenerator gen = readVectors -> {
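The essence of the fix above: the old code computed the selected parquet field positions as indexes into the filtered non-partition name list, so when a partition column sits in the middle of the schema the positions of the fields after it shift and the reader decodes a selected field against the wrong column (hence the type mismatch in the ticket); the new code keeps the original full-schema indices and only drops the partition fields. A minimal, self-contained sketch of the two index computations, not part of the commit, using a hypothetical schema shaped like the t1 table added to the IT case below:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SelFieldIndexDemo {
  public static void main(String[] args) {
    // partition field deliberately placed in the middle of the schema
    String[] fullFieldNames = {"uuid", "name", "age", "partition", "ts"};
    Map<String, String> partitionSpec = new HashMap<>();
    partitionSpec.put("partition", "par1");
    int[] selectedFields = {0, 1, 2, 3, 4}; // select *

    // old logic: indexes are relative to the non-partition list, so "ts" maps to 3
    List<String> nonPartNames = Arrays.stream(fullFieldNames)
        .filter(n -> !partitionSpec.containsKey(n))
        .collect(Collectors.toList());
    int[] oldIndices = Arrays.stream(selectedFields)
        .mapToObj(i -> fullFieldNames[i])
        .filter(nonPartNames::contains)
        .mapToInt(nonPartNames::indexOf)
        .toArray();

    // new logic: keep the full-schema positions, just drop partition fields
    int[] newIndices = Arrays.stream(selectedFields)
        .filter(i -> !partitionSpec.containsKey(fullFieldNames[i]))
        .toArray();

    System.out.println(Arrays.toString(oldIndices)); // [0, 1, 2, 3]
    System.out.println(Arrays.toString(newIndices)); // [0, 1, 2, 4] -> "ts" keeps its real position
  }
}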
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index bb80cf1..f475d56 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -33,8 +33,11 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.util.Collection;
@@ -282,6 +286,55 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
@ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWriteAndReadParMiddle(boolean streaming) throws Exception {
+ String hoodieTableDDL = "create table t1(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " `partition` varchar(20),\n" // test streaming read with partition field in the middle
+ + " ts timestamp(3),\n"
+ + " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'path' = '" + tempFile.getAbsolutePath() + "',\n"
+ + " 'read.streaming.enabled' = '" + streaming + "'\n"
+ + ")";
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,'par1',TIMESTAMP '1970-01-01 00:00:01'),\n"
+ + "('id2','Stephen',33,'par1',TIMESTAMP '1970-01-01 00:00:02'),\n"
+ + "('id3','Julian',53,'par2',TIMESTAMP '1970-01-01 00:00:03'),\n"
+ + "('id4','Fabian',31,'par2',TIMESTAMP '1970-01-01 00:00:04'),\n"
+ + "('id5','Sophia',18,'par3',TIMESTAMP '1970-01-01 00:00:05'),\n"
+ + "('id6','Emma',20,'par3',TIMESTAMP '1970-01-01 00:00:06'),\n"
+ + "('id7','Bob',44,'par4',TIMESTAMP '1970-01-01 00:00:07'),\n"
+ + "('id8','Han',56,'par4',TIMESTAMP '1970-01-01 00:00:08')";
+ execInsertSql(streamTableEnv, insertInto);
+
+ final String expected = "["
+ + "id1,Danny,23,par1,1970-01-01T00:00:01, "
+ + "id2,Stephen,33,par1,1970-01-01T00:00:02, "
+ + "id3,Julian,53,par2,1970-01-01T00:00:03, "
+ + "id4,Fabian,31,par2,1970-01-01T00:00:04, "
+ + "id5,Sophia,18,par3,1970-01-01T00:00:05, "
+ + "id6,Emma,20,par3,1970-01-01T00:00:06, "
+ + "id7,Bob,44,par4,1970-01-01T00:00:07, "
+ + "id8,Han,56,par4,1970-01-01T00:00:08]";
+
+ List<Row> result = execSelectSql(streamTableEnv, "select * from t1", streaming);
+
+ assertRowsEquals(result, expected);
+
+ // insert another batch of data
+ execInsertSql(streamTableEnv, insertInto);
+ List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", streaming);
+ assertRowsEquals(result2, expected);
+ }
+
+ @ParameterizedTest
@EnumSource(value = ExecMode.class)
void testInsertOverwrite(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
@@ -480,8 +533,35 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
}
}
- private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
- tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
+ throws TableNotExistException, InterruptedException {
+ if (streaming) {
+ final String[] splits = select.split(" ");
+ final String tableName = splits[splits.length - 1];
+ return execSelectSql(tEnv, select, 10, tableName);
+ } else {
+ return CollectionUtil.iterableToList(
+ () -> tEnv.sqlQuery("select * from t1").execute().collect());
+ }
+ }
+
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
+ throws InterruptedException, TableNotExistException {
+ return execSelectSql(tEnv, select, timeout, null);
+ }
+
+ private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
+ throws InterruptedException, TableNotExistException {
+ final String sinkDDL;
+ if (sourceTable != null) {
+ // use the source table schema as the sink schema if the source table was specified
+ ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
+ TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
+ sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+ } else {
+ sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+ }
+ tEnv.executeSql(sinkDDL);
TableResult tableResult = tEnv.executeSql("insert into sink " + select);
// wait for the timeout then cancels the job
TimeUnit.SECONDS.sleep(timeout);
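For the streaming path, the helper above wraps the query in an "insert into sink <select>", lets the continuous job run for the timeout (10 seconds by default), then cancels it and collects whatever the collect sink buffered; the source table name is recovered with a deliberately simple split on spaces, so it only works for queries that end with the table name. A hedged illustration of that extraction, using the query from the new test:

String select = "select * from t1";
String[] splits = select.split(" ");
String tableName = splits[splits.length - 1]; // "t1", later used to look up the schema in the catalog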
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index f5b9fb4..c0dbfce 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -127,6 +127,28 @@ public class TestConfigurations {
+ ")";
}
+ public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+ final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
+ String[] fieldNames = tableSchema.getFieldNames();
+ DataType[] fieldTypes = tableSchema.getFieldDataTypes();
+ for (int i = 0; i < fieldNames.length; i++) {
+ builder.append(" `")
+ .append(fieldNames[i])
+ .append("` ")
+ .append(fieldTypes[i].toString());
+ if (i != fieldNames.length - 1) {
+ builder.append(",");
+ }
+ builder.append("\n");
+ }
+ final String withProps = ""
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+ + ")";
+ builder.append(withProps);
+ return builder.toString();
+ }
+
public static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
public static Configuration getDefaultConf(String tablePath) {
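The new getCollectSinkDDL(String, TableSchema) overload lets the test declare a collect sink whose columns match whatever source table the streaming query reads, instead of the fixed default sink schema. A hedged usage sketch, not part of the commit, with a schema shaped like the t1 table from the IT case; the exact type strings come from DataType#toString and the connector identifier from CollectSinkTableFactory.FACTORY_ID:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

TableSchema schema = TableSchema.builder()
    .field("uuid", DataTypes.VARCHAR(20))
    .field("name", DataTypes.VARCHAR(10))
    .field("age", DataTypes.INT())
    .field("partition", DataTypes.VARCHAR(20))
    .field("ts", DataTypes.TIMESTAMP(3))
    .build();
String ddl = TestConfigurations.getCollectSinkDDL("sink", schema);
// ddl is roughly:
// create table sink(
//   `uuid` VARCHAR(20),
//   `name` VARCHAR(10),
//   `age` INT,
//   `partition` VARCHAR(20),
//   `ts` TIMESTAMP(3)
// ) with (
//   'connector' = '...'   // the value of CollectSinkTableFactory.FACTORY_ID
// )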