Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/06 21:39:33 UTC

[GitHub] [flink] afedulov commented on a diff in pull request #19224: [hotfix][test][formats] Detach TableCsvFormatITCase from JsonPlanTestBase

afedulov commented on code in PR #19224:
URL: https://github.com/apache/flink/pull/19224#discussion_r844444614


##########
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/TableCsvFormatITCase.java:
##########
@@ -165,30 +269,87 @@ private static String formatSqlTimestamp(long timestamp) {
         return TimeFormats.SQL_TIMESTAMP_FORMAT.format(toLocalDateTime(timestamp));
     }
 
-    private void createSourceTable(String tableName, List<String> data, String... fieldNameAndTypes)
+    private void createSourceTable(String tableName, List<String> data, Schema schema)
             throws IOException {
+
         File sourceFile = TEMPORARY_FOLDER.newFile();
         Collections.shuffle(data);
         Files.write(sourceFile.toPath(), String.join("\n", data).getBytes());
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put("connector", "filesystem");
-        properties.put("path", sourceFile.getAbsolutePath());
-        properties.put("format", "csv");
-
-        createTestSourceTable(tableName, fieldNameAndTypes, null, properties);
+        tableEnv.createTemporaryTable(
+                tableName,
+                TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
+                        .option(FileSystemConnectorOptions.PATH, sourceFile.getAbsolutePath())
+                        .format(CsvCommons.IDENTIFIER)
+                        .schema(schema)
+                        .build());
     }
 
-    private File createSinkTable(String tableName, String... fieldNameAndTypes) throws IOException {
+    private File createSinkTable(String tableName, Schema schema) throws IOException {
         File sinkPath = TEMPORARY_FOLDER.newFolder();
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put("connector", "filesystem");
-        properties.put("path", sinkPath.getAbsolutePath());
-        properties.put("format", "csv");
-        properties.put("csv.disable-quote-character", "true");
+        tableEnv.createTemporaryTable(
+                tableName,
+                TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
+                        .option(FileSystemConnectorOptions.PATH, sinkPath.getAbsolutePath())
+                        .option("csv.disable-quote-character", "true")
+                        .format(CsvCommons.IDENTIFIER)
+                        .schema(schema)
+                        .build());
 
-        createTestSinkTable(tableName, fieldNameAndTypes, null, properties);
         return sinkPath;
     }
+
+    private void createTestValuesSourceTable(
+            String tableName,
+            List<Row> data,
+            Schema schema,
+            Map<String, String> extraProperties,
+            @Nullable String... partitionFields) {
+
+        String dataId = TestValuesTableFactory.registerData(data);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("data-id", dataId);
+        properties.put("bounded", "true");
+        properties.put("disable-lookup", "true");
+        properties.putAll(extraProperties);
+
+        TableDescriptor.Builder descriptor =
+                TableDescriptor.forConnector("values")
+                        .schema(schema)
+                        .partitionedBy(partitionFields);
+
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            descriptor.option(entry.getKey(), entry.getValue());
+        }
+
+        tableEnv.createTemporaryTable(tableName, descriptor.build());
+    }
+
+    private void assertResult(List<String> expected, File resultFile) throws IOException {
+        List<String> actual = readLines(resultFile);
+        assertResult(expected, actual);
+    }
+
+    private void assertResult(List<String> expected, List<String> actual) {
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertThat(actual).isEqualTo(expected);
+    }

Review Comment:
   Good catch, that was a leftover from JsonPlanTestBase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
