You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/11 14:58:23 UTC

[GitHub] [flink] twalthr commented on a diff in pull request #19349: [FLINK-27043][table] Removing old csv format references

twalthr commented on code in PR #19349:
URL: https://github.com/apache/flink/pull/19349#discussion_r847357250


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Using this annotation you can inject in the test method:
+ *
+ * <ul>
+ *   <li>{@link TableEnvironment}
+ *   <li>{@link StreamExecutionEnvironment} (Java or Scala)
+ *   <li>{@link StreamTableEnvironment} (Java or Scala)
+ * </ul>
+ *
+ * <p>When using with {@link ParameterizedTest}, make sure the table environment parameter is the
+ * last one in the signature.
+ */
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.PARAMETER})
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(TableJUnitExtensions.TableEnvironmentParameterResolver.class)
+public @interface WithTableEnvironment {

Review Comment:
   Shall we move this to `table-test-utils`?



##########
flink-table/flink-table-planner/src/test/resources/explain/testExecuteSqlWithExplainDetailsSelect.out:
##########
@@ -42,4 +42,4 @@ Calc(select=[a, b, c], where=[(a > 10)])
       "side" : "second"
     } ]
   } ]
-}
\ No newline at end of file
+}

Review Comment:
   remove these file changes



##########
flink-end-to-end-tests/test-scripts/test_batch_sql.sh:
##########
@@ -72,11 +72,14 @@ set_config_key "taskmanager.numberOfTaskSlots" "1"
 start_cluster
 
 # The task has total 2 x (1 + 1 + 1 + 1) + 1 = 9 slots
-$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "file://${OUTPUT_FILE_PATH}" -sqlStatement \
+$FLINK_DIR/bin/flink run -p 2 $TEST_PROGRAM_JAR -outputPath "${OUTPUT_FILE_PATH}" -sqlStatement \
     "INSERT INTO sinkTable $(sqlJobQuery)"
 
+# Concat result
+cat ${OUTPUT_FILE_PATH}/* > ${OUTPUT_FILE_PATH}/result.csv

Review Comment:
   did you check that the appending order is deterministic?



##########
flink-end-to-end-tests/flink-batch-sql-test/src/main/java/org/apache/flink/sql/tests/BatchSQLTestProgram.java:
##########
@@ -62,13 +63,17 @@ public static void main(String[] args) throws Exception {
                 .registerTableSourceInternal("table1", new GeneratorTableSource(10, 100, 60, 0));
         ((TableEnvironmentInternal) tEnv)
                 .registerTableSourceInternal("table2", new GeneratorTableSource(5, 0.2f, 60, 5));
-        ((TableEnvironmentInternal) tEnv)
-                .registerTableSinkInternal(
-                        "sinkTable",
-                        new CsvTableSink(outputPath)
-                                .configure(
-                                        new String[] {"f0", "f1"},
-                                        new TypeInformation[] {Types.INT, Types.SQL_TIMESTAMP}));
+        tEnv.createTemporaryTable(
+                "sinkTable",
+                TableDescriptor.forConnector("filesystem")
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.INT())
+                                        .column("f1", DataTypes.TIMESTAMP(3))
+                                        .build())
+                        .option(FileSystemConnectorOptions.PATH, outputPath)
+                        .format(FormatDescriptor.forFormat("csv").build())

Review Comment:
   use `format("csv")`



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -71,95 +73,82 @@ object TestTableSourceSinks {
          |  score DOUBLE,
          |  last STRING
          |) WITH (
-         |  'connector.type' = 'filesystem',
-         |  'connector.path' = '$getPersonCsvPath',
-         |  'format.type' = 'csv',
-         |  'format.field-delimiter' = '#',
-         |  'format.line-delimiter' = '$$',
-         |  'format.ignore-first-line' = 'true',
-         |  'format.comment-prefix' = '%'
+         |  'connector' = 'filesystem',
+         |  'path' = '${getPersonCsvPath(tempDir)}',
+         |  'format' = 'testcsv'
          |)
          |""".stripMargin)
   }
 
-  def createOrdersCsvTemporaryTable(tEnv: TableEnvironment, tableName: String): Unit = {
+  def createOrdersCsvTemporaryTable(
+    tEnv: TableEnvironment, tempDir: Path, tableName: String): Unit = {
     tEnv.executeSql(
       s"""
         |CREATE TEMPORARY TABLE $tableName (
         |  amount BIGINT,
         |  currency STRING,
         |  ts BIGINT
         |) WITH (
-        |  'connector.type' = 'filesystem',
-        |  'connector.path' = '$getOrdersCsvPath',
-        |  'format.type' = 'csv',
-        |  'format.field-delimiter' = ',',
-        |  'format.line-delimiter' = '$$'
+        |  'connector' = 'filesystem',
+        |  'path' = '${getOrdersCsvPath(tempDir)}',
+        |  'format' = 'testcsv'
         |)
         |""".stripMargin)
   }
 
-  def createRatesCsvTemporaryTable(tEnv: TableEnvironment, tableName: String): Unit = {
+  def createRatesCsvTemporaryTable(
+    tEnv: TableEnvironment, tempDir: Path, tableName: String): Unit = {
     tEnv.executeSql(
       s"""
         |CREATE TEMPORARY TABLE $tableName (
         |  currency STRING,
         |  rate BIGINT
         |) WITH (
-        |  'connector.type' = 'filesystem',
-        |  'connector.path' = '$getRatesCsvPath',
-        |  'format.type' = 'csv',
-        |  'format.field-delimiter' = ',',
-        |  'format.line-delimiter' = '$$'
+        |  'connector' = 'filesystem',
+        |  'path' = '${getRatesCsvPath(tempDir)}',
+        |  'format' = 'testcsv'
         |)
         |""".stripMargin)
   }
 
   def createCsvTemporarySinkTable(
       tEnv: TableEnvironment,
-      schema: TableSchema,
-      tableName: String,
-      numFiles: Int = 1): String = {
-    val tempFile = File.createTempFile("csv-test", null)
-    tempFile.deleteOnExit()
-    val path = tempFile.getAbsolutePath
-
-    val sinkOptions = collection.mutable.Map(
-      "connector.type" -> "filesystem",
-      "connector.path" -> path,
-      "format.type" -> "csv",
-      "format.write-mode" -> "OVERWRITE",
-      "format.num-files" -> numFiles.toString
-    )
-    sinkOptions.putAll(new Schema().schema(schema).toProperties)
-
-    val sink = new CsvBatchTableSinkFactory().createStreamTableSink(sinkOptions);
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(tableName, sink)
+      schema: org.apache.flink.table.api.Schema,

Review Comment:
   import this



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/test/WithTableEnvironment.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Using this annotation you can inject in the test method:
+ *
+ * <ul>
+ *   <li>{@link TableEnvironment}
+ *   <li>{@link StreamExecutionEnvironment} (Java or Scala)
+ *   <li>{@link StreamTableEnvironment} (Java or Scala)
+ * </ul>
+ *
+ * <p>The underlying parameter injector will infer automatically the type to use from the signature

Review Comment:
   Can we add a complete example in Java and Scala here? It will help with adoption. Please include one example with a global class and one with parameters for the environment.



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -169,35 +158,31 @@ object TestTableSourceSinks {
     )
 
     writeToTempFile(
-      csvRecords.mkString("$"),
-      "csv-order-test",
-      "tmp")
+      tempDir,
+      csvRecords.mkString("\n"),
+      "csv-order-test")
   }
 
-  lazy val getRatesCsvPath = {
+  def getRatesCsvPath(tempDir: Path): String = {
     val csvRecords = Seq(
       "US Dollar,102",
       "Yen,1",
       "Euro,119",
       "RMB,702"
     )
     writeToTempFile(
-      csvRecords.mkString("$"),
-      "csv-rate-test",
-      "tmp")
-
+      tempDir,
+      csvRecords.mkString("\n"),
+      "csv-rate-test")
   }
 
   private def writeToTempFile(
+    tempDir: Path,

Review Comment:
   fix indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org