You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/02 23:48:31 UTC

[GitHub] [iceberg] stevenzwu commented on a diff in pull request #4904: Flink: new sink base on the unified sink API

stevenzwu commented on code in PR #4904:
URL: https://github.com/apache/iceberg/pull/4904#discussion_r936120650


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java:
##########
@@ -51,31 +51,40 @@ public class TestFlinkUpsert extends FlinkCatalogTestBase {
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
   private final boolean isStreamingJob;
+  private final boolean useNewSink;
   private final Map<String, String> tableUpsertProps = Maps.newHashMap();
   private TableEnvironment tEnv;
 
   public TestFlinkUpsert(
-      String catalogName, Namespace baseNamespace, FileFormat format, Boolean isStreamingJob) {
+      String catalogName,
+      Namespace baseNamespace,
+      FileFormat format,
+      Boolean isStreamingJob,
+      Boolean useNewSink) {
     super(catalogName, baseNamespace);
     this.isStreamingJob = isStreamingJob;
+    this.useNewSink = useNewSink;
     tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
     tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
     tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
   }
 
   @Parameterized.Parameters(
-      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+      name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, useNewSink={4}")
   public static Iterable<Object[]> parameters() {
     List<Object[]> parameters = Lists.newArrayList();
     for (FileFormat format :
         new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
       for (Boolean isStreaming : new Boolean[] {true, false}) {
-        // Only test with one catalog as this is a file operation concern.
-        // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
-        // catalog.
-        String catalogName = "testhadoop";
-        Namespace baseNamespace = Namespace.of("default");
-        parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+        for (Boolean useNewSink : new Boolean[] {true, false}) {

Review Comment:
   nit: inconsistent style with the TestFlinkTableSink, where we use two lines of `add` (rather a for loop like here).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org