You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/24 11:31:10 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #2986: [HUDI-1919] Type mismatch when streaming read copy_on_write table using flink

danny0405 commented on a change in pull request #2986:
URL: https://github.com/apache/hudi/pull/2986#discussion_r637879103



##########
File path: hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
##########
@@ -480,15 +533,42 @@ private void execInsertSql(TableEnvironment tEnv, String insert) {
     }
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
-    tEnv.executeSql(TestConfigurations.getCollectSinkDDL("sink"));
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, boolean streaming)
+          throws TableNotExistException, InterruptedException {
+    if (streaming) {
+      final String[] splits = select.split(" ");
+      final String tableName = splits[splits.length - 1];
+      return execSelectSql(tEnv, select, 10, tableName);
+    } else {
+      return CollectionUtil.iterableToList(
+          () -> tEnv.sqlQuery("select * from t1").execute().collect());
+    }
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
+          throws InterruptedException, TableNotExistException {
+    return execSelectSql(tEnv, select, timeout, null);
+  }
+
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
+          throws InterruptedException, TableNotExistException {
+    final String sinkDDL;
+    if (sourceTable != null) {
+      // use the source table schema as the sink schema if the source table was specified, .
+      ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
+      TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+    } else {
+      sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+    }
+    tEnv.executeSql(sinkDDL);
     TableResult tableResult = tEnv.executeSql("insert into sink " + select);
     // wait for the timeout then cancels the job
     TimeUnit.SECONDS.sleep(timeout);
     tableResult.getJobClient().ifPresent(JobClient::cancel);
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
   }

Review comment:
       Fix the indentation

##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
##########
@@ -104,7 +106,7 @@ public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
         .filter(nonPartNames::contains).collect(Collectors.toList());
 
     int[] selParquetFields = selNonPartNames.stream()
-        .mapToInt(nonPartNames::indexOf)
+        .mapToInt(fullNames::indexOf)
         .toArray();

Review comment:
       `indexOf` is not very efficient here: it does a linear scan of the list for every selected field.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org